Files
syscall_monitor/collector/syscall_tracer.py
MarceloZoeng 47fbee4b8b
All checks were successful
CI / lint-and-build (push) Successful in 2m33s
CD / deploy (push) Successful in 1m4s
更新tracer:基于kprobe,实现动态系统调用计数
2026-06-16 01:23:13 +08:00

151 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

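"""eBPF kprobe-based system call counter.

For every system call configured in config/monitors.json, a BPF probe function
is generated dynamically and attached via kprobe to the corresponding kernel
syscall entry point (e.g. __x64_sys_read).
The Python side periodically reads the BPF_HASH map and exposes the count of
each system call.
When the configuration changes the tracer reloads automatically: the current
counts are first added to a Python-side accumulator, then the BPF program is
recompiled and re-attached, so the counts stay continuous.
"""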
import json
import logging
import os
import threading
from pathlib import Path

from bcc import BPF
class SyscallTracer:
    """Count system calls with one eBPF kprobe per configured syscall.

    The syscall names come from the JSON file at ``config_path``. A background
    thread re-reads that file once per second, reloads the BPF program when
    the configured set changes, and publishes a snapshot of the totals that
    ``get_counts`` hands out. Counts gathered before a reload are kept in a
    Python-side accumulator so the totals stay continuous across reloads.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, config_path: Path):
        """Set up an idle tracer; nothing is compiled or attached until ``start``.

        Args:
            config_path: Location of the JSON config file (an object holding
                a ``"syscalls"`` list).
        """
        self.config_path = Path(config_path)
        self._bpf: BPF | None = None
        # Guards ``_snapshot``, which is written by the poll thread and read
        # by ``get_counts`` callers.
        self._lock = threading.Lock()
        self._snapshot: dict[str, int] = {}
        # Python-side accumulator: keeps historical counts across BPF reloads.
        self._accumulated: dict[str, int] = {}
        # Currently attached syscalls, ordered to match the ids used in BPF.
        self._attached: list[str] = []
        # BPF map id -> syscall name.
        self._index_map: dict[int, str] = {}
        # Configured set whose last reload attempt failed; it is not retried
        # until the config changes, so a bad config does not trigger a BPF
        # recompilation on every poll.
        self._rejected: set[str] | None = None
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None

    def load_config(self) -> list[str]:
        """Read the syscall names to monitor from the JSON config file.

        Returns:
            The stripped, non-empty names found under the top-level
            ``"syscalls"`` key, in file order with duplicates removed.
            An empty list when the file does not exist.

        Raises:
            ValueError: If the file is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError``), its root is
                not an object, or ``"syscalls"`` is not a list.
        """
        try:
            with self.config_path.open("r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return []
        if not isinstance(data, dict):
            raise ValueError(f"{self.config_path}: top-level JSON value must be an object")
        raw = data.get("syscalls", [])
        if not isinstance(raw, list):
            # A bare string here would otherwise be iterated character by
            # character and produce one bogus "syscall" per letter.
            raise ValueError(f"{self.config_path}: 'syscalls' must be a list")
        names: list[str] = []
        seen: set[str] = set()
        for item in raw:
            name = str(item).strip()
            # Duplicates are dropped because each name becomes one C function.
            if name and name not in seen:
                seen.add(name)
                names.append(name)
        return names

    def start(self) -> None:
        """Attach the configured probes and start the background poll thread.

        Does nothing if the thread was already created. Errors from reading
        the config or from compiling/attaching the BPF program propagate to
        the caller, and in that case no thread is started.
        """
        if self._thread is not None:
            return
        syscalls = self.load_config()
        if syscalls:
            self._attach(syscalls)
        self._thread = threading.Thread(target=self._poll_loop, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Stop tracing: signal the poll thread to exit, then unload BPF."""
        self._stop.set()
        if self._thread:
            # Bounded wait so a slow poll iteration cannot hang shutdown.
            self._thread.join(timeout=2)
        self._detach()

    def get_counts(self) -> dict[str, int]:
        """Return a copy of the most recent snapshot (intended for the web layer)."""
        with self._lock:
            return dict(self._snapshot)

    def _build_bpf_text(self, syscalls: list[str]) -> str:
        """Generate the BPF C source: one counting probe function per syscall.

        Each generated function increments the ``counts`` hash-map slot keyed
        by the syscall's position in ``syscalls``.

        Raises:
            ValueError: If a name is not a plain ASCII identifier or occurs
                more than once. Names are spliced into C function names, so
                anything else would break or alter the generated program.
        """
        seen: set[str] = set()
        for name in syscalls:
            if not (name.isascii() and name.isidentifier()):
                raise ValueError(f"invalid syscall name for a BPF probe: {name!r}")
            if name in seen:
                raise ValueError(f"duplicate syscall name: {name!r}")
            seen.add(name)
        funcs = [
            f"int trace_{name}(struct pt_regs *ctx) {{\n"
            f" u32 id = {i};\n"
            f" u64 zero = 0, *val;\n"
            f" val = counts.lookup_or_try_init(&id, &zero);\n"
            f" if (val) {{ __sync_fetch_and_add(val, 1); }}\n"
            f" return 0;\n"
            f"}}"
            for i, name in enumerate(syscalls)
        ]
        return "BPF_HASH(counts, u32, u64);\n\n" + "\n\n".join(funcs) + "\n"

    def _attach(self, syscalls: list[str]) -> None:
        """Compile the BPF program and attach one kprobe per syscall.

        The tracer's state is only updated once every probe is attached. If
        compilation or any attachment fails, the partially set-up BPF object
        is cleaned up and the exception is re-raised, leaving the tracer with
        nothing attached.
        """
        text = self._build_bpf_text(syscalls)
        bpf = BPF(text=text)
        try:
            for name in syscalls:
                fn = bpf.get_syscall_fnname(name)
                bpf.attach_kprobe(event=fn, fn_name=f"trace_{name}")
        except Exception:
            bpf.cleanup()
            raise
        self._bpf = bpf
        self._attached = list(syscalls)
        self._index_map = dict(enumerate(syscalls))

    def _detach(self) -> None:
        """Unload the current BPF program and all of its kprobes."""
        if self._bpf is not None:
            self._bpf.cleanup()
            self._bpf = None
        self._attached = []
        self._index_map = {}

    def _reload(self, new_syscalls: list[str]) -> None:
        """Switch to a new syscall set while preserving the counts so far."""
        if new_syscalls:
            # Validate the names first so a bad config is rejected before the
            # currently working probes are torn down.
            self._build_bpf_text(new_syscalls)
        self._accumulate_counts()
        self._detach()
        if new_syscalls:
            self._attach(new_syscalls)

    def _read_live_counts(self) -> dict[str, int]:
        """Return the per-syscall counts currently held in the BPF map."""
        live: dict[str, int] = {}
        if self._bpf is None:
            return live
        for key, value in self._bpf["counts"].items():
            name = self._index_map.get(key.value)
            if name:
                live[name] = live.get(name, 0) + value.value
        return live

    def _accumulate_counts(self) -> None:
        """Fold the live BPF counts into the Python-side accumulator.

        Called before a reload so the counts are not lost when the BPF map
        is discarded.
        """
        for name, count in self._read_live_counts().items():
            self._accumulated[name] = self._accumulated.get(name, 0) + count

    def _sync_with_config(self) -> None:
        """Reload the probes if the configured syscall set has changed.

        Only the set of names is compared, so reordering the config does not
        trigger a reload. A set whose reload failed is remembered and not
        retried until the config changes again.

        Raises:
            Exception: Whatever ``load_config`` or ``_reload`` raised.
        """
        wanted = self.load_config()
        target = set(wanted)
        if target == set(self._attached):
            self._rejected = None
            return
        if target == self._rejected:
            return
        try:
            self._reload(wanted)
        except Exception:
            self._rejected = target
            raise
        self._rejected = None

    def _poll_loop(self) -> None:
        """Background loop: once per second sync with the config, then refresh.

        Failures are logged rather than propagated, because an uncaught
        exception would end the thread and freeze the snapshot for good.
        An identical failure on consecutive iterations is logged only once.
        """
        last_error: str | None = None
        while not self._stop.wait(1.0):
            error: str | None = None
            try:
                self._sync_with_config()
            except Exception as exc:  # thread boundary: report and keep polling
                error = f"config sync failed: {exc!r}"
            try:
                self._refresh_snapshot()
            except Exception as exc:  # thread boundary: report and keep polling
                error = error or f"snapshot refresh failed: {exc!r}"
            if error is not None and error != last_error:
                self._log.error("syscall tracer: %s", error)
            last_error = error

    def _refresh_snapshot(self) -> None:
        """Publish accumulated + live counts for the attached syscalls.

        The snapshot is empty when nothing is attached.
        """
        live = self._read_live_counts()
        totals = {
            name: self._accumulated.get(name, 0) + live.get(name, 0)
            for name in self._attached
        }
        with self._lock:
            self._snapshot = totals
# Process-wide tracer instance, created lazily by get_tracer().
_tracer: SyscallTracer | None = None


def get_tracer(config_path: str | os.PathLike) -> SyscallTracer:
    """Return the global tracer, creating and starting it on first use.

    Args:
        config_path: Path of the JSON config file. Only used when the
            instance is created; later calls return the existing tracer
            regardless of the path passed.

    Returns:
        The shared, started ``SyscallTracer``.

    Raises:
        Exception: Whatever ``SyscallTracer.start`` raises. The global is
            then left unset, so the next call tries again.
    """
    global _tracer
    if _tracer is None:
        tracer = SyscallTracer(Path(config_path))
        tracer.start()
        # Publish only after start() succeeded; otherwise a failed start
        # would leave a never-started tracer cached for every later caller.
        _tracer = tracer
    return _tracer