2 Commits
v1.0.0 ... main

Author SHA1 Message Date
47fbee4b8b 更新tracer:基于kprobe,实现动态系统调用计数
All checks were successful
CI / lint-and-build (push) Successful in 2m33s
CD / deploy (push) Successful in 1m4s
2026-06-16 01:23:13 +08:00
81a8573e51 添加测试功能:TCP 写入服务端、单次发送客户端、压力测试客户端、kprobe Hello World基础验证
All checks were successful
CI / lint-and-build (push) Successful in 2m35s
CD / deploy (push) Successful in 1m6s
2026-06-15 20:23:14 +08:00
6 changed files with 244 additions and 71 deletions

View File

@@ -1,74 +1,55 @@
"""基于 eBPF 的系统调用计数器。 """基于 eBPF kprobe 的系统调用计数器。
将一段 BPF 程序挂载到 raw_syscalls:sys_enter 跟踪点,按系统调用号在 为 config/monitors.json 中配置的每个系统调用,动态生成一个 BPF 探针函数,
BPF_HASH 映射中累计调用次数。Python 端定时读取该映射,并按 通过 kprobe 挂载到对应的内核 syscall 入口(如 __x64_sys_read
config/monitors.json 中配置的名称对外暴露各系统调用的计数。 Python 端定时读取 BPF_HASH 映射并对外暴露各系统调用的计数。
为什么在内核里统计所有系统调用,而不是只统计配置中列出的? 配置变更时自动重载:先将当前计数累加到 Python 端,再重新编译挂载,
- 配置可能在运行期变化;如果在 BPF 中做过滤,每次改配置都要 保证计数连续性。
重建并重新挂载程序。直接在内核里全部计数(一次哈希自增)开销极低,
Web 层只需挑选要展示的名称即可。
""" """
import json import json
import os import os
import threading import threading
import time
from pathlib import Path from pathlib import Path
from bcc import BPF from bcc import BPF
from bcc.syscall import syscall_name
# Kernel-side BPF program: attached to the raw_syscalls:sys_enter tracepoint,
# it bumps the counter keyed by the syscall number on every syscall entry.
# The text below is handed verbatim to the BPF compiler.
BPF_TEXT = r"""
BPF_HASH(counts, u32, u64);
TRACEPOINT_PROBE(raw_syscalls, sys_enter) {
u32 id = (u32)args->id;
u64 zero = 0, *val;
val = counts.lookup_or_try_init(&id, &zero);
if (val) {
// 原子自增,保证多 CPU 并发下计数正确
__sync_fetch_and_add(val, 1);
}
return 0;
}
"""
class SyscallTracer: class SyscallTracer:
"""系统调用追踪器:加载 BPF 程序并周期性刷新计数快照""" """系统调用追踪器:基于 kprobe为每个配置的 syscall 挂载独立探针"""
def __init__(self, config_path: Path): def __init__(self, config_path: Path):
self.config_path = Path(config_path) self.config_path = Path(config_path)
self._bpf = None self._bpf: BPF | None = None
# 保护 _snapshot 的并发访问后台轮询线程写入HTTP 请求线程读取)
self._lock = threading.Lock() self._lock = threading.Lock()
# 最近一次快照:{系统调用名: 累计次数}
self._snapshot: dict[str, int] = {} self._snapshot: dict[str, int] = {}
# 用于通知后台线程退出的事件 # Python 端累加器:重载 BPF 时保留历史计数
self._accumulated: dict[str, int] = {}
# 当前已挂载的 syscall 列表(有序,与 BPF 中 index 对应)
self._attached: list[str] = []
# index -> syscall name 映射表
self._index_map: dict[int, str] = {}
self._stop = threading.Event() self._stop = threading.Event()
self._thread: threading.Thread | None = None self._thread: threading.Thread | None = None
def load_config(self) -> list[str]: def load_config(self) -> list[str]:
"""从 monitors.json 读取需要展示的系统调用名称列表。""" """从 monitors.json 读取需要监控的系统调用名称列表。"""
try: try:
with self.config_path.open("r", encoding="utf-8") as f: with self.config_path.open("r", encoding="utf-8") as f:
data = json.load(f) data = json.load(f)
names = data.get("syscalls", []) names = data.get("syscalls", [])
# 过滤空字符串并去除首尾空格
return [str(n).strip() for n in names if str(n).strip()] return [str(n).strip() for n in names if str(n).strip()]
except FileNotFoundError: except FileNotFoundError:
return [] return []
def start(self) -> None: def start(self) -> None:
"""加载并挂载 BPF 程序启动后台轮询线程。""" """加载 BPF 程序启动后台轮询线程。"""
# 防止重复加载(已启动则直接返回) if self._thread is not None:
if self._bpf is not None:
return return
self._bpf = BPF(text=BPF_TEXT) syscalls = self.load_config()
# daemon=True主进程退出时后台线程自动终止 if syscalls:
self._attach(syscalls)
self._thread = threading.Thread(target=self._poll_loop, daemon=True) self._thread = threading.Thread(target=self._poll_loop, daemon=True)
self._thread.start() self._thread.start()
@@ -77,44 +58,86 @@ class SyscallTracer:
self._stop.set() self._stop.set()
if self._thread: if self._thread:
self._thread.join(timeout=2) self._thread.join(timeout=2)
self._bpf = None self._detach()
def _poll_loop(self) -> None:
    """Refresh the snapshot roughly once per second until stop is requested."""
    while True:
        # Event.wait doubles as an interruptible sleep: it returns True as
        # soon as the stop event is set, otherwise False after the timeout.
        if self._stop.wait(1.0):
            break
        self._refresh_snapshot()
def _refresh_snapshot(self) -> None:
    """Rebuild the snapshot from the BPF map, keeping only configured syscalls.

    Does nothing when no BPF program is loaded. With an empty configuration
    the snapshot is cleared so stale values are not shown.
    """
    program = self._bpf
    if program is None:
        return

    wanted = set(self.load_config())
    # Every configured syscall starts at 0 so it is shown even if never hit.
    snapshot: dict[str, int] = dict.fromkeys(wanted, 0)

    if wanted:
        for key, count in program["counts"].items():
            try:
                # Translate the kernel syscall number into its name.
                label = syscall_name(key.value).decode("utf-8", "replace")
            except Exception:
                # Skip entries that cannot be resolved; the rest still count.
                continue
            if label in snapshot:
                snapshot[label] += count.value

    with self._lock:
        self._snapshot = snapshot
def get_counts(self) -> dict[str, int]: def get_counts(self) -> dict[str, int]:
"""供 Web 层调用:返回最近一次快照的副本。""" """供 Web 层调用:返回最近一次快照的副本。"""
with self._lock: with self._lock:
return dict(self._snapshot) return dict(self._snapshot)
def _build_bpf_text(self, syscalls: list[str]) -> str:
    """Generate the BPF C source with one probe function per syscall.

    Each function is named ``trace_<syscall>`` and atomically increments
    the entry of the shared ``counts`` hash whose key is the syscall's
    position in *syscalls*.

    Args:
        syscalls: Syscall names; the list order defines the map keys.

    Returns:
        The C source text for the BPF program.

    Raises:
        ValueError: If a name is not a plain ASCII identifier.
    """
    handlers = []
    for index, name in enumerate(syscalls):
        # Names come from an editable config file and are spliced into C
        # source, so anything that is not a bare identifier is rejected
        # instead of being compiled into the kernel program.
        if not (name.isascii() and name.isidentifier()):
            raise ValueError(f"invalid syscall name for BPF probe: {name!r}")
        handlers.append(
            f"int trace_{name}(struct pt_regs *ctx) {{\n"
            f"    u32 id = {index};\n"
            f"    u64 zero = 0, *val;\n"
            f"    val = counts.lookup_or_try_init(&id, &zero);\n"
            f"    if (val) {{ __sync_fetch_and_add(val, 1); }}\n"
            f"    return 0;\n"
            f"}}"
        )
    return "BPF_HASH(counts, u32, u64);\n\n" + "\n\n".join(handlers) + "\n"
def _attach(self, syscalls: list[str]) -> None:
    """Compile the BPF program and attach one kprobe per syscall.

    Duplicate names are dropped (first occurrence wins) because each name
    maps to exactly one generated C function and one probe. Instance state
    is only updated once every probe is attached; on failure the partially
    attached program is cleaned up and the exception is re-raised.

    Args:
        syscalls: Syscall names to monitor.
    """
    # dict.fromkeys keeps order while removing duplicates, so the map
    # indices stay aligned with the generated source.
    unique = list(dict.fromkeys(syscalls))
    bpf = BPF(text=self._build_bpf_text(unique))
    try:
        for name in unique:
            fn = bpf.get_syscall_fnname(name)
            bpf.attach_kprobe(event=fn, fn_name=f"trace_{name}")
    except Exception:
        # Do not leave probes from a half-attached program behind.
        bpf.cleanup()
        raise
    self._bpf = bpf
    self._attached = unique
    self._index_map = dict(enumerate(unique))
def _detach(self) -> None:
    """Release the loaded BPF program, if any, and forget what was attached."""
    program = self._bpf
    if program is not None:
        program.cleanup()
        self._bpf = None
    # Reset the bookkeeping whether or not a program was loaded.
    self._attached, self._index_map = [], {}
def _reload(self, new_syscalls: list[str]) -> None:
    """Swap the attached probes for *new_syscalls*.

    The current counts are folded into the Python-side accumulator before
    the old program is detached; nothing is attached when the new list is
    empty.
    """
    # Order matters: save counts first, then tear down, then rebuild.
    self._accumulate_counts()
    self._detach()
    if not new_syscalls:
        return
    self._attach(new_syscalls)
def _accumulate_counts(self) -> None:
"""将当前 BPF 计数累加到 Python 端,防止重载丢失数据。"""
if self._bpf is None:
return
for k, v in self._bpf["counts"].items():
name = self._index_map.get(k.value)
if name:
self._accumulated[name] = self._accumulated.get(name, 0) + v.value
def _poll_loop(self) -> None:
    """Background loop: pick up config changes and refresh the snapshot.

    Runs about once per second until the stop event is set. A failure in
    one iteration (unreadable or malformed config, a syscall name that
    cannot be compiled or attached) is logged and the loop keeps running.
    A configuration whose reload failed is not retried until the
    configuration changes again, so a bad config is not recompiled on
    every tick.
    """
    import logging

    logger = logging.getLogger(__name__)
    rejected: set[str] | None = None
    while not self._stop.wait(1.0):
        try:
            wanted = self.load_config()
            wanted_set = set(wanted)
            if wanted_set != set(self._attached) and wanted_set != rejected:
                # Assume failure until the reload completes; cleared below.
                rejected = wanted_set
                self._reload(wanted)
                rejected = None
            self._refresh_snapshot()
        except Exception:
            # Top-level boundary of the daemon thread: an uncaught error
            # here would end polling silently and freeze the counters.
            logger.exception("syscall tracer poll iteration failed")
def _refresh_snapshot(self) -> None:
"""从 BPF 映射读取计数,合并累加器,写入快照。"""
if not self._attached:
with self._lock:
self._snapshot = {}
return
totals: dict[str, int] = {name: self._accumulated.get(name, 0) for name in self._attached}
if self._bpf is not None:
for k, v in self._bpf["counts"].items():
name = self._index_map.get(k.value)
if name:
totals[name] = totals.get(name, 0) + v.value
with self._lock:
self._snapshot = totals
# 进程级单例:整个应用只挂载一次 BPF 程序,避免重复挂载和资源浪费
_tracer: SyscallTracer | None = None _tracer: SyscallTracer | None = None

33
demo/client.py Normal file
View File

@@ -0,0 +1,33 @@
"""TCP 单次发送客户端:发送 3 条测试消息,验证基本连通性"""
import socket
import time
HOST = "127.0.0.1"
PORT = 8888
MESSAGE_COUNT = 3
def send_once(index: int) -> bool:
    """Send one test message to the server and print its reply.

    Args:
        index: Sequence number embedded in the message and in the output.

    Returns:
        True if the message was sent and a reply received, False on any
        error (the error is printed, not raised).
    """
    try:
        # The context manager closes the socket on every path, including a
        # failed connect/send/recv, which previously leaked the descriptor.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
            client.settimeout(5)
            client.connect((HOST, PORT))
            msg = f"test_message_{index}_{time.time()}"
            # sendall keeps writing until the whole payload is sent;
            # send() may transmit only part of it.
            client.sendall(msg.encode("utf-8"))
            reply = client.recv(1024)
        print(f"消息 {index} 发送成功,回复: {reply.decode('utf-8', errors='replace')}")
        return True
    except Exception as exc:
        print(f"消息 {index} 失败: {exc}")
        return False
if __name__ == "__main__":
    print(f"开始连通性测试: 发送 {MESSAGE_COUNT} 条消息到 {HOST}:{PORT}")
    # Messages go out strictly one after another; count the ones that
    # completed a full send/reply round trip.
    success = sum(1 for seq in range(MESSAGE_COUNT) if send_once(seq))
    print(f"完成: 成功 {success}/{MESSAGE_COUNT}")

48
demo/server.py Normal file
View File

@@ -0,0 +1,48 @@
"""TCP 文本写入服务端(实验大纲示例应用)"""
import os
import socket
HOST = "0.0.0.0"
PORT = 8888
SAVE_PATH = "./data.txt"

# Make sure the directory that will hold SAVE_PATH exists. exist_ok=True
# avoids the check-then-create race of testing os.path.exists first.
_save_dir = os.path.dirname(SAVE_PATH)
if _save_dir:
    os.makedirs(_save_dir, exist_ok=True)
def save_to_file(content):
    """Append *content* followed by a newline to the file at SAVE_PATH."""
    with open(SAVE_PATH, mode="a", encoding="utf-8") as sink:
        line = content + "\n"
        sink.write(line)
def main():
    """Run the TCP server: accept clients one at a time and persist their text.

    Each received chunk is decoded as UTF-8, stripped, appended to the data
    file via save_to_file, and acknowledged to the client. An error on one
    connection is printed and that connection is closed; the server keeps
    accepting new clients.
    """
    # The context manager releases the listening socket even when the
    # accept loop is interrupted (for example by Ctrl+C).
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind((HOST, PORT))
        server_socket.listen(5)
        print(f"TCP 写入服务已启动,监听端口:{PORT}")
        print(f"数据保存路径:{os.path.abspath(SAVE_PATH)}")

        while True:
            conn, addr = server_socket.accept()
            print(f"\n客户端已连接:{addr}")
            try:
                while True:
                    data = conn.recv(1024)
                    if not data:
                        # Empty read: the peer closed the connection.
                        break
                    text = data.decode("utf-8").strip()
                    print(f"收到数据:{text}")
                    save_to_file(text)
                    # sendall keeps writing until the whole reply is sent;
                    # send() may transmit only part of it.
                    conn.sendall("写入成功,数据已保存到磁盘".encode("utf-8"))
            except Exception as e:
                print(f"连接异常:{e}")
            finally:
                conn.close()
                print("客户端连接断开")


if __name__ == "__main__":
    main()

47
demo/stress_client.py Normal file
View File

@@ -0,0 +1,47 @@
"""TCP 压力测试客户端:模拟峰值负载,触发大量系统调用"""
import argparse
import socket
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
HOST = "127.0.0.1"
PORT = 8888
def send_once(index: int) -> bool:
    """Perform one request/reply round trip against the server.

    Args:
        index: Sequence number embedded in the message and in error output.

    Returns:
        True on a completed round trip, False on any error (the error is
        printed, not raised).
    """
    try:
        # Close the socket on every path; under load, failed requests would
        # otherwise pile up leaked descriptors.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
            client.settimeout(5)
            client.connect((HOST, PORT))
            msg = f"stress_test_message_{index}_{time.time()}"
            # sendall keeps writing until the whole payload is sent;
            # send() may transmit only part of it.
            client.sendall(msg.encode("utf-8"))
            client.recv(1024)
        return True
    except Exception as exc:
        print(f"请求 {index} 失败: {exc}")
        return False
def run_stress(total: int, workers: int):
    """Fire *total* requests through a thread pool and print a summary.

    Args:
        total: Number of requests to send.
        workers: Maximum number of concurrent worker threads.
    """
    print(f"开始压力测试: {total} 次请求, {workers} 并发")
    # perf_counter is monotonic and high resolution, unlike the wall clock.
    start = time.perf_counter()
    success = 0
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(send_once, i) for i in range(total)]
        for future in as_completed(futures):
            if future.result():
                success += 1
    elapsed = time.perf_counter() - start
    # Guard the division: an empty run can finish in zero measured time.
    qps = success / elapsed if elapsed > 0 else 0.0
    print(f"完成: 成功 {success}/{total}, 耗时 {elapsed:.2f}s, QPS={qps:.1f}")
if __name__ == "__main__":
    cli = argparse.ArgumentParser(description="TCP 压力测试客户端")
    # Both options are plain integers; declare them from one table.
    for short_flag, long_flag, default, help_text in (
        ("-n", "--count", 200, "总请求数"),
        ("-w", "--workers", 20, "并发线程数"),
    ):
        cli.add_argument(short_flag, long_flag, type=int, default=default, help=help_text)
    opts = cli.parse_args()
    run_stress(opts.count, opts.workers)

22
hello.py Normal file
View File

@@ -0,0 +1,22 @@
"""基础验证eBPF kprobe Hello World 程序(实验大纲 基础验证部分)"""
from bcc import BPF
# BPF program: emit a trace line every time the probed function is entered.
prog = """
int hello(void *ctx) {
    bpf_trace_printk("Hello, World!\\n");
    return 0;
}
"""

b = BPF(text=prog)
# Resolve the kernel symbol for the execve syscall entry and probe it.
execve_function = b.get_syscall_fnname("execve")
b.attach_kprobe(event=execve_function, fn_name="hello")

print("%-18s %-16s %-6s %s" % ("TIME(s)", "COMM", "PID", "MESSAGE"))


def _as_text(value):
    """Decode bytes from the trace pipe so they print without a b'...' wrapper."""
    return value.decode("utf-8", "replace") if isinstance(value, bytes) else value


while True:
    try:
        task, pid, cpu, flags, ts, msg = b.trace_fields()
    except ValueError:
        # Line did not have the expected fields; skip it.
        continue
    except KeyboardInterrupt:
        # Ctrl+C ends the demo cleanly instead of with a traceback.
        break
    print("%-18.9f %-16s %-6d %s" % (ts, _as_text(task), pid, _as_text(msg)))

View File

@@ -6,7 +6,7 @@
import os import os
import sys import sys
# 引入 Flask 应用,用于创建 Web 服务实例 # 引入 Flask 应用工厂,用于创建 Web 服务实例
from web.app import create_app from web.app import create_app