"""Minimal video-plane sample that sends fixed-size binary frames over KCP.""" from __future__ import annotations from pathlib import Path import os import sys import time import yaml try: from omnisocket import Session, VIDEO_DEFAULTS except ImportError: sys.path.insert(0, str(Path(__file__).resolve().parent / "python")) from omnisocket import Session, VIDEO_DEFAULTS def load_config() -> dict: config_path = Path(__file__).resolve().parent / "config" / "omnisocket_demo.yaml" if not config_path.exists(): return {} with config_path.open("r", encoding="utf-8") as file: return yaml.safe_load(file) or {} def main() -> None: config = load_config() transport_cfg = config.get("transport", {}) video_cfg = config.get("video_sender", {}) server_addr = str(transport_cfg.get("server_addr", "127.0.0.1:10909")) relay_via = str(transport_cfg.get("relay_via", "")) bind_ip = str(transport_cfg.get("bind_ip", "")) bind_device = str(transport_cfg.get("bind_device", "")) peer_id = str(video_cfg.get("peer_id", "peer-b-video")) target_peer = str(video_cfg.get("target_peer", "peer-a-video")) frame_bytes = int(video_cfg.get("frame_bytes", 30720)) frame_interval_ms = int(video_cfg.get("frame_interval_ms", 66)) session = Session() session.connect( server_addr=server_addr, peer_id=peer_id, relay_via=relay_via, bind_ip=bind_ip, bind_device=bind_device, **VIDEO_DEFAULTS, ) sequence = 0 try: while True: frame = sequence.to_bytes(8, "big") + os.urandom(max(0, frame_bytes - 8)) session.send(to=target_peer, data=frame) print(f"sent frame={sequence} bytes={len(frame)}") sequence += 1 time.sleep(frame_interval_ms / 1000.0) except KeyboardInterrupt: pass finally: session.close() if __name__ == "__main__": main()