Files
syscall_monitor/collector/syscall_tracer.py
MarceloZoeng 26a5f99587
All checks were successful
CI / lint-and-build (push) Successful in 8s
中文+README
2026-06-14 12:08:27 +08:00

128 lines
4.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""基于 eBPF 的系统调用计数器。
将一段 BPF 程序挂载到 raw_syscalls:sys_enter 跟踪点,按系统调用号在
BPF_HASH 映射中累计调用次数。Python 端定时读取该映射,并按
config/monitors.json 中配置的名称对外暴露各系统调用的计数。
为什么在内核里统计所有系统调用,而不是只统计配置中列出的?
- 配置可能在运行期变化;如果在 BPF 中做过滤,每次改配置都要
重建并重新挂载程序。直接在内核里全部计数(一次哈希自增)开销极低,
Web 层只需挑选要展示的名称即可。
"""
import json
import logging
import os
import threading
import time
from pathlib import Path

from bcc import BPF
from bcc.syscall import syscall_name
# Kernel-side BPF program (C source handed to bcc as a raw string).
# It declares a hash map named `counts` (u32 syscall id -> u64 counter) and a
# probe on the raw_syscalls:sys_enter tracepoint. On every syscall entry the
# probe looks up (or tries to create) the entry for that syscall id and, when
# the lookup yields a non-NULL pointer, atomically adds 1 to it.
# NOTE: the text inside the string is runtime data passed to the BPF compiler;
# it is intentionally left exactly as written.
BPF_TEXT = r"""
BPF_HASH(counts, u32, u64);
TRACEPOINT_PROBE(raw_syscalls, sys_enter) {
u32 id = (u32)args->id;
u64 zero = 0, *val;
val = counts.lookup_or_try_init(&id, &zero);
if (val) {
// 原子自增,保证多 CPU 并发下计数正确
__sync_fetch_and_add(val, 1);
}
return 0;
}
"""
class SyscallTracer:
    """Load the syscall-counting BPF program and keep a refreshed snapshot.

    A background thread reads the kernel-side ``counts`` map about once per
    second, keeps only the system calls named in the JSON config file, and
    stores the result as ``{syscall name: cumulative count}``. Readers obtain
    a copy of that snapshot through :meth:`get_counts`.
    """

    def __init__(self, config_path: Path):
        """Create an idle tracer; nothing is loaded until :meth:`start`.

        Args:
            config_path: Path of the JSON config file whose ``"syscalls"``
                list names the system calls to expose.
        """
        self.config_path = Path(config_path)
        self._bpf = None
        # Guards _snapshot: written by the background poll thread and read
        # by callers of get_counts() (HTTP request threads, per the original
        # author's note).
        self._lock = threading.Lock()
        # Most recent snapshot: {syscall name: cumulative count}.
        self._snapshot: dict[str, int] = {}
        # Set to ask the background thread to exit.
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None

    def load_config(self) -> list[str]:
        """Read the list of syscall names to expose from the config file.

        Returns:
            The entries of the top-level ``"syscalls"`` list, converted to
            ``str`` and stripped of surrounding whitespace, with empty
            entries dropped. An empty list when the file does not exist or
            has no ``"syscalls"`` key.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        try:
            with self.config_path.open("r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return []
        names = data.get("syscalls", [])
        # Strip once per entry and drop the ones that end up empty.
        return [name for raw in names if (name := str(raw).strip())]

    def start(self) -> None:
        """Load the BPF program and launch the background poll thread.

        Calling this while the tracer is already running is a no-op.
        """
        if self._bpf is not None:
            return
        self._bpf = BPF(text=BPF_TEXT)
        # A previous stop() leaves the event set; clear it so that the new
        # poll thread does not exit on its first wait.
        self._stop.clear()
        # daemon=True: the thread does not keep the process alive on exit.
        self._thread = threading.Thread(target=self._poll_loop, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Stop polling and release the BPF program.

        Signals the poll thread, waits up to two seconds for it to finish,
        then calls ``cleanup()`` on the BPF object so the program is actually
        released instead of lingering until interpreter exit. Safe to call
        more than once.
        """
        self._stop.set()
        if self._thread is not None:
            self._thread.join(timeout=2)
            self._thread = None
        bpf, self._bpf = self._bpf, None
        if bpf is not None:
            bpf.cleanup()

    def _poll_loop(self) -> None:
        """Refresh the snapshot once per second until asked to stop."""
        # Event.wait doubles as an interruptible one-second timer.
        while not self._stop.wait(1.0):
            try:
                self._refresh_snapshot()
            except Exception:
                # Thread boundary: a bad config edit or a transient map read
                # error must not kill the poller. Keep the previous snapshot
                # and try again on the next tick.
                logging.getLogger(__name__).exception(
                    "syscall snapshot refresh failed; keeping previous snapshot"
                )

    def _refresh_snapshot(self) -> None:
        """Rebuild the snapshot from the BPF map, filtered by the config."""
        # Work on a local reference so a concurrent stop() cannot swap the
        # attribute to None between the check and the map access.
        bpf = self._bpf
        if bpf is None:
            return
        names = set(self.load_config())
        # Nothing configured: clear the snapshot rather than show stale data.
        if not names:
            with self._lock:
                self._snapshot = {}
            return
        # Configured syscalls that were never triggered still show up as 0.
        totals: dict[str, int] = dict.fromkeys(names, 0)
        for key, value in bpf["counts"].items():
            try:
                # Translate the kernel syscall number into its name.
                name = syscall_name(key.value).decode("utf-8", "replace")
            except Exception:
                # Deliberate best effort: skip entries that cannot be
                # resolved so they do not affect the other counters.
                continue
            if name in totals:
                totals[name] += value.value
        with self._lock:
            self._snapshot = totals

    def get_counts(self) -> dict[str, int]:
        """Return a copy of the most recent snapshot."""
        with self._lock:
            return dict(self._snapshot)
# Process-wide singleton: the application loads the BPF program only once,
# avoiding duplicate attachment and wasted resources.
_tracer: SyscallTracer | None = None


def get_tracer(config_path: str | os.PathLike) -> SyscallTracer:
    """Return the global tracer, creating and starting it on first use.

    Args:
        config_path: Path of the JSON config file. Only used when the tracer
            is created; later calls return the existing instance and ignore
            this argument.

    Returns:
        The started, process-wide ``SyscallTracer``.

    Raises:
        Exception: Whatever ``SyscallTracer.start()`` raises when the BPF
            program cannot be loaded. Nothing is cached in that case, so the
            next call tries again.
    """
    global _tracer
    if _tracer is None:
        tracer = SyscallTracer(Path(config_path))
        # Publish the instance only after start() succeeds; otherwise a
        # failed load would leave a never-started tracer cached and every
        # later call would return it with permanently empty counts.
        tracer.start()
        _tracer = tracer
    return _tracer