From 47fbee4b8be784c7f4d2ee78bcf6610f2f82730a Mon Sep 17 00:00:00 2001
From: MarceloZoeng <2280535520@qq.com>
Date: Tue, 16 Jun 2026 01:23:13 +0800
Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0tracer=EF=BC=9A=E5=9F=BA?=
 =?UTF-8?q?=E4=BA=8Ekprobe=EF=BC=8C=E5=AE=9E=E7=8E=B0=E5=8A=A8=E6=80=81?=
 =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E8=B0=83=E7=94=A8=E8=AE=A1=E6=95=B0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
Review notes (ignored by git am):
 * load_config: de-duplicate names and keep only plain ASCII identifiers,
   because each name is spliced into generated C source.
 * _attach: clean up the BPF object when an attach fails; commit tracer
   state only after every probe is attached.
 * _poll_loop: log failures instead of letting the thread die; a config
   whose reload failed is not retried until it changes.
 * Comments and docstrings on added lines are in English.
 * Hunk headers and diffstat are recomputed; the index hashes below are the
   original ones. Leading whitespace of context/removed lines was re-derived
   from a whitespace-collapsed copy, so use `git apply --ignore-whitespace`
   if it does not match the tree.

 collector/syscall_tracer.py | 216 ++++++++++++++++++++++++------------
 1 file changed, 145 insertions(+), 71 deletions(-)

diff --git a/collector/syscall_tracer.py b/collector/syscall_tracer.py
index d2ee608..fd2770b 100644
--- a/collector/syscall_tracer.py
+++ b/collector/syscall_tracer.py
@@ -1,74 +1,72 @@
-"""基于 eBPF 的系统调用计数器。
+"""eBPF kprobe-based system call counter.
 
-将一段 BPF 程序挂载到 raw_syscalls:sys_enter 跟踪点,按系统调用号在
-BPF_HASH 映射中累计调用次数。Python 端定时读取该映射,并按
-config/monitors.json 中配置的名称对外暴露各系统调用的计数。
+For every system call listed in config/monitors.json a BPF probe function is
+generated on the fly and attached through a kprobe to the matching kernel
+syscall entry point (for example __x64_sys_read). The Python side reads the
+BPF_HASH map periodically and exposes the per-syscall counts.
 
-为什么在内核里统计所有系统调用,而不是只统计配置中列出的?
-  - 配置可能在运行期变化;如果在 BPF 中做过滤,每次改配置都要
-    重建并重新挂载程序。直接在内核里全部计数(一次哈希自增)开销极低,
-    Web 层只需挑选要展示的名称即可。
+When the configuration changes the probes are reloaded automatically: the
+current counts are first folded into a Python-side accumulator and the
+program is then recompiled and re-attached, so the counts stay continuous.
 """
 
 import json
+import logging
 import os
 import threading
-import time
 from pathlib import Path
 
 from bcc import BPF
-from bcc.syscall import syscall_name
-
-
-# 内核态 BPF 程序:在每次系统调用入口处,对该调用号对应的计数器自增 1
-BPF_TEXT = r"""
-BPF_HASH(counts, u32, u64);
-
-TRACEPOINT_PROBE(raw_syscalls, sys_enter) {
-    u32 id = (u32)args->id;
-    u64 zero = 0, *val;
-    val = counts.lookup_or_try_init(&id, &zero);
-    if (val) {
-        // 原子自增,保证多 CPU 并发下计数正确
-        __sync_fetch_and_add(val, 1);
-    }
-    return 0;
-}
-"""
+
+logger = logging.getLogger(__name__)
 
 
 class SyscallTracer:
-    """系统调用追踪器:加载 BPF 程序并周期性刷新计数快照。"""
+    """System call tracer: attaches one kprobe per configured syscall."""
 
     def __init__(self, config_path: Path):
         self.config_path = Path(config_path)
-        self._bpf = None
-        # 保护 _snapshot 的并发访问(后台轮询线程写入,HTTP 请求线程读取)
+        self._bpf: BPF | None = None
         self._lock = threading.Lock()
-        # 最近一次快照:{系统调用名: 累计次数}
         self._snapshot: dict[str, int] = {}
-        # 用于通知后台线程退出的事件
+        # Python-side accumulator: keeps historical counts across BPF reloads
+        self._accumulated: dict[str, int] = {}
+        # Currently attached syscalls (ordered; position == index used in BPF)
+        self._attached: list[str] = []
+        # index -> syscall name lookup table
+        self._index_map: dict[int, str] = {}
         self._stop = threading.Event()
         self._thread: threading.Thread | None = None
 
     def load_config(self) -> list[str]:
-        """从 monitors.json 读取需要展示的系统调用名称列表。"""
+        """Read the syscall names to monitor from monitors.json.
+
+        Names are stripped and de-duplicated in configured order; anything
+        that is not a plain ASCII identifier is dropped. A missing file
+        yields an empty list.
+        """
         try:
             with self.config_path.open("r", encoding="utf-8") as f:
                 data = json.load(f)
             names = data.get("syscalls", [])
-            # 过滤空字符串并去除首尾空格
-            return [str(n).strip() for n in names if str(n).strip()]
+            cleaned = (str(n).strip() for n in names)
+            # Each name becomes part of a generated C function name, so a
+            # duplicate or a non-identifier would break the BPF compile.
+            return [
+                n
+                for n in dict.fromkeys(cleaned)
+                if n.isascii() and n.isidentifier()
+            ]
         except FileNotFoundError:
             return []
 
     def start(self) -> None:
-        """加载并挂载 BPF 程序,启动后台轮询线程。"""
-        # 防止重复加载(已启动则直接返回)
-        if self._bpf is not None:
+        """Attach the configured probes and start the poll thread."""
+        if self._thread is not None:
             return
-        self._bpf = BPF(text=BPF_TEXT)
-        # daemon=True:主进程退出时后台线程自动终止
+        syscalls = self.load_config()
+        if syscalls:
+            self._attach(syscalls)
         self._thread = threading.Thread(target=self._poll_loop, daemon=True)
         self._thread.start()
 
@@ -77,42 +75,118 @@ class SyscallTracer:
         self._stop.set()
         if self._thread:
             self._thread.join(timeout=2)
-        self._bpf = None
-
-    def _poll_loop(self) -> None:
-        # 每秒刷新一次快照;用 Event.wait 兼顾「定时」与「可中断」
-        while not self._stop.wait(1.0):
-            self._refresh_snapshot()
-
-    def _refresh_snapshot(self) -> None:
-        """从 BPF 映射读取原始计数,按配置过滤出关心的系统调用。"""
-        if self._bpf is None:
-            return
-        names = set(self.load_config())
-        # 配置为空:清空快照,避免展示过期数据
-        if not names:
-            with self._lock:
-                self._snapshot = {}
-            return
-        # 初始化:未触发过的系统调用也展示为 0
-        totals: dict[str, int] = {n: 0 for n in names}
-        for k, v in self._bpf["counts"].items():
-            try:
-                # 把内核里的系统调用号转换成可读名称(如 0 -> "read")
-                name = syscall_name(k.value).decode("utf-8", "replace")
-            except Exception:
-                # 解析失败的条目直接跳过,避免影响其他计数
-                continue
-            if name in totals:
-                totals[name] += v.value
-        with self._lock:
-            self._snapshot = totals
+        self._detach()
 
     def get_counts(self) -> dict[str, int]:
         """供 Web 层调用:返回最近一次快照的副本。"""
         with self._lock:
             return dict(self._snapshot)
 
+    def _build_bpf_text(self, syscalls: list[str]) -> str:
+        """Generate the BPF C source: one probe function per syscall.
+
+        The probe generated for ``syscalls[i]`` increments the ``counts``
+        entry keyed by ``i``.
+        """
+        funcs = []
+        for i, name in enumerate(syscalls):
+            funcs.append(
+                f"int trace_{name}(struct pt_regs *ctx) {{\n"
+                f"    u32 id = {i};\n"
+                f"    u64 zero = 0, *val;\n"
+                f"    val = counts.lookup_or_try_init(&id, &zero);\n"
+                f"    if (val) {{ __sync_fetch_and_add(val, 1); }}\n"
+                f"    return 0;\n"
+                f"}}"
+            )
+        return "BPF_HASH(counts, u32, u64);\n\n" + "\n\n".join(funcs) + "\n"
+
+    def _attach(self, syscalls: list[str]) -> None:
+        """Compile the BPF program and attach a kprobe for every syscall.
+
+        Tracer state is updated only after every probe is attached. If an
+        attach raises, the partially attached program is cleaned up and the
+        exception propagates to the caller.
+        """
+        bpf = BPF(text=self._build_bpf_text(syscalls))
+        try:
+            for name in syscalls:
+                fn = bpf.get_syscall_fnname(name)
+                bpf.attach_kprobe(event=fn, fn_name=f"trace_{name}")
+        except Exception:
+            # Do not leave a half-attached program loaded in the kernel.
+            bpf.cleanup()
+            raise
+        self._bpf = bpf
+        self._attached = list(syscalls)
+        self._index_map = dict(enumerate(syscalls))
+
+    def _detach(self) -> None:
+        """Unload the current BPF program together with all of its kprobes."""
+        if self._bpf is not None:
+            self._bpf.cleanup()
+            self._bpf = None
+        self._attached = []
+        self._index_map = {}
+
+    def _reload(self, new_syscalls: list[str]) -> None:
+        """Bank the current counts, then recompile and re-attach."""
+        self._accumulate_counts()
+        self._detach()
+        if new_syscalls:
+            self._attach(new_syscalls)
+
+    def _accumulate_counts(self) -> None:
+        """Fold the live BPF counts into the Python-side accumulator.
+
+        Called before a reload so the counts are not lost with the old map.
+        """
+        if self._bpf is None:
+            return
+        for k, v in self._bpf["counts"].items():
+            name = self._index_map.get(k.value)
+            if name:
+                self._accumulated[name] = self._accumulated.get(name, 0) + v.value
+
+    def _poll_loop(self) -> None:
+        """Background loop: pick up config changes and refresh the snapshot.
+
+        A failing iteration is logged and the loop keeps running, so an
+        unreadable config or a failed reload cannot kill the poll thread.
+        """
+        # Config whose reload raised; it is not retried until the config
+        # changes, so a bad entry does not trigger a recompile every second.
+        rejected: set[str] | None = None
+        while not self._stop.wait(1.0):
+            try:
+                current_config = self.load_config()
+                wanted = set(current_config)
+                if wanted == set(self._attached):
+                    rejected = None
+                elif wanted != rejected:
+                    rejected = wanted
+                    self._reload(current_config)
+                    rejected = None
+                self._refresh_snapshot()
+            except Exception:
+                logger.exception("syscall tracer poll iteration failed")
+
+    def _refresh_snapshot(self) -> None:
+        """Publish accumulated plus live BPF counts as the new snapshot."""
+        if not self._attached:
+            with self._lock:
+                self._snapshot = {}
+            return
+        totals: dict[str, int] = {
+            name: self._accumulated.get(name, 0) for name in self._attached
+        }
+        if self._bpf is not None:
+            for k, v in self._bpf["counts"].items():
+                name = self._index_map.get(k.value)
+                if name:
+                    totals[name] = totals.get(name, 0) + v.value
+        with self._lock:
+            self._snapshot = totals
+
 
-# 进程级单例:整个应用只挂载一次 BPF 程序,避免重复挂载和资源浪费
 _tracer: SyscallTracer | None = None