更新tracer:基于kprobe,实现动态系统调用计数

This commit is contained in:
2026-06-16 01:23:13 +08:00
parent 81a8573e51
commit 84340e55d2

View File

@@ -1,74 +1,55 @@
"""基于 eBPF 的系统调用计数器。 """基于 eBPF kprobe 的系统调用计数器。
将一段 BPF 程序挂载到 raw_syscalls:sys_enter 跟踪点,按系统调用号在 为 config/monitors.json 中配置的每个系统调用,动态生成一个 BPF 探针函数,
BPF_HASH 映射中累计调用次数。Python 端定时读取该映射,并按 通过 kprobe 挂载到对应的内核 syscall 入口(如 __x64_sys_read
config/monitors.json 中配置的名称对外暴露各系统调用的计数。 Python 端定时读取 BPF_HASH 映射并对外暴露各系统调用的计数。
为什么在内核里统计所有系统调用,而不是只统计配置中列出的? 配置变更时自动重载:先将当前计数累加到 Python 端,再重新编译挂载,
- 配置可能在运行期变化;如果在 BPF 中做过滤,每次改配置都要 保证计数连续性。
重建并重新挂载程序。直接在内核里全部计数(一次哈希自增)开销极低,
Web 层只需挑选要展示的名称即可。
""" """
import json import json
import os import os
import threading import threading
import time
from pathlib import Path from pathlib import Path
from bcc import BPF from bcc import BPF
from bcc.syscall import syscall_name
# 内核态 BPF 程序:在每次系统调用入口处,对该调用号对应的计数器自增 1
BPF_TEXT = r"""
BPF_HASH(counts, u32, u64);
TRACEPOINT_PROBE(raw_syscalls, sys_enter) {
u32 id = (u32)args->id;
u64 zero = 0, *val;
val = counts.lookup_or_try_init(&id, &zero);
if (val) {
// 原子自增,保证多 CPU 并发下计数正确
__sync_fetch_and_add(val, 1);
}
return 0;
}
"""
class SyscallTracer: class SyscallTracer:
"""系统调用追踪器:加载 BPF 程序并周期性刷新计数快照""" """系统调用追踪器:基于 kprobe为每个配置的 syscall 挂载独立探针"""
def __init__(self, config_path: Path): def __init__(self, config_path: Path):
self.config_path = Path(config_path) self.config_path = Path(config_path)
self._bpf = None self._bpf: BPF | None = None
# 保护 _snapshot 的并发访问后台轮询线程写入HTTP 请求线程读取)
self._lock = threading.Lock() self._lock = threading.Lock()
# 最近一次快照:{系统调用名: 累计次数}
self._snapshot: dict[str, int] = {} self._snapshot: dict[str, int] = {}
# 用于通知后台线程退出的事件 # Python 端累加器:重载 BPF 时保留历史计数
self._accumulated: dict[str, int] = {}
# 当前已挂载的 syscall 列表(有序,与 BPF 中 index 对应)
self._attached: list[str] = []
# index -> syscall name 映射表
self._index_map: dict[int, str] = {}
self._stop = threading.Event() self._stop = threading.Event()
self._thread: threading.Thread | None = None self._thread: threading.Thread | None = None
def load_config(self) -> list[str]: def load_config(self) -> list[str]:
"""从 monitors.json 读取需要展示的系统调用名称列表。""" """从 monitors.json 读取需要监控的系统调用名称列表。"""
try: try:
with self.config_path.open("r", encoding="utf-8") as f: with self.config_path.open("r", encoding="utf-8") as f:
data = json.load(f) data = json.load(f)
names = data.get("syscalls", []) names = data.get("syscalls", [])
# 过滤空字符串并去除首尾空格
return [str(n).strip() for n in names if str(n).strip()] return [str(n).strip() for n in names if str(n).strip()]
except FileNotFoundError: except FileNotFoundError:
return [] return []
def start(self) -> None: def start(self) -> None:
"""加载并挂载 BPF 程序启动后台轮询线程。""" """加载 BPF 程序启动后台轮询线程。"""
# 防止重复加载(已启动则直接返回) if self._thread is not None:
if self._bpf is not None:
return return
self._bpf = BPF(text=BPF_TEXT) syscalls = self.load_config()
# daemon=True主进程退出时后台线程自动终止 if syscalls:
self._attach(syscalls)
self._thread = threading.Thread(target=self._poll_loop, daemon=True) self._thread = threading.Thread(target=self._poll_loop, daemon=True)
self._thread.start() self._thread.start()
@@ -77,44 +58,86 @@ class SyscallTracer:
self._stop.set() self._stop.set()
if self._thread: if self._thread:
self._thread.join(timeout=2) self._thread.join(timeout=2)
self._bpf = None self._detach()
def _poll_loop(self) -> None:
# 每秒刷新一次快照;用 Event.wait 兼顾「定时」与「可中断」
while not self._stop.wait(1.0):
self._refresh_snapshot()
def _refresh_snapshot(self) -> None:
"""从 BPF 映射读取原始计数,按配置过滤出关心的系统调用。"""
if self._bpf is None:
return
names = set(self.load_config())
# 配置为空:清空快照,避免展示过期数据
if not names:
with self._lock:
self._snapshot = {}
return
# 初始化:未触发过的系统调用也展示为 0
totals: dict[str, int] = {n: 0 for n in names}
for k, v in self._bpf["counts"].items():
try:
# 把内核里的系统调用号转换成可读名称(如 0 -> "read"
name = syscall_name(k.value).decode("utf-8", "replace")
except Exception:
# 解析失败的条目直接跳过,避免影响其他计数
continue
if name in totals:
totals[name] += v.value
with self._lock:
self._snapshot = totals
def get_counts(self) -> dict[str, int]: def get_counts(self) -> dict[str, int]:
"""供 Web 层调用:返回最近一次快照的副本。""" """供 Web 层调用:返回最近一次快照的副本。"""
with self._lock: with self._lock:
return dict(self._snapshot) return dict(self._snapshot)
def _build_bpf_text(self, syscalls: list[str]) -> str:
"""根据 syscall 列表动态生成 BPF C 源码,每个 syscall 一个探针函数。"""
funcs = []
for i, name in enumerate(syscalls):
funcs.append(
f"int trace_{name}(struct pt_regs *ctx) {{\n"
f" u32 id = {i};\n"
f" u64 zero = 0, *val;\n"
f" val = counts.lookup_or_try_init(&id, &zero);\n"
f" if (val) {{ __sync_fetch_and_add(val, 1); }}\n"
f" return 0;\n"
f"}}"
)
return "BPF_HASH(counts, u32, u64);\n\n" + "\n\n".join(funcs) + "\n"
def _attach(self, syscalls: list[str]) -> None:
"""编译 BPF 程序并为每个 syscall 挂载 kprobe。"""
text = self._build_bpf_text(syscalls)
self._bpf = BPF(text=text)
for name in syscalls:
fn = self._bpf.get_syscall_fnname(name)
self._bpf.attach_kprobe(event=fn, fn_name=f"trace_{name}")
self._attached = list(syscalls)
self._index_map = {i: name for i, name in enumerate(syscalls)}
def _detach(self) -> None:
"""卸载当前 BPF 程序及所有 kprobe。"""
if self._bpf is not None:
self._bpf.cleanup()
self._bpf = None
self._attached = []
self._index_map = {}
def _reload(self, new_syscalls: list[str]) -> None:
"""配置变更时重载:累加当前计数后重新编译挂载。"""
self._accumulate_counts()
self._detach()
if new_syscalls:
self._attach(new_syscalls)
def _accumulate_counts(self) -> None:
"""将当前 BPF 计数累加到 Python 端,防止重载丢失数据。"""
if self._bpf is None:
return
for k, v in self._bpf["counts"].items():
name = self._index_map.get(k.value)
if name:
self._accumulated[name] = self._accumulated.get(name, 0) + v.value
def _poll_loop(self) -> None:
"""后台轮询:检测配置变更 + 刷新快照。"""
while not self._stop.wait(1.0):
current_config = self.load_config()
if set(current_config) != set(self._attached):
self._reload(current_config)
self._refresh_snapshot()
def _refresh_snapshot(self) -> None:
"""从 BPF 映射读取计数,合并累加器,写入快照。"""
if not self._attached:
with self._lock:
self._snapshot = {}
return
totals: dict[str, int] = {name: self._accumulated.get(name, 0) for name in self._attached}
if self._bpf is not None:
for k, v in self._bpf["counts"].items():
name = self._index_map.get(k.value)
if name:
totals[name] = totals.get(name, 0) + v.value
with self._lock:
self._snapshot = totals
# 进程级单例:整个应用只挂载一次 BPF 程序,避免重复挂载和资源浪费
_tracer: SyscallTracer | None = None _tracer: SyscallTracer | None = None