from __future__ import annotations import os import struct from datetime import datetime, timezone from pathlib import Path from typing import Any PROJECT_ROOT = Path(__file__).resolve().parents[2] WORKSPACE_ROOT = PROJECT_ROOT.parent JPEG_FRAME_DIR = WORKSPACE_ROOT / "RobotDataShow" / "jpeg-frames" GEOSTREAM_JSON_PATH = WORKSPACE_ROOT / "GeoStream" / "gps_latest.json" GEOSTREAM_STALE_SECONDS = 15 OMNISOCKET_CONFIG_PATH = PROJECT_ROOT / "config" / "omnisocket_demo.yaml" VIDEO_SOURCE_MODE = os.getenv("VIDEO_SOURCE_MODE", "auto").strip().lower() OMNISOCKET_FRAME_FRESH_SECONDS = 2.0 VIDEO_TIMESTAMP_SAMPLE_SIZE = 10 VIDEO_TIMESTAMP_TRAILER_BYTES = 8 VIDEO_TIMESTAMP_ENDIANNESS = "little" VIDEO_TIMESTAMP_UNIT = "ms" VIDEO_TIMESTAMP_MULTIPLIER_NS = 1_000_000 VIDEO_TIMESTAMP_MAX_SKEW_NS = 7 * 24 * 60 * 60 * 1_000_000_000 CONTROL_PACKET = struct.Struct("<6f") CONTROL_PACKET_SIZE = CONTROL_PACKET.size CONTROL_SOURCE_NATIVE_UDP = "native_udp" CONTROL_SOURCE_WEB = "web" CONTROL_SOURCE_PRIORITY = (CONTROL_SOURCE_NATIVE_UDP, CONTROL_SOURCE_WEB) ZERO_CONTROL_PAYLOAD = CONTROL_PACKET.pack(0.0, 0.0, 0.0, 0.0, 0.0, 0.0) def utc_iso_now() -> str: return datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z") def parse_simple_yaml_scalar(value: str) -> Any: if value in {'""', "''"}: return "" if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}: return value[1:-1] if value.lower() == "true": return True if value.lower() == "false": return False if value and value.lstrip("-").isdigit(): return int(value) return value def load_simple_yaml_config(path: Path) -> dict[str, Any]: parsed: dict[str, Any] = {} current_section: str | None = None with path.open("r", encoding="utf-8") as file: for raw_line in file: line = raw_line.split("#", 1)[0].rstrip() if not line.strip(): continue if not line.startswith(" "): if not line.endswith(":"): raise ValueError(f"invalid top-level yaml line: {raw_line.strip()}") current_section = line[:-1].strip() parsed[current_section] = {} continue if current_section is None: raise ValueError(f"yaml key outside section: {raw_line.strip()}") stripped = line.strip() if ":" not in stripped: raise ValueError(f"invalid yaml key line: {raw_line.strip()}") key, value = stripped.split(":", 1) parsed[current_section][key.strip()] = parse_simple_yaml_scalar(value.strip()) return parsed def load_omnisocket_config() -> dict[str, Any]: config: dict[str, Any] = {} if OMNISOCKET_CONFIG_PATH.exists(): try: try: import yaml # type: ignore with OMNISOCKET_CONFIG_PATH.open("r", encoding="utf-8") as file: config = yaml.safe_load(file) or {} except ImportError: config = load_simple_yaml_config(OMNISOCKET_CONFIG_PATH) except Exception: config = {} transport_cfg = dict(config.get("transport", {})) video_receiver_cfg = dict(config.get("video_receiver", {})) control_sender_cfg = dict(config.get("control_sender", {})) control_ingress_cfg = dict(config.get("control_ingress", {})) video_sender_cfg = dict(config.get("video_sender", {})) telemetry_receiver_cfg = dict(config.get("telemetry_receiver", {})) transport_cfg["server_addr"] = os.getenv( "OMNISOCKET_SERVER_ADDR", str(transport_cfg.get("server_addr", "127.0.0.1:10909")), ) transport_cfg["relay_via"] = os.getenv( "OMNISOCKET_RELAY_VIA", str(transport_cfg.get("relay_via", "")), ) transport_cfg["bind_ip"] = os.getenv( "OMNISOCKET_BIND_IP", str(transport_cfg.get("bind_ip", "")), ) transport_cfg["bind_device"] = os.getenv( "OMNISOCKET_BIND_DEVICE", str(transport_cfg.get("bind_device", "")), ) video_receiver_cfg["peer_id"] = os.getenv( "OMNISOCKET_VIDEO_PEER_ID", str(video_receiver_cfg.get("peer_id", "peer-a-video")), ) video_receiver_cfg["buffer_bytes"] = int( os.getenv( "OMNISOCKET_BUFFER_BYTES", str(video_receiver_cfg.get("buffer_bytes", 1024 * 1024)), ) ) control_sender_cfg["peer_id"] = os.getenv( "OMNISOCKET_CONTROL_PEER_ID", str(control_sender_cfg.get("peer_id", "peer-a-ctrl")), ) control_sender_cfg["target_peer"] = os.getenv( "OMNISOCKET_CONTROL_TARGET_PEER", str(control_sender_cfg.get("target_peer", "peer-b-ctrl")), ) video_sender_cfg["peer_id"] = os.getenv( "OMNISOCKET_VIDEO_SENDER_PEER_ID", str(video_sender_cfg.get("peer_id", "peer-b-video")), ) video_sender_cfg["target_peer"] = os.getenv( "OMNISOCKET_VIDEO_TARGET_PEER_ID", str(video_sender_cfg.get("target_peer", "peer-a-video")), ) control_ingress_cfg["native_udp_bind"] = os.getenv( "OMNISOCKET_CONTROL_NATIVE_UDP_BIND", str(control_ingress_cfg.get("native_udp_bind", "127.0.0.1:10921")), ) control_ingress_cfg["source_lease_ms"] = int( os.getenv( "OMNISOCKET_CONTROL_SOURCE_LEASE_MS", str(control_ingress_cfg.get("source_lease_ms", 300)), ) ) control_ingress_cfg["send_rate_hz"] = float( os.getenv( "OMNISOCKET_CONTROL_SEND_RATE_HZ", str(control_ingress_cfg.get("send_rate_hz", 20.0)), ) ) control_ingress_cfg["zero_burst_packets"] = int( os.getenv( "OMNISOCKET_CONTROL_ZERO_BURST_PACKETS", str(control_ingress_cfg.get("zero_burst_packets", 3)), ) ) telemetry_receiver_cfg["peer_id"] = os.getenv( "OMNISOCKET_TELEMETRY_PEER_ID", str(telemetry_receiver_cfg.get("peer_id", "peer-a-telemetry")), ) telemetry_receiver_cfg["interval_ms"] = int( os.getenv( "OMNISOCKET_TELEMETRY_INTERVAL_MS", str(telemetry_receiver_cfg.get("interval_ms", 500)), ) ) telemetry_receiver_cfg["stale_after_ms"] = int( os.getenv( "OMNISOCKET_TELEMETRY_STALE_AFTER_MS", str(telemetry_receiver_cfg.get("stale_after_ms", telemetry_receiver_cfg["interval_ms"] * 3)), ) ) return { "transport": transport_cfg, "video_receiver": video_receiver_cfg, "control_sender": control_sender_cfg, "control_ingress": control_ingress_cfg, "video_sender": video_sender_cfg, "telemetry_receiver": telemetry_receiver_cfg, } def parse_host_port(bind_addr: str) -> tuple[str, int]: host, port_text = bind_addr.rsplit(":", 1) host = host.strip() or "127.0.0.1" port = int(port_text) if port <= 0 or port > 65535: raise ValueError(f"invalid port in bind address: {bind_addr}") return host, port