Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 47fbee4b8b | |||
| 81a8573e51 |
@@ -1,74 +1,55 @@
|
|||||||
"""基于 eBPF 的系统调用计数器。
|
"""基于 eBPF kprobe 的系统调用计数器。
|
||||||
|
|
||||||
将一段 BPF 程序挂载到 raw_syscalls:sys_enter 跟踪点,按系统调用号在
|
为 config/monitors.json 中配置的每个系统调用,动态生成一个 BPF 探针函数,
|
||||||
BPF_HASH 映射中累计调用次数。Python 端定时读取该映射,并按
|
通过 kprobe 挂载到对应的内核 syscall 入口(如 __x64_sys_read)。
|
||||||
config/monitors.json 中配置的名称对外暴露各系统调用的计数。
|
Python 端定时读取 BPF_HASH 映射并对外暴露各系统调用的计数。
|
||||||
|
|
||||||
为什么在内核里统计所有系统调用,而不是只统计配置中列出的?
|
配置变更时自动重载:先将当前计数累加到 Python 端,再重新编译挂载,
|
||||||
- 配置可能在运行期变化;如果在 BPF 中做过滤,每次改配置都要
|
保证计数连续性。
|
||||||
重建并重新挂载程序。直接在内核里全部计数(一次哈希自增)开销极低,
|
|
||||||
Web 层只需挑选要展示的名称即可。
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import threading
|
import threading
|
||||||
import time
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from bcc import BPF
|
from bcc import BPF
|
||||||
from bcc.syscall import syscall_name
|
|
||||||
|
|
||||||
|
|
||||||
# 内核态 BPF 程序:在每次系统调用入口处,对该调用号对应的计数器自增 1
|
|
||||||
BPF_TEXT = r"""
|
|
||||||
BPF_HASH(counts, u32, u64);
|
|
||||||
|
|
||||||
TRACEPOINT_PROBE(raw_syscalls, sys_enter) {
|
|
||||||
u32 id = (u32)args->id;
|
|
||||||
u64 zero = 0, *val;
|
|
||||||
val = counts.lookup_or_try_init(&id, &zero);
|
|
||||||
if (val) {
|
|
||||||
// 原子自增,保证多 CPU 并发下计数正确
|
|
||||||
__sync_fetch_and_add(val, 1);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
class SyscallTracer:
|
class SyscallTracer:
|
||||||
"""系统调用追踪器:加载 BPF 程序并周期性刷新计数快照。"""
|
"""系统调用追踪器:基于 kprobe,为每个配置的 syscall 挂载独立探针。"""
|
||||||
|
|
||||||
def __init__(self, config_path: Path):
|
def __init__(self, config_path: Path):
|
||||||
self.config_path = Path(config_path)
|
self.config_path = Path(config_path)
|
||||||
self._bpf = None
|
self._bpf: BPF | None = None
|
||||||
# 保护 _snapshot 的并发访问(后台轮询线程写入,HTTP 请求线程读取)
|
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
# 最近一次快照:{系统调用名: 累计次数}
|
|
||||||
self._snapshot: dict[str, int] = {}
|
self._snapshot: dict[str, int] = {}
|
||||||
# 用于通知后台线程退出的事件
|
# Python 端累加器:重载 BPF 时保留历史计数
|
||||||
|
self._accumulated: dict[str, int] = {}
|
||||||
|
# 当前已挂载的 syscall 列表(有序,与 BPF 中 index 对应)
|
||||||
|
self._attached: list[str] = []
|
||||||
|
# index -> syscall name 映射表
|
||||||
|
self._index_map: dict[int, str] = {}
|
||||||
self._stop = threading.Event()
|
self._stop = threading.Event()
|
||||||
self._thread: threading.Thread | None = None
|
self._thread: threading.Thread | None = None
|
||||||
|
|
||||||
def load_config(self) -> list[str]:
|
def load_config(self) -> list[str]:
|
||||||
"""从 monitors.json 读取需要展示的系统调用名称列表。"""
|
"""从 monitors.json 读取需要监控的系统调用名称列表。"""
|
||||||
try:
|
try:
|
||||||
with self.config_path.open("r", encoding="utf-8") as f:
|
with self.config_path.open("r", encoding="utf-8") as f:
|
||||||
data = json.load(f)
|
data = json.load(f)
|
||||||
names = data.get("syscalls", [])
|
names = data.get("syscalls", [])
|
||||||
# 过滤空字符串并去除首尾空格
|
|
||||||
return [str(n).strip() for n in names if str(n).strip()]
|
return [str(n).strip() for n in names if str(n).strip()]
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def start(self) -> None:
|
def start(self) -> None:
|
||||||
"""加载并挂载 BPF 程序,启动后台轮询线程。"""
|
"""加载 BPF 程序并启动后台轮询线程。"""
|
||||||
# 防止重复加载(已启动则直接返回)
|
if self._thread is not None:
|
||||||
if self._bpf is not None:
|
|
||||||
return
|
return
|
||||||
self._bpf = BPF(text=BPF_TEXT)
|
syscalls = self.load_config()
|
||||||
# daemon=True:主进程退出时后台线程自动终止
|
if syscalls:
|
||||||
|
self._attach(syscalls)
|
||||||
self._thread = threading.Thread(target=self._poll_loop, daemon=True)
|
self._thread = threading.Thread(target=self._poll_loop, daemon=True)
|
||||||
self._thread.start()
|
self._thread.start()
|
||||||
|
|
||||||
@@ -77,44 +58,86 @@ class SyscallTracer:
|
|||||||
self._stop.set()
|
self._stop.set()
|
||||||
if self._thread:
|
if self._thread:
|
||||||
self._thread.join(timeout=2)
|
self._thread.join(timeout=2)
|
||||||
self._bpf = None
|
self._detach()
|
||||||
|
|
||||||
def _poll_loop(self) -> None:
|
|
||||||
# 每秒刷新一次快照;用 Event.wait 兼顾「定时」与「可中断」
|
|
||||||
while not self._stop.wait(1.0):
|
|
||||||
self._refresh_snapshot()
|
|
||||||
|
|
||||||
def _refresh_snapshot(self) -> None:
|
|
||||||
"""从 BPF 映射读取原始计数,按配置过滤出关心的系统调用。"""
|
|
||||||
if self._bpf is None:
|
|
||||||
return
|
|
||||||
names = set(self.load_config())
|
|
||||||
# 配置为空:清空快照,避免展示过期数据
|
|
||||||
if not names:
|
|
||||||
with self._lock:
|
|
||||||
self._snapshot = {}
|
|
||||||
return
|
|
||||||
# 初始化:未触发过的系统调用也展示为 0
|
|
||||||
totals: dict[str, int] = {n: 0 for n in names}
|
|
||||||
for k, v in self._bpf["counts"].items():
|
|
||||||
try:
|
|
||||||
# 把内核里的系统调用号转换成可读名称(如 0 -> "read")
|
|
||||||
name = syscall_name(k.value).decode("utf-8", "replace")
|
|
||||||
except Exception:
|
|
||||||
# 解析失败的条目直接跳过,避免影响其他计数
|
|
||||||
continue
|
|
||||||
if name in totals:
|
|
||||||
totals[name] += v.value
|
|
||||||
with self._lock:
|
|
||||||
self._snapshot = totals
|
|
||||||
|
|
||||||
def get_counts(self) -> dict[str, int]:
|
def get_counts(self) -> dict[str, int]:
|
||||||
"""供 Web 层调用:返回最近一次快照的副本。"""
|
"""供 Web 层调用:返回最近一次快照的副本。"""
|
||||||
with self._lock:
|
with self._lock:
|
||||||
return dict(self._snapshot)
|
return dict(self._snapshot)
|
||||||
|
|
||||||
|
def _build_bpf_text(self, syscalls: list[str]) -> str:
|
||||||
|
"""根据 syscall 列表动态生成 BPF C 源码,每个 syscall 一个探针函数。"""
|
||||||
|
funcs = []
|
||||||
|
for i, name in enumerate(syscalls):
|
||||||
|
funcs.append(
|
||||||
|
f"int trace_{name}(struct pt_regs *ctx) {{\n"
|
||||||
|
f" u32 id = {i};\n"
|
||||||
|
f" u64 zero = 0, *val;\n"
|
||||||
|
f" val = counts.lookup_or_try_init(&id, &zero);\n"
|
||||||
|
f" if (val) {{ __sync_fetch_and_add(val, 1); }}\n"
|
||||||
|
f" return 0;\n"
|
||||||
|
f"}}"
|
||||||
|
)
|
||||||
|
return "BPF_HASH(counts, u32, u64);\n\n" + "\n\n".join(funcs) + "\n"
|
||||||
|
|
||||||
|
def _attach(self, syscalls: list[str]) -> None:
|
||||||
|
"""编译 BPF 程序并为每个 syscall 挂载 kprobe。"""
|
||||||
|
text = self._build_bpf_text(syscalls)
|
||||||
|
self._bpf = BPF(text=text)
|
||||||
|
for name in syscalls:
|
||||||
|
fn = self._bpf.get_syscall_fnname(name)
|
||||||
|
self._bpf.attach_kprobe(event=fn, fn_name=f"trace_{name}")
|
||||||
|
self._attached = list(syscalls)
|
||||||
|
self._index_map = {i: name for i, name in enumerate(syscalls)}
|
||||||
|
|
||||||
|
def _detach(self) -> None:
|
||||||
|
"""卸载当前 BPF 程序及所有 kprobe。"""
|
||||||
|
if self._bpf is not None:
|
||||||
|
self._bpf.cleanup()
|
||||||
|
self._bpf = None
|
||||||
|
self._attached = []
|
||||||
|
self._index_map = {}
|
||||||
|
|
||||||
|
def _reload(self, new_syscalls: list[str]) -> None:
|
||||||
|
"""配置变更时重载:累加当前计数后重新编译挂载。"""
|
||||||
|
self._accumulate_counts()
|
||||||
|
self._detach()
|
||||||
|
if new_syscalls:
|
||||||
|
self._attach(new_syscalls)
|
||||||
|
|
||||||
|
def _accumulate_counts(self) -> None:
|
||||||
|
"""将当前 BPF 计数累加到 Python 端,防止重载丢失数据。"""
|
||||||
|
if self._bpf is None:
|
||||||
|
return
|
||||||
|
for k, v in self._bpf["counts"].items():
|
||||||
|
name = self._index_map.get(k.value)
|
||||||
|
if name:
|
||||||
|
self._accumulated[name] = self._accumulated.get(name, 0) + v.value
|
||||||
|
|
||||||
|
def _poll_loop(self) -> None:
|
||||||
|
"""后台轮询:检测配置变更 + 刷新快照。"""
|
||||||
|
while not self._stop.wait(1.0):
|
||||||
|
current_config = self.load_config()
|
||||||
|
if set(current_config) != set(self._attached):
|
||||||
|
self._reload(current_config)
|
||||||
|
self._refresh_snapshot()
|
||||||
|
|
||||||
|
def _refresh_snapshot(self) -> None:
|
||||||
|
"""从 BPF 映射读取计数,合并累加器,写入快照。"""
|
||||||
|
if not self._attached:
|
||||||
|
with self._lock:
|
||||||
|
self._snapshot = {}
|
||||||
|
return
|
||||||
|
totals: dict[str, int] = {name: self._accumulated.get(name, 0) for name in self._attached}
|
||||||
|
if self._bpf is not None:
|
||||||
|
for k, v in self._bpf["counts"].items():
|
||||||
|
name = self._index_map.get(k.value)
|
||||||
|
if name:
|
||||||
|
totals[name] = totals.get(name, 0) + v.value
|
||||||
|
with self._lock:
|
||||||
|
self._snapshot = totals
|
||||||
|
|
||||||
|
|
||||||
# 进程级单例:整个应用只挂载一次 BPF 程序,避免重复挂载和资源浪费
|
|
||||||
_tracer: SyscallTracer | None = None
|
_tracer: SyscallTracer | None = None
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
33
demo/client.py
Normal file
33
demo/client.py
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
"""TCP 单次发送客户端:发送 3 条测试消息,验证基本连通性"""
|
||||||
|
|
||||||
|
import socket
|
||||||
|
import time
|
||||||
|
|
||||||
|
HOST = "127.0.0.1"
|
||||||
|
PORT = 8888
|
||||||
|
MESSAGE_COUNT = 3
|
||||||
|
|
||||||
|
|
||||||
|
def send_once(index: int) -> bool:
|
||||||
|
try:
|
||||||
|
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
client.settimeout(5)
|
||||||
|
client.connect((HOST, PORT))
|
||||||
|
msg = f"test_message_{index}_{time.time()}"
|
||||||
|
client.send(msg.encode("utf-8"))
|
||||||
|
reply = client.recv(1024)
|
||||||
|
client.close()
|
||||||
|
print(f"消息 {index} 发送成功,回复: {reply.decode('utf-8', errors='replace')}")
|
||||||
|
return True
|
||||||
|
except Exception as exc:
|
||||||
|
print(f"消息 {index} 失败: {exc}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
print(f"开始连通性测试: 发送 {MESSAGE_COUNT} 条消息到 {HOST}:{PORT}")
|
||||||
|
success = 0
|
||||||
|
for i in range(MESSAGE_COUNT):
|
||||||
|
if send_once(i):
|
||||||
|
success += 1
|
||||||
|
print(f"完成: 成功 {success}/{MESSAGE_COUNT}")
|
||||||
48
demo/server.py
Normal file
48
demo/server.py
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
"""TCP 文本写入服务端(实验大纲示例应用)"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import socket
|
||||||
|
|
||||||
|
HOST = "0.0.0.0"
|
||||||
|
PORT = 8888
|
||||||
|
SAVE_PATH = "./data.txt"
|
||||||
|
|
||||||
|
if not os.path.exists(os.path.dirname(SAVE_PATH)) and os.path.dirname(SAVE_PATH):
|
||||||
|
os.makedirs(os.path.dirname(SAVE_PATH))
|
||||||
|
|
||||||
|
|
||||||
|
def save_to_file(content):
|
||||||
|
"""文本追加写入本地磁盘"""
|
||||||
|
with open(SAVE_PATH, "a", encoding="utf-8") as f:
|
||||||
|
f.write(content + "\n")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
server_socket.bind((HOST, PORT))
|
||||||
|
server_socket.listen(5)
|
||||||
|
print(f"TCP 写入服务已启动,监听端口:{PORT}")
|
||||||
|
print(f"数据保存路径:{os.path.abspath(SAVE_PATH)}")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
conn, addr = server_socket.accept()
|
||||||
|
print(f"\n客户端已连接:{addr}")
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
data = conn.recv(1024)
|
||||||
|
if not data:
|
||||||
|
break
|
||||||
|
text = data.decode("utf-8").strip()
|
||||||
|
print(f"收到数据:{text}")
|
||||||
|
save_to_file(text)
|
||||||
|
conn.send("写入成功,数据已保存到磁盘".encode("utf-8"))
|
||||||
|
except Exception as e:
|
||||||
|
print(f"连接异常:{e}")
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
print("客户端连接断开")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
47
demo/stress_client.py
Normal file
47
demo/stress_client.py
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
"""TCP 压力测试客户端:模拟峰值负载,触发大量系统调用"""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import socket
|
||||||
|
import time
|
||||||
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
|
|
||||||
|
HOST = "127.0.0.1"
|
||||||
|
PORT = 8888
|
||||||
|
|
||||||
|
|
||||||
|
def send_once(index: int) -> bool:
|
||||||
|
try:
|
||||||
|
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
client.settimeout(5)
|
||||||
|
client.connect((HOST, PORT))
|
||||||
|
msg = f"stress_test_message_{index}_{time.time()}"
|
||||||
|
client.send(msg.encode("utf-8"))
|
||||||
|
client.recv(1024)
|
||||||
|
client.close()
|
||||||
|
return True
|
||||||
|
except Exception as exc:
|
||||||
|
print(f"请求 {index} 失败: {exc}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def run_stress(total: int, workers: int):
|
||||||
|
print(f"开始压力测试: {total} 次请求, {workers} 并发")
|
||||||
|
start = time.time()
|
||||||
|
success = 0
|
||||||
|
|
||||||
|
with ThreadPoolExecutor(max_workers=workers) as pool:
|
||||||
|
futures = [pool.submit(send_once, i) for i in range(total)]
|
||||||
|
for future in as_completed(futures):
|
||||||
|
if future.result():
|
||||||
|
success += 1
|
||||||
|
|
||||||
|
elapsed = time.time() - start
|
||||||
|
print(f"完成: 成功 {success}/{total}, 耗时 {elapsed:.2f}s, QPS={success/elapsed:.1f}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
parser = argparse.ArgumentParser(description="TCP 压力测试客户端")
|
||||||
|
parser.add_argument("-n", "--count", type=int, default=200, help="总请求数")
|
||||||
|
parser.add_argument("-w", "--workers", type=int, default=20, help="并发线程数")
|
||||||
|
args = parser.parse_args()
|
||||||
|
run_stress(args.count, args.workers)
|
||||||
22
hello.py
Normal file
22
hello.py
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
"""基础验证:eBPF kprobe Hello World 程序(实验大纲 基础验证部分)"""
|
||||||
|
|
||||||
|
from bcc import BPF
|
||||||
|
|
||||||
|
prog = """
|
||||||
|
int hello(void *ctx) {
|
||||||
|
bpf_trace_printk("Hello, World!\\n");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
|
||||||
|
b = BPF(text=prog)
|
||||||
|
execve_function = b.get_syscall_fnname("execve")
|
||||||
|
b.attach_kprobe(event=execve_function, fn_name="hello")
|
||||||
|
|
||||||
|
print("%-18s %-16s %-6s %s" % ("TIME(s)", "COMM", "PID", "MESSAGE"))
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
task, pid, cpu, flags, ts, msg = b.trace_fields()
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
print("%-18.9f %-16s %-6d %s" % (ts, task, pid, msg))
|
||||||
Reference in New Issue
Block a user