1 Commits

Author SHA1 Message Date
47fbee4b8b 更新tracer:基于kprobe,实现动态系统调用计数
All checks were successful
CI / lint-and-build (push) Successful in 2m33s
CD / deploy (push) Successful in 1m4s
2026-06-16 01:23:13 +08:00

View File

@@ -1,74 +1,55 @@
"""基于 eBPF 的系统调用计数器。
"""基于 eBPF kprobe 的系统调用计数器。
将一段 BPF 程序挂载到 raw_syscalls:sys_enter 跟踪点,按系统调用号在
BPF_HASH 映射中累计调用次数。Python 端定时读取该映射,并按
config/monitors.json 中配置的名称对外暴露各系统调用的计数。
为 config/monitors.json 中配置的每个系统调用,动态生成一个 BPF 探针函数,
通过 kprobe 挂载到对应的内核 syscall 入口(如 __x64_sys_read
Python 端定时读取 BPF_HASH 映射并对外暴露各系统调用的计数。
为什么在内核里统计所有系统调用,而不是只统计配置中列出的?
- 配置可能在运行期变化;如果在 BPF 中做过滤,每次改配置都要
重建并重新挂载程序。直接在内核里全部计数(一次哈希自增)开销极低,
Web 层只需挑选要展示的名称即可。
配置变更时自动重载:先将当前计数累加到 Python 端,再重新编译挂载,
保证计数连续性。
"""
import json
import os
import threading
import time
from pathlib import Path
from bcc import BPF
from bcc.syscall import syscall_name
# 内核态 BPF 程序:在每次系统调用入口处,对该调用号对应的计数器自增 1
BPF_TEXT = r"""
BPF_HASH(counts, u32, u64);
TRACEPOINT_PROBE(raw_syscalls, sys_enter) {
u32 id = (u32)args->id;
u64 zero = 0, *val;
val = counts.lookup_or_try_init(&id, &zero);
if (val) {
// 原子自增,保证多 CPU 并发下计数正确
__sync_fetch_and_add(val, 1);
}
return 0;
}
"""
class SyscallTracer:
"""系统调用追踪器:加载 BPF 程序并周期性刷新计数快照"""
"""系统调用追踪器:基于 kprobe为每个配置的 syscall 挂载独立探针"""
def __init__(self, config_path: Path):
self.config_path = Path(config_path)
self._bpf = None
# 保护 _snapshot 的并发访问后台轮询线程写入HTTP 请求线程读取)
self._bpf: BPF | None = None
self._lock = threading.Lock()
# 最近一次快照:{系统调用名: 累计次数}
self._snapshot: dict[str, int] = {}
# 用于通知后台线程退出的事件
# Python 端累加器:重载 BPF 时保留历史计数
self._accumulated: dict[str, int] = {}
# 当前已挂载的 syscall 列表(有序,与 BPF 中 index 对应)
self._attached: list[str] = []
# index -> syscall name 映射表
self._index_map: dict[int, str] = {}
self._stop = threading.Event()
self._thread: threading.Thread | None = None
def load_config(self) -> list[str]:
"""从 monitors.json 读取需要展示的系统调用名称列表。"""
"""从 monitors.json 读取需要监控的系统调用名称列表。"""
try:
with self.config_path.open("r", encoding="utf-8") as f:
data = json.load(f)
names = data.get("syscalls", [])
# 过滤空字符串并去除首尾空格
return [str(n).strip() for n in names if str(n).strip()]
except FileNotFoundError:
return []
def start(self) -> None:
"""加载并挂载 BPF 程序启动后台轮询线程。"""
# 防止重复加载(已启动则直接返回)
if self._bpf is not None:
"""加载 BPF 程序启动后台轮询线程。"""
if self._thread is not None:
return
self._bpf = BPF(text=BPF_TEXT)
# daemon=True主进程退出时后台线程自动终止
syscalls = self.load_config()
if syscalls:
self._attach(syscalls)
self._thread = threading.Thread(target=self._poll_loop, daemon=True)
self._thread.start()
@@ -77,44 +58,86 @@ class SyscallTracer:
self._stop.set()
if self._thread:
self._thread.join(timeout=2)
self._bpf = None
def _poll_loop(self) -> None:
# 每秒刷新一次快照;用 Event.wait 兼顾「定时」与「可中断」
while not self._stop.wait(1.0):
self._refresh_snapshot()
def _refresh_snapshot(self) -> None:
"""从 BPF 映射读取原始计数,按配置过滤出关心的系统调用。"""
if self._bpf is None:
return
names = set(self.load_config())
# 配置为空:清空快照,避免展示过期数据
if not names:
with self._lock:
self._snapshot = {}
return
# 初始化:未触发过的系统调用也展示为 0
totals: dict[str, int] = {n: 0 for n in names}
for k, v in self._bpf["counts"].items():
try:
# 把内核里的系统调用号转换成可读名称(如 0 -> "read"
name = syscall_name(k.value).decode("utf-8", "replace")
except Exception:
# 解析失败的条目直接跳过,避免影响其他计数
continue
if name in totals:
totals[name] += v.value
with self._lock:
self._snapshot = totals
self._detach()
def get_counts(self) -> dict[str, int]:
"""供 Web 层调用:返回最近一次快照的副本。"""
with self._lock:
return dict(self._snapshot)
def _build_bpf_text(self, syscalls: list[str]) -> str:
"""根据 syscall 列表动态生成 BPF C 源码,每个 syscall 一个探针函数。"""
funcs = []
for i, name in enumerate(syscalls):
funcs.append(
f"int trace_{name}(struct pt_regs *ctx) {{\n"
f" u32 id = {i};\n"
f" u64 zero = 0, *val;\n"
f" val = counts.lookup_or_try_init(&id, &zero);\n"
f" if (val) {{ __sync_fetch_and_add(val, 1); }}\n"
f" return 0;\n"
f"}}"
)
return "BPF_HASH(counts, u32, u64);\n\n" + "\n\n".join(funcs) + "\n"
def _attach(self, syscalls: list[str]) -> None:
"""编译 BPF 程序并为每个 syscall 挂载 kprobe。"""
text = self._build_bpf_text(syscalls)
self._bpf = BPF(text=text)
for name in syscalls:
fn = self._bpf.get_syscall_fnname(name)
self._bpf.attach_kprobe(event=fn, fn_name=f"trace_{name}")
self._attached = list(syscalls)
self._index_map = {i: name for i, name in enumerate(syscalls)}
def _detach(self) -> None:
"""卸载当前 BPF 程序及所有 kprobe。"""
if self._bpf is not None:
self._bpf.cleanup()
self._bpf = None
self._attached = []
self._index_map = {}
def _reload(self, new_syscalls: list[str]) -> None:
"""配置变更时重载:累加当前计数后重新编译挂载。"""
self._accumulate_counts()
self._detach()
if new_syscalls:
self._attach(new_syscalls)
def _accumulate_counts(self) -> None:
"""将当前 BPF 计数累加到 Python 端,防止重载丢失数据。"""
if self._bpf is None:
return
for k, v in self._bpf["counts"].items():
name = self._index_map.get(k.value)
if name:
self._accumulated[name] = self._accumulated.get(name, 0) + v.value
def _poll_loop(self) -> None:
"""后台轮询:检测配置变更 + 刷新快照。"""
while not self._stop.wait(1.0):
current_config = self.load_config()
if set(current_config) != set(self._attached):
self._reload(current_config)
self._refresh_snapshot()
def _refresh_snapshot(self) -> None:
"""从 BPF 映射读取计数,合并累加器,写入快照。"""
if not self._attached:
with self._lock:
self._snapshot = {}
return
totals: dict[str, int] = {name: self._accumulated.get(name, 0) for name in self._attached}
if self._bpf is not None:
for k, v in self._bpf["counts"].items():
name = self._index_map.get(k.value)
if name:
totals[name] = totals.get(name, 0) + v.value
with self._lock:
self._snapshot = totals
# 进程级单例:整个应用只挂载一次 BPF 程序,避免重复挂载和资源浪费
_tracer: SyscallTracer | None = None