添加测试功能:TCP 写入服务端、单次发送客户端、压力测试客户端、kprobe Hello World基础验证
This commit is contained in:
47
demo/stress_client.py
Normal file
47
demo/stress_client.py
Normal file
@@ -0,0 +1,47 @@
|
||||
"""TCP 压力测试客户端:模拟峰值负载,触发大量系统调用"""
|
||||
|
||||
import argparse
|
||||
import socket
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
HOST = "127.0.0.1"
|
||||
PORT = 8888
|
||||
|
||||
|
||||
def send_once(index: int) -> bool:
|
||||
try:
|
||||
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
client.settimeout(5)
|
||||
client.connect((HOST, PORT))
|
||||
msg = f"stress_test_message_{index}_{time.time()}"
|
||||
client.send(msg.encode("utf-8"))
|
||||
client.recv(1024)
|
||||
client.close()
|
||||
return True
|
||||
except Exception as exc:
|
||||
print(f"请求 {index} 失败: {exc}")
|
||||
return False
|
||||
|
||||
|
||||
def run_stress(total: int, workers: int):
|
||||
print(f"开始压力测试: {total} 次请求, {workers} 并发")
|
||||
start = time.time()
|
||||
success = 0
|
||||
|
||||
with ThreadPoolExecutor(max_workers=workers) as pool:
|
||||
futures = [pool.submit(send_once, i) for i in range(total)]
|
||||
for future in as_completed(futures):
|
||||
if future.result():
|
||||
success += 1
|
||||
|
||||
elapsed = time.time() - start
|
||||
print(f"完成: 成功 {success}/{total}, 耗时 {elapsed:.2f}s, QPS={success/elapsed:.1f}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="TCP 压力测试客户端")
|
||||
parser.add_argument("-n", "--count", type=int, default=200, help="总请求数")
|
||||
parser.add_argument("-w", "--workers", type=int, default=20, help="并发线程数")
|
||||
args = parser.parse_args()
|
||||
run_stress(args.count, args.workers)
|
||||
Reference in New Issue
Block a user