"""eBPF syscall counter.

Attaches a single BPF program to the raw_syscalls:sys_enter tracepoint and
counts every syscall by its numeric id in a BPF_HASH map. The Python side
periodically reads the map and exposes per-syscall counts for the names
listed in config/monitors.json.

Why count all syscalls, not just configured ones?
- Config changes at runtime; if we filtered inside BPF we'd have to rebuild
  and reattach the program. Counting everything in-kernel is cheap (one hash
  increment) and lets the web layer pick which names to surface.
"""

import json
import logging
import os
import threading
import time
from pathlib import Path

from bcc import BPF
from bcc.syscall import syscall_name

logger = logging.getLogger(__name__)

BPF_TEXT = r"""
BPF_HASH(counts, u32, u64);

TRACEPOINT_PROBE(raw_syscalls, sys_enter) {
    u32 id = (u32)args->id;
    u64 zero = 0, *val;
    val = counts.lookup_or_try_init(&id, &zero);
    if (val) {
        __sync_fetch_and_add(val, 1);
    }
    return 0;
}
"""


class SyscallTracer:
    """Count syscalls in-kernel and publish counts for the configured names.

    ``start()`` loads ``BPF_TEXT`` and launches a daemon thread that, once a
    second, re-reads the config file and rebuilds a ``{name: count}``
    snapshot from the BPF ``counts`` map. ``get_counts()`` returns a copy of
    the latest snapshot. The tracer may be stopped and started again.
    """

    def __init__(self, config_path: Path):
        """Remember the config location; nothing is loaded until ``start()``.

        Args:
            config_path: Path to the JSON file holding a ``"syscalls"`` list.
        """
        self.config_path = Path(config_path)
        self._bpf = None
        # Guards _snapshot, which the poll thread writes and callers read.
        self._lock = threading.Lock()
        self._snapshot: dict[str, int] = {}
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None

    def load_config(self) -> list[str]:
        """Return the syscall names listed under ``"syscalls"`` in the config.

        Entries are stringified and stripped; blank entries are dropped.

        Returns:
            The configured names, or ``[]`` when the file is missing, the
            JSON root is not an object, or ``"syscalls"`` is not a list.

        Raises:
            json.JSONDecodeError: If the file is not valid JSON.
        """
        try:
            with self.config_path.open("r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return []
        if not isinstance(data, dict):
            return []
        names = data.get("syscalls", [])
        # A bare string would otherwise be iterated character by character.
        if not isinstance(names, list):
            return []
        return [name for name in (str(n).strip() for n in names) if name]

    def start(self) -> None:
        """Load the BPF program and start the poll thread (no-op if running)."""
        if self._bpf is not None:
            return
        # Fresh event per run: the event left set by a previous stop() would
        # make the new poll thread exit on its first wait().
        self._stop = threading.Event()
        self._bpf = BPF(text=BPF_TEXT)
        self._thread = threading.Thread(target=self._poll_loop, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Stop the poll thread and release the BPF program.

        The last published snapshot is kept, so ``get_counts()`` continues to
        return it after the tracer has stopped.
        """
        self._stop.set()
        if self._thread:
            self._thread.join(timeout=2)
            self._thread = None
        bpf, self._bpf = self._bpf, None
        if bpf is not None:
            # Dropping the reference is not enough to unload the program;
            # cleanup() detaches the probe explicitly.
            bpf.cleanup()

    def _poll_loop(self) -> None:
        """Refresh the snapshot once a second until this run is stopped."""
        # Bind this run's event so a later start() (which installs a new
        # event) cannot keep a stale thread alive.
        stop_event = self._stop
        while not stop_event.wait(1.0):
            try:
                self._refresh_snapshot()
            except Exception:
                # Thread boundary: an uncaught error (e.g. a config file
                # caught mid-write) would end polling for good. Keep the
                # previous snapshot and retry on the next tick.
                logger.exception(
                    "syscall snapshot refresh failed; keeping previous counts"
                )

    def _refresh_snapshot(self) -> None:
        """Rebuild the snapshot from the BPF map for the configured names."""
        # Read the handle once; stop() may set self._bpf to None concurrently.
        bpf = self._bpf
        if bpf is None:
            return
        names = set(self.load_config())
        if not names:
            with self._lock:
                self._snapshot = {}
            return
        # Every configured name is reported, with 0 if it was never seen.
        totals: dict[str, int] = dict.fromkeys(names, 0)
        for key, value in bpf["counts"].items():
            try:
                name = syscall_name(key.value).decode("utf-8", "replace")
            except Exception:
                # Best effort: skip ids that cannot be mapped to a name.
                continue
            if name in totals:
                totals[name] += value.value
        with self._lock:
            self._snapshot = totals

    def get_counts(self) -> dict[str, int]:
        """Return a copy of the most recent ``{syscall name: count}`` snapshot."""
        with self._lock:
            return dict(self._snapshot)


_tracer: SyscallTracer | None = None


def get_tracer(config_path: str | os.PathLike) -> SyscallTracer:
    """Return the process-wide tracer, creating it on first use.

    ``config_path`` is only used when the tracer is first created; later
    calls return the existing instance. ``start()`` is invoked on every
    call and is a no-op while the tracer is already running.
    """
    global _tracer
    if _tracer is None:
        _tracer = SyscallTracer(Path(config_path))
    _tracer.start()
    return _tracer