"""Telemetry services for the OmniSocket link.

Contains GPS payloads built from video-frame trailer metadata, KCP trend
tracking, the hub telemetry receiver thread, and the aggregated network
telemetry payload.
"""

from __future__ import annotations

from collections import deque
import json
import sys
import threading
import time
from datetime import datetime, timezone
from typing import Any

from .common import (
    VIDEO_TRAILER_COORDINATE_FORMAT,
    WORKSPACE_ROOT,
    load_omnisocket_config,
    utc_iso_now,
)
from .control import ControlArbiter, NativeUdpControlIngress, OmniSocketControlSender
from .video import FrameTrailerMetadata, OmniSocketVideoReceiver

LOCAL_SAMPLE_INTERVAL_MS = 500
TREND_HISTORY_SIZE = 10
TREND_WINDOW_SIZE = 5

# Pause between hub reconnect attempts in HubTelemetryReceiver._run.
_RECONNECT_DELAY_SECONDS = 2.0
# Placeholder shown when no usable UTC time-of-day is available.
_UNKNOWN_UTC_TIME = "--:--:--"


def _utc_from_epoch(epoch_seconds: float | None) -> str | None:
    """Format epoch seconds as an ISO-8601 UTC string with a ``Z`` suffix.

    Returns ``None`` when the value is missing, non-positive, or cannot be
    represented as a datetime (NaN, out of range for the platform).
    """
    if epoch_seconds is None or epoch_seconds <= 0.0:
        return None
    try:
        moment = datetime.fromtimestamp(epoch_seconds, timezone.utc)
    except (OverflowError, OSError, ValueError):
        return None
    return moment.isoformat(timespec="seconds").replace("+00:00", "Z")


def _coerce_int(value: Any, default: int = 0) -> int:
    """Convert ``value`` to ``int``, returning ``default`` when it cannot be converted."""
    if value is None:
        return default
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers infinite floats, which json.loads yields for "Infinity".
        return default


def _coerce_float(value: Any, default: float = 0.0) -> float:
    """Convert ``value`` to ``float``, returning ``default`` when it cannot be converted."""
    if value is None:
        return default
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers integers too large to represent as a float.
        return default


def _session_entries(snapshot: Any) -> list[dict[str, Any]]:
    """Return the dict entries of ``snapshot["sessions"]``, ignoring anything malformed."""
    if not isinstance(snapshot, dict):
        return []
    sessions = snapshot.get("sessions")
    if not isinstance(sessions, (list, tuple)):
        return []
    return [entry for entry in sessions if isinstance(entry, dict)]


class GpsDataService:
    """Builds GPS payloads from the latest video-frame trailer metadata."""

    def __init__(self, receiver: OmniSocketVideoReceiver) -> None:
        self._receiver = receiver

    def get_latest(self) -> dict[str, Any]:
        """Return the payload for the latest frame metadata, or a waiting payload if none exists."""
        metadata = self._receiver.get_latest_frame_metadata()
        if metadata is None:
            return self._build_waiting_payload()
        return self._build_payload_from_metadata(metadata)

    def _build_waiting_payload(self) -> dict[str, Any]:
        """Return the placeholder payload used while no frame metadata is available."""
        return {
            "has_fix": False,
            "utc_time": _UNKNOWN_UTC_TIME,
            "latitude": None,
            "longitude": None,
            "satellites": None,
            "altitude_m": None,
            "coordinate_system": "WGS84",
            "source_sentence": "VIDEO_TRAILER",
            "raw_coordinate_format": VIDEO_TRAILER_COORDINATE_FORMAT,
            "source_mode": "video-frame-trailer-waiting",
            "updated_at": "",
        }

    def _build_payload_from_metadata(self, metadata: FrameTrailerMetadata) -> dict[str, Any]:
        """Build the payload from ``metadata`` (timestamp_ns, received_at, latitude, longitude)."""
        timestamp_seconds = metadata.timestamp_ns / 1_000_000_000
        try:
            utc_time = datetime.fromtimestamp(timestamp_seconds, timezone.utc).strftime("%H:%M:%S")
        except (OverflowError, OSError, ValueError):
            # An unrepresentable trailer timestamp must not break the whole payload.
            utc_time = _UNKNOWN_UTC_TIME
        updated_at = _utc_from_epoch(metadata.received_at) or ""
        return {
            "has_fix": True,
            "utc_time": utc_time,
            "latitude": round(metadata.latitude, 6),
            "longitude": round(metadata.longitude, 6),
            "satellites": None,
            "altitude_m": None,
            "coordinate_system": "WGS84",
            "source_sentence": "VIDEO_TRAILER",
            "raw_coordinate_format": VIDEO_TRAILER_COORDINATE_FORMAT,
            "source_mode": "video-frame-trailer",
            "updated_at": updated_at,
        }


class KcpTrendTracker:
    """Keeps a bounded per-key history of normalized KCP stats and derives deltas and trends."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # key -> most recent samples, capped at TREND_HISTORY_SIZE entries
        self._samples: dict[str, deque[dict[str, Any]]] = {}

    def _normalize(self, stats: dict[str, Any] | None) -> dict[str, Any]:
        """Return ``stats`` reduced to the known KCP fields, each coerced to a number."""
        raw = dict(stats or {})
        snd_wnd = _coerce_int(raw.get("snd_wnd"))
        rmt_wnd = _coerce_int(raw.get("rmt_wnd"))
        inflight = _coerce_int(raw.get("inflight"))
        # When the source gives no window_limit, fall back to the smaller of
        # the two windows (only if both are non-zero).
        fallback_limit = min(snd_wnd, rmt_wnd) if snd_wnd and rmt_wnd else 0
        window_limit = _coerce_int(raw.get("window_limit"), fallback_limit)
        return {
            "connected": _coerce_int(raw.get("connected")),
            "conv": _coerce_int(raw.get("conv")),
            "rto_ms": _coerce_int(raw.get("rto_ms")),
            "srtt_ms": _coerce_int(raw.get("srtt_ms")),
            "srttvar_ms": _coerce_int(raw.get("srttvar_ms")),
            "snd_wnd": snd_wnd,
            "rmt_wnd": rmt_wnd,
            "inflight": inflight,
            "window_limit": window_limit,
            "window_pressure_pct": round(_coerce_float(raw.get("window_pressure_pct")), 3),
            "snd_queue": _coerce_int(raw.get("snd_queue")),
            "rcv_queue": _coerce_int(raw.get("rcv_queue")),
            "snd_buffer": _coerce_int(raw.get("snd_buffer")),
            "out_segs_total": _coerce_int(raw.get("out_segs_total")),
            "retrans_total": _coerce_int(raw.get("retrans_total")),
            "fast_retrans_total": _coerce_int(raw.get("fast_retrans_total")),
            "lost_total": _coerce_int(raw.get("lost_total")),
            "repeat_total": _coerce_int(raw.get("repeat_total")),
            "xmit_total": _coerce_int(raw.get("xmit_total")),
        }

    def add_sample(self, key: str, stats: dict[str, Any] | None) -> None:
        """Append a normalized, timestamped sample to the history stored under ``key``."""
        sample = {
            "ts_monotonic": time.monotonic(),
            "updated_at": utc_iso_now(),
            "stats": self._normalize(stats),
        }
        with self._lock:
            history = self._samples.setdefault(key, deque(maxlen=TREND_HISTORY_SIZE))
            history.append(sample)

    def latest_updated_at(self, key: str) -> str | None:
        """Return the ``updated_at`` of the newest sample for ``key``, or ``None`` without history."""
        with self._lock:
            history = self._samples.get(key)
            if not history:
                return None
            return str(history[-1].get("updated_at") or "")

    def describe(self, key: str, current_stats: dict[str, Any] | None) -> dict[str, Any]:
        """Return normalized ``current_stats`` plus deltas and trends against the stored history.

        Deltas compare the current stats with the most recent stored sample
        and are clamped at zero. Trends compare the oldest and newest entries
        of the last ``TREND_WINDOW_SIZE`` timeline entries (history plus the
        current stats) and report ``"stable"`` inside the deadband.
        """
        current = self._normalize(current_stats)
        with self._lock:
            history = list(self._samples.get(key, ()))

        timeline = history + [{"stats": current, "updated_at": utc_iso_now()}]
        previous = timeline[-2]["stats"] if len(timeline) >= 2 else None
        trend_window = [entry["stats"] for entry in timeline[-TREND_WINDOW_SIZE:]]
        # Changes smaller than 5% of the window limit (at least 2) count as noise.
        deadband = max(2.0, 0.05 * float(max(current.get("window_limit", 0), 1)))

        def delta(field: str) -> int:
            # Non-negative growth of ``field`` since the previous sample.
            if previous is None:
                return 0
            return max(0, current[field] - _coerce_int(previous.get(field)))

        def classify(field: str) -> str:
            if len(trend_window) < 2:
                return "stable"
            oldest = float(_coerce_int(trend_window[0].get(field)))
            newest = float(_coerce_int(trend_window[-1].get(field)))
            change = newest - oldest
            if abs(change) < deadband:
                return "stable"
            return "rising" if change > 0 else "falling"

        retrans_delta = delta("retrans_total")
        out_segs_delta = delta("out_segs_total")
        repair_rate_pct = 0.0
        if out_segs_delta > 0:
            repair_rate_pct = round((retrans_delta / out_segs_delta) * 100.0, 3)

        return {
            "kcp": current,
            "trend": {
                "snd_queue_delta": delta("snd_queue"),
                "snd_buffer_delta": delta("snd_buffer"),
                "snd_queue_trend": classify("snd_queue"),
                "snd_buffer_trend": classify("snd_buffer"),
                "retrans_delta": retrans_delta,
                "fast_retrans_delta": delta("fast_retrans_total"),
                "lost_delta": delta("lost_total"),
                "repeat_delta": delta("repeat_total"),
                "out_segs_delta": out_segs_delta,
                "repair_rate_pct": repair_rate_pct,
            },
        }


class HubTelemetryReceiver:
    """Background receiver that keeps the latest ``hub_kcp_snapshot`` message from the hub."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._thread: threading.Thread | None = None
        self._started = False
        self._session = None
        self._session_cls = None
        self._msg_type_text = None
        self._msg_type_error = None
        self._telemetry_defaults: dict[str, Any] = {}
        self._latest_snapshot: dict[str, Any] | None = None
        self._last_error = ""
        self._last_received_wall = 0.0
        self._last_received_monotonic = 0.0
        self._reconnect_count = 0
        self._ever_connected = False
        self._closing = threading.Event()
        self._load_backend()

    def _load_backend(self) -> None:
        """Import the omnisocket backend, recording the failure instead of raising."""
        try:
            self._import_backend()
        except Exception as error:  # pragma: no cover - optional runtime dependency
            self._last_error = f"omnisocket import failed: {error}"

    def _import_backend(self) -> None:
        """Import omnisocket, retrying with the workspace ``OmniSocketGo/python`` dir on sys.path."""
        try:
            from omnisocket import MSG_TYPE_ERROR, MSG_TYPE_TEXT, Session, TELEMETRY_DEFAULTS  # type: ignore
        except ImportError:
            python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python"
            if python_dir.exists():
                sys.path.insert(0, str(python_dir))
            from omnisocket import MSG_TYPE_ERROR, MSG_TYPE_TEXT, Session, TELEMETRY_DEFAULTS  # type: ignore
        self._msg_type_error = MSG_TYPE_ERROR
        self._msg_type_text = MSG_TYPE_TEXT
        self._session_cls = Session
        self._telemetry_defaults = dict(TELEMETRY_DEFAULTS)

    def _connect_session(self):
        """Create a session and connect it using the ``transport`` / ``telemetry_receiver`` config.

        If ``connect`` raises, the half-built session is closed (best effort)
        before the exception is re-raised, so failed attempts do not leak it.
        """
        assert self._session_cls is not None
        config = load_omnisocket_config()
        transport_cfg = config.get("transport", {})
        telemetry_cfg = config.get("telemetry_receiver", {})
        session = self._session_cls()
        try:
            session.connect(
                server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")),
                peer_id=str(telemetry_cfg.get("peer_id", "peer-a-telemetry")),
                relay_via=str(transport_cfg.get("relay_via", "")),
                bind_ip=str(transport_cfg.get("bind_ip", "")),
                bind_device=str(transport_cfg.get("bind_device", "")),
                **self._telemetry_defaults,
            )
        except Exception:
            try:
                session.close()
            except Exception:
                pass
            raise
        return session

    def ensure_started(self) -> None:
        """Start the receiver thread once; no-op without a backend, when running, or when closing."""
        if self._session_cls is None:
            return
        with self._lock:
            if self._started or self._closing.is_set():
                return
            self._started = True
            self._thread = threading.Thread(
                target=self._run,
                name="hub-telemetry-receiver",
                daemon=True,
            )
            self._thread.start()

    def _receive_loop(self, session: Any) -> None:
        """Read messages from ``session`` until closing, storing each ``hub_kcp_snapshot``."""
        while not self._closing.is_set():
            result = session.recv(timeout_ms=1000)
            if result is None:
                continue
            from_peer, msg_type, payload = result
            if msg_type == self._msg_type_error:
                with self._lock:
                    self._last_error = f"hub error from {from_peer}: {payload.decode('utf-8', errors='replace')}"
                continue
            if msg_type != self._msg_type_text:
                continue
            try:
                snapshot = json.loads(payload.decode("utf-8"))
            except ValueError as error:
                # UnicodeDecodeError and JSONDecodeError are both ValueErrors.
                # One bad payload is skipped instead of dropping the session.
                with self._lock:
                    self._last_error = f"invalid hub telemetry payload from {from_peer}: {error}"
                continue
            if not isinstance(snapshot, dict) or snapshot.get("type") != "hub_kcp_snapshot":
                continue
            now_wall = time.time()
            now_mono = time.monotonic()
            with self._lock:
                self._latest_snapshot = snapshot
                self._last_received_wall = now_wall
                self._last_received_monotonic = now_mono
                self._last_error = ""

    def _run(self) -> None:
        """Thread body: connect, receive until failure or close, then reconnect after a pause."""
        while not self._closing.is_set():
            session = None
            try:
                session = self._connect_session()
                with self._lock:
                    self._session = session
                    self._last_error = ""
                    if self._ever_connected:
                        self._reconnect_count += 1
                    else:
                        self._ever_connected = True
                self._receive_loop(session)
            except Exception as error:  # pragma: no cover - runtime integration path
                if not self._closing.is_set():
                    session_error = ""
                    if session is not None:
                        try:
                            session_error = str(dict(session.stats()).get("last_server_error", "") or "")
                        except Exception:
                            session_error = ""
                    with self._lock:
                        self._last_error = session_error or str(error)
            finally:
                with self._lock:
                    self._session = None
                    if self._closing.is_set():
                        self._started = False
                if session is not None:
                    try:
                        session.close()
                    except Exception:
                        pass
                # Event.wait returns immediately once close() sets the flag,
                # so shutdown is not held up by the reconnect pause.
                self._closing.wait(_RECONNECT_DELAY_SECONDS)

    def get_snapshot(self) -> dict[str, Any]:
        """Return the receiver state: latest snapshot, staleness, errors and session stats.

        A snapshot is stale when none has been received yet or when the last
        one is older than ``stale_after_ms`` (config value, minimum 500,
        default 1500).
        """
        self.ensure_started()
        cfg = load_omnisocket_config().get("telemetry_receiver", {})
        stale_after_ms = max(500, _coerce_int(cfg.get("stale_after_ms"), 1500))
        with self._lock:
            received_monotonic = self._last_received_monotonic
            received_wall = self._last_received_wall
            snapshot = self._latest_snapshot
            connected = self._session is not None
            last_error = self._last_error
            reconnect_count = self._reconnect_count
            session_stats: dict[str, Any] = {}
            if self._session is not None:
                try:
                    session_stats = dict(self._session.stats())
                except Exception:
                    session_stats = {}

        stale = True
        if received_monotonic > 0.0:
            stale = (time.monotonic() - received_monotonic) * 1000.0 > stale_after_ms

        return {
            "connected": connected,
            "updated_at": _utc_from_epoch(received_wall),
            "received_at_monotonic": received_monotonic,
            "stale": stale,
            "peer_id": str(cfg.get("peer_id", "peer-a-telemetry")),
            "snapshot": snapshot or {"sessions": []},
            "last_error": last_error,
            "registered": bool(session_stats.get("registered", 0)),
            "last_server_error": str(session_stats.get("last_server_error", "") or ""),
            "reconnect_count": reconnect_count,
        }

    def close(self) -> None:
        """Signal shutdown, close the active session and briefly wait for the thread."""
        self._closing.set()
        with self._lock:
            session = self._session
        if session is not None:
            try:
                session.close()
            except Exception:
                pass
        thread = self._thread
        if thread is not None and thread.is_alive():
            thread.join(timeout=0.5)


class NetworkTelemetryService:
    """Aggregates local session stats and hub telemetry into one network status payload."""

    def __init__(
        self,
        video_receiver: OmniSocketVideoReceiver,
        control_sender: OmniSocketControlSender,
        control_arbiter: ControlArbiter,
        native_ingress: NativeUdpControlIngress,
        hub_receiver: HubTelemetryReceiver,
    ) -> None:
        self._video_receiver = video_receiver
        self._control_sender = control_sender
        self._control_arbiter = control_arbiter
        self._native_ingress = native_ingress
        self._hub_receiver = hub_receiver
        self._trend_tracker = KcpTrendTracker()
        self._rate_lock = threading.Lock()
        # (monotonic time, send bytes, recv bytes) of the previous rate sample
        self._last_rate_sample: tuple[float, int, int] | None = None
        self._sample_thread: threading.Thread | None = None
        self._sample_started = False
        self._last_remote_snapshot_at = 0.0
        self._closing = threading.Event()

    def _ensure_started(self) -> None:
        """Start the collaborators and, once, the local KCP sampling thread."""
        self._video_receiver.ensure_started()
        self._control_arbiter.ensure_started()
        self._native_ingress.ensure_started()
        self._hub_receiver.ensure_started()
        with self._rate_lock:
            if self._sample_started or self._closing.is_set():
                return
            self._sample_started = True
            self._sample_thread = threading.Thread(
                target=self._sample_loop,
                name="network-telemetry-sampler",
                daemon=True,
            )
            self._sample_thread.start()

    def _sample_loop(self) -> None:
        """Record local video/control KCP stats every ``LOCAL_SAMPLE_INTERVAL_MS`` until closed."""
        interval_seconds = LOCAL_SAMPLE_INTERVAL_MS / 1000.0
        while not self._closing.is_set():
            try:
                self._trend_tracker.add_sample("a_to_d.video", self._video_receiver.session_kcp_stats())
                self._trend_tracker.add_sample("a_to_d.control", self._control_sender.session_kcp_stats())
            except Exception:
                # Best effort: a failed sample must not stop the sampler thread.
                pass
            # Event.wait doubles as an interruptible sleep so close() returns promptly.
            self._closing.wait(interval_seconds)

    def _compute_rates(self, send_bytes: int, recv_bytes: int) -> tuple[float, float]:
        """Return ``(tx_kbps, rx_kbps)`` since the previous call; zeros on the first call."""
        now = time.monotonic()
        with self._rate_lock:
            previous = self._last_rate_sample
            self._last_rate_sample = (now, send_bytes, recv_bytes)
        if previous is None:
            return 0.0, 0.0
        prev_time, prev_send, prev_recv = previous
        elapsed = now - prev_time
        if elapsed <= 0.0:
            return 0.0, 0.0
        # Clamp at zero so a counter that went backwards never reports a negative rate.
        tx_kbps = max(0.0, ((send_bytes - prev_send) * 8.0) / elapsed / 1000.0)
        rx_kbps = max(0.0, ((recv_bytes - prev_recv) * 8.0) / elapsed / 1000.0)
        return tx_kbps, rx_kbps

    def _ingest_remote_snapshot(self, telemetry_state: dict[str, Any]) -> None:
        """Add the hub snapshot's sessions to the trend history, once per received snapshot."""
        received_at = _coerce_float(telemetry_state.get("received_at_monotonic"))
        if received_at <= 0.0 or received_at <= self._last_remote_snapshot_at:
            return
        for session in _session_entries(telemetry_state.get("snapshot")):
            peer_id = str(session.get("peer_id", "")).strip()
            if not peer_id:
                continue
            self._trend_tracker.add_sample(f"hub::{peer_id}", session)
        self._last_remote_snapshot_at = received_at

    def _build_session_payload(
        self,
        trend_key: str,
        peer_id: str,
        app_stats: dict[str, Any] | None,
        current_kcp: dict[str, Any] | None,
        updated_at: str | None,
        stale: bool,
    ) -> dict[str, Any]:
        """Build one session entry; ``app_stats["registered"]`` overrides the KCP ``connected`` flag."""
        described = self._trend_tracker.describe(trend_key, current_kcp)
        connected = bool(described["kcp"].get("connected"))
        if app_stats is not None and "registered" in app_stats:
            connected = bool(app_stats.get("registered"))
        return {
            "peer_id": peer_id,
            "connected": connected,
            "updated_at": updated_at,
            "stale": stale,
            "app": app_stats,
            "kcp": described["kcp"],
            "trend": described["trend"],
        }

    def _build_link(
        self,
        source: str,
        updated_at: str | None,
        stale: bool,
        sessions: dict[str, dict[str, Any]],
    ) -> dict[str, Any]:
        """Build a link entry whose aggregates cover only connected, non-stale sessions."""
        active_sessions = [
            session
            for session in sessions.values()
            if session.get("connected") and not session.get("stale")
        ]
        retrans_sum = sum(_coerce_int(session.get("trend", {}).get("retrans_delta")) for session in active_sessions)
        out_segs_sum = sum(_coerce_int(session.get("trend", {}).get("out_segs_delta")) for session in active_sessions)
        repair_rate_pct = round((retrans_sum / out_segs_sum) * 100.0, 3) if out_segs_sum > 0 else 0.0
        return {
            "source": source,
            "updated_at": updated_at,
            "stale": stale,
            "aggregate": {
                "online_sessions": len(active_sessions),
                "max_window_pressure_pct": max(
                    (_coerce_float(session.get("kcp", {}).get("window_pressure_pct")) for session in active_sessions),
                    default=0.0,
                ),
                "sum_snd_queue": sum(_coerce_int(session.get("kcp", {}).get("snd_queue")) for session in active_sessions),
                "sum_snd_buffer": sum(_coerce_int(session.get("kcp", {}).get("snd_buffer")) for session in active_sessions),
                "sum_retrans_delta": retrans_sum,
                "sum_out_segs_delta": out_segs_sum,
                "repair_rate_pct": repair_rate_pct,
            },
            "sessions": sessions,
        }

    def _pick_primary_session(self, links: dict[str, dict[str, Any]]) -> dict[str, Any] | None:
        """Return the first connected, non-stale session in priority order, or ``None``."""
        candidates = (
            links["a_to_d"]["sessions"]["control"],
            links["a_to_d"]["sessions"]["video"],
            links["d_to_b"]["sessions"]["control"],
            links["d_to_b"]["sessions"]["video"],
        )
        for session in candidates:
            if session.get("connected") and not session.get("stale"):
                return session
        return None

    def get_latest(self) -> dict[str, Any]:
        """Return the full network telemetry payload (status, rates, sessions, links, control)."""
        self._ensure_started()
        config = load_omnisocket_config()
        video_receiver_cfg = config.get("video_receiver", {})
        control_sender_cfg = config.get("control_sender", {})
        video_sender_cfg = config.get("video_sender", {})

        video_app = self._video_receiver.session_stats()
        control_app = self._control_sender.session_stats()
        video_kcp = self._video_receiver.session_kcp_stats()
        control_kcp = self._control_sender.session_kcp_stats()
        arbiter_status = self._control_arbiter.get_status()
        ingress_status = self._native_ingress.get_status()
        sender_status = self._control_sender.get_status()
        telemetry_state = self._hub_receiver.get_snapshot()

        total_send_bytes = _coerce_int(video_app.get("send_bytes")) + _coerce_int(control_app.get("send_bytes"))
        total_recv_bytes = _coerce_int(video_app.get("recv_bytes")) + _coerce_int(control_app.get("recv_bytes"))
        tx_kbps, rx_kbps = self._compute_rates(total_send_bytes, total_recv_bytes)

        local_updated_at = utc_iso_now()
        local_sessions = {
            "video": self._build_session_payload(
                "a_to_d.video",
                str(video_receiver_cfg.get("peer_id", "peer-a-video")),
                video_app,
                video_kcp,
                local_updated_at,
                False,
            ),
            "control": self._build_session_payload(
                "a_to_d.control",
                str(control_sender_cfg.get("peer_id", "peer-a-ctrl")),
                control_app,
                control_kcp,
                local_updated_at,
                False,
            ),
        }

        remote_snapshot = telemetry_state.get("snapshot") or {}
        remote_sessions_by_peer: dict[str, dict[str, Any]] = {}
        for entry in _session_entries(remote_snapshot):
            entry_peer = str(entry.get("peer_id", "")).strip()
            if entry_peer:
                remote_sessions_by_peer[entry_peer] = entry
        remote_updated_at = telemetry_state.get("updated_at")
        remote_stale = bool(telemetry_state.get("stale", True))
        remote_video_peer = str(video_sender_cfg.get("peer_id", "peer-b-video"))
        remote_control_peer = str(control_sender_cfg.get("target_peer", "peer-b-ctrl"))
        remote_sessions = {
            "video": self._build_session_payload(
                f"hub::{remote_video_peer}",
                remote_video_peer,
                None,
                remote_sessions_by_peer.get(remote_video_peer, {}),
                remote_updated_at,
                remote_stale,
            ),
            "control": self._build_session_payload(
                f"hub::{remote_control_peer}",
                remote_control_peer,
                None,
                remote_sessions_by_peer.get(remote_control_peer, {}),
                remote_updated_at,
                remote_stale,
            ),
        }

        links = {
            "a_to_d": self._build_link("local-a-side", local_updated_at, False, local_sessions),
            "d_to_b": self._build_link("hub-telemetry", remote_updated_at, remote_stale, remote_sessions),
        }
        primary_session = self._pick_primary_session(links)
        primary_kcp = dict(primary_session.get("kcp", {})) if primary_session is not None else {}

        # Ingest only after the remote payloads are built: describe() diffs the
        # current stats against the stored history, so storing this snapshot
        # first would make every remote delta zero.
        self._ingest_remote_snapshot(telemetry_state)

        fresh_connected_sessions = (
            links["a_to_d"]["aggregate"]["online_sessions"]
            + links["d_to_b"]["aggregate"]["online_sessions"]
        )
        # primary_kcp is empty when there is no primary session, so these are None then.
        latency_ms = primary_kcp.get("srtt_ms")
        jitter_ms = primary_kcp.get("srttvar_ms")

        local_control_registered = bool(control_app.get("registered", 0))
        remote_control = remote_sessions["control"]
        remote_control_fresh = bool(remote_control.get("connected")) and not bool(remote_control.get("stale"))
        if local_control_registered and remote_control_fresh:
            peer_status = "online"
        elif local_control_registered or bool(local_sessions["video"].get("connected")):
            peer_status = "degraded"
        elif sender_status.get("backend_ready"):
            peer_status = "idle"
        else:
            peer_status = "backend-unavailable"

        return {
            "peer_status": peer_status,
            "latency_ms": latency_ms,
            "jitter_ms": jitter_ms,
            "packet_loss_pct": None,
            "tx_kbps": round(tx_kbps, 3),
            "rx_kbps": round(rx_kbps, 3),
            "transport": "OmniSocket / kcp",
            "source_mode": "omnisocket-live" if fresh_connected_sessions > 0 else "omnisocket-idle",
            "updated_at": utc_iso_now(),
            "active_control_source": arbiter_status["active_source"],
            "control_lease_remaining_ms": arbiter_status["control_lease_remaining_ms"],
            "combined": {
                "connected_sessions": fresh_connected_sessions,
                "send_bytes": total_send_bytes,
                "recv_bytes": total_recv_bytes,
                "tx_kbps": round(tx_kbps, 3),
                "rx_kbps": round(rx_kbps, 3),
            },
            "sessions": {
                "video": {
                    "app": video_app,
                    "kcp": local_sessions["video"]["kcp"],
                },
                "control": {
                    "app": control_app,
                    "kcp": local_sessions["control"]["kcp"],
                },
            },
            "links": links,
            "telemetry_receiver": {
                "hub_connected": bool(telemetry_state.get("connected")),
                "hub_updated_at": telemetry_state.get("updated_at"),
                "hub_stale": remote_stale,
                "last_error": telemetry_state.get("last_error", ""),
                "peer_id": telemetry_state.get("peer_id", ""),
                "registered": bool(telemetry_state.get("registered", False)),
                "last_server_error": str(telemetry_state.get("last_server_error", "") or ""),
                "reconnect_count": _coerce_int(telemetry_state.get("reconnect_count")),
            },
            "ingress": {
                "native_udp": ingress_status,
            },
            "control": {
                "arbiter": arbiter_status,
                "sender": sender_status,
            },
        }

    def close(self) -> None:
        """Signal the sampler thread to stop and briefly wait for it."""
        self._closing.set()
        thread = self._sample_thread
        if thread is not None and thread.is_alive():
            thread.join(timeout=0.5)