"""Send high-rate control packets to benchmark the KCP control session.""" from __future__ import annotations import argparse from pathlib import Path import sys import time ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) import yaml from omnisocket_control import make_control_packet try: from omnisocket import CONTROL_DEFAULTS, Session except ImportError: sys.path.insert(0, str(ROOT / "python")) from omnisocket import CONTROL_DEFAULTS, Session def load_config() -> dict: config_path = ROOT / "config" / "omnisocket_demo.yaml" if not config_path.exists(): return {} with config_path.open("r", encoding="utf-8") as file: return yaml.safe_load(file) or {} def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--rate", type=float, default=200.0, help="send rate in Hz") parser.add_argument("--count", type=int, default=1000, help="packets to send") args = parser.parse_args() config = load_config() transport_cfg = config.get("transport", {}) sender_cfg = config.get("control_sender", {}) session = Session() session.connect( server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")), peer_id=str(sender_cfg.get("peer_id", "peer-a-ctrl")), relay_via=str(transport_cfg.get("relay_via", "")), bind_ip=str(transport_cfg.get("bind_ip", "")), bind_device=str(transport_cfg.get("bind_device", "")), **CONTROL_DEFAULTS, ) target_peer = str(sender_cfg.get("target_peer", "peer-b-ctrl")) spacing = 1.0 / args.rate if args.rate > 0 else 0.0 start = time.perf_counter() try: for seq_id in range(args.count): packet = make_control_packet(seq_id, "set_surge", drive_value=0.25) session.send(to=target_peer, data=packet.encode()) if spacing > 0: target = start + (seq_id + 1) * spacing remaining = target - time.perf_counter() if remaining > 0: time.sleep(remaining) finally: elapsed = time.perf_counter() - start print( f"sent {args.count} control packets in {elapsed:.3f}s " f"({(args.count / elapsed) if elapsed > 0 else 0.0:.1f} pkt/s)" ) print(f"stats={session.stats()}") session.close() if __name__ == "__main__": main()