"""基于 eBPF kprobe 的系统调用计数器。 为 config/monitors.json 中配置的每个系统调用,动态生成一个 BPF 探针函数, 通过 kprobe 挂载到对应的内核 syscall 入口(如 __x64_sys_read)。 Python 端定时读取 BPF_HASH 映射并对外暴露各系统调用的计数。 配置变更时自动重载:先将当前计数累加到 Python 端,再重新编译挂载, 保证计数连续性。 """ import json import os import threading from pathlib import Path from bcc import BPF class SyscallTracer: """系统调用追踪器:基于 kprobe,为每个配置的 syscall 挂载独立探针。""" def __init__(self, config_path: Path): self.config_path = Path(config_path) self._bpf: BPF | None = None self._lock = threading.Lock() self._snapshot: dict[str, int] = {} # Python 端累加器:重载 BPF 时保留历史计数 self._accumulated: dict[str, int] = {} # 当前已挂载的 syscall 列表(有序,与 BPF 中 index 对应) self._attached: list[str] = [] # index -> syscall name 映射表 self._index_map: dict[int, str] = {} self._stop = threading.Event() self._thread: threading.Thread | None = None def load_config(self) -> list[str]: """从 monitors.json 读取需要监控的系统调用名称列表。""" try: with self.config_path.open("r", encoding="utf-8") as f: data = json.load(f) names = data.get("syscalls", []) return [str(n).strip() for n in names if str(n).strip()] except FileNotFoundError: return [] def start(self) -> None: """加载 BPF 程序并启动后台轮询线程。""" if self._thread is not None: return syscalls = self.load_config() if syscalls: self._attach(syscalls) self._thread = threading.Thread(target=self._poll_loop, daemon=True) self._thread.start() def stop(self) -> None: """停止追踪:通知后台线程退出并卸载 BPF 程序。""" self._stop.set() if self._thread: self._thread.join(timeout=2) self._detach() def get_counts(self) -> dict[str, int]: """供 Web 层调用:返回最近一次快照的副本。""" with self._lock: return dict(self._snapshot) def _build_bpf_text(self, syscalls: list[str]) -> str: """根据 syscall 列表动态生成 BPF C 源码,每个 syscall 一个探针函数。""" funcs = [] for i, name in enumerate(syscalls): funcs.append( f"int trace_{name}(struct pt_regs *ctx) {{\n" f" u32 id = {i};\n" f" u64 zero = 0, *val;\n" f" val = counts.lookup_or_try_init(&id, &zero);\n" f" if (val) {{ __sync_fetch_and_add(val, 1); }}\n" f" return 0;\n" f"}}" ) return "BPF_HASH(counts, u32, u64);\n\n" + "\n\n".join(funcs) + "\n" def _attach(self, syscalls: list[str]) -> None: """编译 BPF 程序并为每个 syscall 挂载 kprobe。""" text = self._build_bpf_text(syscalls) self._bpf = BPF(text=text) for name in syscalls: fn = self._bpf.get_syscall_fnname(name) self._bpf.attach_kprobe(event=fn, fn_name=f"trace_{name}") self._attached = list(syscalls) self._index_map = {i: name for i, name in enumerate(syscalls)} def _detach(self) -> None: """卸载当前 BPF 程序及所有 kprobe。""" if self._bpf is not None: self._bpf.cleanup() self._bpf = None self._attached = [] self._index_map = {} def _reload(self, new_syscalls: list[str]) -> None: """配置变更时重载:累加当前计数后重新编译挂载。""" self._accumulate_counts() self._detach() if new_syscalls: self._attach(new_syscalls) def _accumulate_counts(self) -> None: """将当前 BPF 计数累加到 Python 端,防止重载丢失数据。""" if self._bpf is None: return for k, v in self._bpf["counts"].items(): name = self._index_map.get(k.value) if name: self._accumulated[name] = self._accumulated.get(name, 0) + v.value def _poll_loop(self) -> None: """后台轮询:检测配置变更 + 刷新快照。""" while not self._stop.wait(1.0): current_config = self.load_config() if set(current_config) != set(self._attached): self._reload(current_config) self._refresh_snapshot() def _refresh_snapshot(self) -> None: """从 BPF 映射读取计数,合并累加器,写入快照。""" if not self._attached: with self._lock: self._snapshot = {} return totals: dict[str, int] = {name: self._accumulated.get(name, 0) for name in self._attached} if self._bpf is not None: for k, v in self._bpf["counts"].items(): name = self._index_map.get(k.value) if name: totals[name] = totals.get(name, 0) + v.value with self._lock: self._snapshot = totals _tracer: SyscallTracer | None = None def get_tracer(config_path: str | os.PathLike) -> SyscallTracer: """获取(必要时创建)全局追踪器实例。""" global _tracer if _tracer is None: _tracer = SyscallTracer(Path(config_path)) _tracer.start() return _tracer