diff --git a/backend/monitoring/common.py b/backend/monitoring/common.py index 06d09b0..63d88cc 100644 --- a/backend/monitoring/common.py +++ b/backend/monitoring/common.py @@ -2,7 +2,7 @@ from __future__ import annotations import os import struct -from datetime import UTC, datetime +from datetime import datetime, timezone from pathlib import Path from typing import Any @@ -32,7 +32,7 @@ ZERO_CONTROL_PAYLOAD = CONTROL_PACKET.pack(0.0, 0.0, 0.0, 0.0, 0.0, 0.0) def utc_iso_now() -> str: - return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") + return datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z") def parse_simple_yaml_scalar(value: str) -> Any: @@ -97,6 +97,8 @@ def load_omnisocket_config() -> dict[str, Any]: video_receiver_cfg = dict(config.get("video_receiver", {})) control_sender_cfg = dict(config.get("control_sender", {})) control_ingress_cfg = dict(config.get("control_ingress", {})) + video_sender_cfg = dict(config.get("video_sender", {})) + telemetry_receiver_cfg = dict(config.get("telemetry_receiver", {})) transport_cfg["server_addr"] = os.getenv( "OMNISOCKET_SERVER_ADDR", @@ -135,6 +137,15 @@ def load_omnisocket_config() -> dict[str, Any]: str(control_sender_cfg.get("target_peer", "peer-b-ctrl")), ) + video_sender_cfg["peer_id"] = os.getenv( + "OMNISOCKET_VIDEO_SENDER_PEER_ID", + str(video_sender_cfg.get("peer_id", "peer-b-video")), + ) + video_sender_cfg["target_peer"] = os.getenv( + "OMNISOCKET_VIDEO_TARGET_PEER_ID", + str(video_sender_cfg.get("target_peer", "peer-a-video")), + ) + control_ingress_cfg["native_udp_bind"] = os.getenv( "OMNISOCKET_CONTROL_NATIVE_UDP_BIND", str(control_ingress_cfg.get("native_udp_bind", "127.0.0.1:10921")), @@ -158,11 +169,30 @@ def load_omnisocket_config() -> dict[str, Any]: ) ) + telemetry_receiver_cfg["peer_id"] = os.getenv( + "OMNISOCKET_TELEMETRY_PEER_ID", + str(telemetry_receiver_cfg.get("peer_id", "peer-a-telemetry")), + ) + telemetry_receiver_cfg["interval_ms"] = int( + os.getenv( + "OMNISOCKET_TELEMETRY_INTERVAL_MS", + str(telemetry_receiver_cfg.get("interval_ms", 500)), + ) + ) + telemetry_receiver_cfg["stale_after_ms"] = int( + os.getenv( + "OMNISOCKET_TELEMETRY_STALE_AFTER_MS", + str(telemetry_receiver_cfg.get("stale_after_ms", telemetry_receiver_cfg["interval_ms"] * 3)), + ) + ) + return { "transport": transport_cfg, "video_receiver": video_receiver_cfg, "control_sender": control_sender_cfg, "control_ingress": control_ingress_cfg, + "video_sender": video_sender_cfg, + "telemetry_receiver": telemetry_receiver_cfg, } diff --git a/backend/monitoring/services.py b/backend/monitoring/services.py index 02ea861..3940c8e 100644 --- a/backend/monitoring/services.py +++ b/backend/monitoring/services.py @@ -1,12 +1,13 @@ from __future__ import annotations from .control import ControlArbiter, NativeUdpControlIngress, OmniSocketControlSender -from .telemetry import GpsDataService, NetworkTelemetryService +from .telemetry import GpsDataService, HubTelemetryReceiver, NetworkTelemetryService from .video import OmniSocketVideoReceiver, VideoFrameService _video_receiver = OmniSocketVideoReceiver() _control_sender = OmniSocketControlSender() +_hub_telemetry_receiver = HubTelemetryReceiver() control_arbiter = ControlArbiter(_control_sender) native_control_ingress = NativeUdpControlIngress(control_arbiter) @@ -18,5 +19,6 @@ network_service = NetworkTelemetryService( _control_sender, control_arbiter, native_control_ingress, + _hub_telemetry_receiver, ) diff --git a/backend/monitoring/telemetry.py b/backend/monitoring/telemetry.py index f3c594f..48bd784 100644 --- a/backend/monitoring/telemetry.py +++ b/backend/monitoring/telemetry.py @@ -1,17 +1,54 @@ from __future__ import annotations +from collections import deque import json import math +import sys import threading import time -from datetime import UTC, datetime +from datetime import datetime, timezone from typing import Any -from .common import GEOSTREAM_JSON_PATH, GEOSTREAM_STALE_SECONDS, utc_iso_now +from .common import ( + GEOSTREAM_JSON_PATH, + GEOSTREAM_STALE_SECONDS, + WORKSPACE_ROOT, + load_omnisocket_config, + utc_iso_now, +) from .control import ControlArbiter, NativeUdpControlIngress, OmniSocketControlSender from .video import OmniSocketVideoReceiver +LOCAL_SAMPLE_INTERVAL_MS = 500 +TREND_HISTORY_SIZE = 10 +TREND_WINDOW_SIZE = 5 + + +def _utc_from_epoch(epoch_seconds: float | None) -> str | None: + if epoch_seconds is None or epoch_seconds <= 0.0: + return None + return datetime.fromtimestamp(epoch_seconds, timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z") + + +def _coerce_int(value: Any, default: int = 0) -> int: + try: + if value is None: + return default + return int(value) + except (TypeError, ValueError): + return default + + +def _coerce_float(value: Any, default: float = 0.0) -> float: + try: + if value is None: + return default + return float(value) + except (TypeError, ValueError): + return default + + class GpsDataService: def get_latest(self) -> dict[str, Any]: payload = self._read_geostream_payload() @@ -43,7 +80,7 @@ class GpsDataService: return { "has_fix": True, - "utc_time": datetime.now(UTC).strftime("%H:%M:%S"), + "utc_time": datetime.now(timezone.utc).strftime("%H:%M:%S"), "latitude": round(latitude, 6), "longitude": round(longitude, 6), "satellites": 14 + int((math.sin(tick * 0.7) + 1.0) * 2), @@ -56,6 +93,255 @@ class GpsDataService: } +class KcpTrendTracker: + def __init__(self) -> None: + self._lock = threading.Lock() + self._samples: dict[str, deque[dict[str, Any]]] = {} + + def _normalize(self, stats: dict[str, Any] | None) -> dict[str, Any]: + raw = dict(stats or {}) + snd_wnd = _coerce_int(raw.get("snd_wnd")) + rmt_wnd = _coerce_int(raw.get("rmt_wnd")) + inflight = _coerce_int(raw.get("inflight")) + window_limit = _coerce_int(raw.get("window_limit"), min(snd_wnd, rmt_wnd) if snd_wnd and rmt_wnd else 0) + return { + "connected": _coerce_int(raw.get("connected")), + "conv": _coerce_int(raw.get("conv")), + "rto_ms": _coerce_int(raw.get("rto_ms")), + "srtt_ms": _coerce_int(raw.get("srtt_ms")), + "srttvar_ms": _coerce_int(raw.get("srttvar_ms")), + "snd_wnd": snd_wnd, + "rmt_wnd": rmt_wnd, + "inflight": inflight, + "window_limit": window_limit, + "window_pressure_pct": round(_coerce_float(raw.get("window_pressure_pct")), 3), + "snd_queue": _coerce_int(raw.get("snd_queue")), + "rcv_queue": _coerce_int(raw.get("rcv_queue")), + "snd_buffer": _coerce_int(raw.get("snd_buffer")), + "out_segs_total": _coerce_int(raw.get("out_segs_total")), + "retrans_total": _coerce_int(raw.get("retrans_total")), + "fast_retrans_total": _coerce_int(raw.get("fast_retrans_total")), + "lost_total": _coerce_int(raw.get("lost_total")), + "repeat_total": _coerce_int(raw.get("repeat_total")), + "xmit_total": _coerce_int(raw.get("xmit_total")), + } + + def add_sample(self, key: str, stats: dict[str, Any] | None) -> None: + sample = { + "ts_monotonic": time.monotonic(), + "updated_at": utc_iso_now(), + "stats": self._normalize(stats), + } + with self._lock: + history = self._samples.setdefault(key, deque(maxlen=TREND_HISTORY_SIZE)) + history.append(sample) + + def latest_updated_at(self, key: str) -> str | None: + with self._lock: + history = self._samples.get(key) + if not history: + return None + return str(history[-1].get("updated_at") or "") + + def describe(self, key: str, current_stats: dict[str, Any] | None) -> dict[str, Any]: + current = self._normalize(current_stats) + with self._lock: + history = list(self._samples.get(key, ())) + + timeline = history + [{"stats": current, "updated_at": utc_iso_now()}] + previous = timeline[-2]["stats"] if len(timeline) >= 2 else None + trend_window = [entry["stats"] for entry in timeline[-TREND_WINDOW_SIZE:]] + deadband = max(2.0, 0.05 * float(max(current.get("window_limit", 0), 1))) + + snd_queue_delta = 0 + snd_buffer_delta = 0 + retrans_delta = 0 + fast_retrans_delta = 0 + lost_delta = 0 + repeat_delta = 0 + out_segs_delta = 0 + if previous is not None: + snd_queue_delta = max(0, current["snd_queue"] - _coerce_int(previous.get("snd_queue"))) + snd_buffer_delta = max(0, current["snd_buffer"] - _coerce_int(previous.get("snd_buffer"))) + retrans_delta = max(0, current["retrans_total"] - _coerce_int(previous.get("retrans_total"))) + fast_retrans_delta = max(0, current["fast_retrans_total"] - _coerce_int(previous.get("fast_retrans_total"))) + lost_delta = max(0, current["lost_total"] - _coerce_int(previous.get("lost_total"))) + repeat_delta = max(0, current["repeat_total"] - _coerce_int(previous.get("repeat_total"))) + out_segs_delta = max(0, current["out_segs_total"] - _coerce_int(previous.get("out_segs_total"))) + + def classify(field: str) -> str: + if len(trend_window) < 2: + return "stable" + oldest = float(_coerce_int(trend_window[0].get(field))) + newest = float(_coerce_int(trend_window[-1].get(field))) + delta = newest - oldest + if abs(delta) < deadband: + return "stable" + return "rising" if delta > 0 else "falling" + + repair_rate_pct = 0.0 + if out_segs_delta > 0: + repair_rate_pct = round((retrans_delta / out_segs_delta) * 100.0, 3) + + return { + "kcp": current, + "trend": { + "snd_queue_delta": snd_queue_delta, + "snd_buffer_delta": snd_buffer_delta, + "snd_queue_trend": classify("snd_queue"), + "snd_buffer_trend": classify("snd_buffer"), + "retrans_delta": retrans_delta, + "fast_retrans_delta": fast_retrans_delta, + "lost_delta": lost_delta, + "repeat_delta": repeat_delta, + "out_segs_delta": out_segs_delta, + "repair_rate_pct": repair_rate_pct, + }, + } + + +class HubTelemetryReceiver: + def __init__(self) -> None: + self._lock = threading.Lock() + self._thread: threading.Thread | None = None + self._started = False + self._session = None + self._session_cls = None + self._msg_type_text = None + self._msg_type_error = None + self._telemetry_defaults: dict[str, Any] = {} + self._latest_snapshot: dict[str, Any] | None = None + self._last_error = "" + self._last_received_wall = 0.0 + self._last_received_monotonic = 0.0 + self._load_backend() + + def _load_backend(self) -> None: + try: + self._import_backend() + except Exception as error: # pragma: no cover - optional runtime dependency + self._last_error = f"omnisocket import failed: {error}" + + def _import_backend(self) -> None: + try: + from omnisocket import MSG_TYPE_ERROR, MSG_TYPE_TEXT, Session, TELEMETRY_DEFAULTS # type: ignore + except ImportError: + python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python" + if python_dir.exists(): + sys.path.insert(0, str(python_dir)) + from omnisocket import MSG_TYPE_ERROR, MSG_TYPE_TEXT, Session, TELEMETRY_DEFAULTS # type: ignore + + self._msg_type_error = MSG_TYPE_ERROR + self._msg_type_text = MSG_TYPE_TEXT + self._session_cls = Session + self._telemetry_defaults = dict(TELEMETRY_DEFAULTS) + + def _connect_session(self): + assert self._session_cls is not None + + config = load_omnisocket_config() + transport_cfg = config.get("transport", {}) + telemetry_cfg = config.get("telemetry_receiver", {}) + + session = self._session_cls() + session.connect( + server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")), + peer_id=str(telemetry_cfg.get("peer_id", "peer-a-telemetry")), + relay_via=str(transport_cfg.get("relay_via", "")), + bind_ip=str(transport_cfg.get("bind_ip", "")), + bind_device=str(transport_cfg.get("bind_device", "")), + **self._telemetry_defaults, + ) + return session + + def ensure_started(self) -> None: + if self._session_cls is None: + return + + with self._lock: + if self._started: + return + self._started = True + self._thread = threading.Thread( + target=self._run, + name="hub-telemetry-receiver", + daemon=True, + ) + self._thread.start() + + def _run(self) -> None: + while True: + try: + session = self._connect_session() + with self._lock: + self._session = session + self._last_error = "" + + while True: + result = session.recv(timeout_ms=1000) + if result is None: + continue + + from_peer, msg_type, payload = result + if msg_type == self._msg_type_error: + with self._lock: + self._last_error = f"hub error from {from_peer}: {payload.decode('utf-8', errors='replace')}" + continue + if msg_type != self._msg_type_text: + continue + + snapshot = json.loads(payload.decode("utf-8")) + if snapshot.get("type") != "hub_kcp_snapshot": + continue + + now_wall = time.time() + now_mono = time.monotonic() + with self._lock: + self._latest_snapshot = snapshot + self._last_received_wall = now_wall + self._last_received_monotonic = now_mono + self._last_error = "" + except Exception as error: # pragma: no cover - runtime integration path + with self._lock: + self._last_error = str(error) + finally: + with self._lock: + session = self._session + self._session = None + if session is not None: + try: + session.close() + except Exception: + pass + time.sleep(2) + + def get_snapshot(self) -> dict[str, Any]: + self.ensure_started() + cfg = load_omnisocket_config().get("telemetry_receiver", {}) + stale_after_ms = max(500, int(cfg.get("stale_after_ms", 1500))) + + with self._lock: + received_monotonic = self._last_received_monotonic + received_wall = self._last_received_wall + snapshot = self._latest_snapshot + connected = self._session is not None + last_error = self._last_error + + stale = True + if received_monotonic > 0.0: + stale = (time.monotonic() - received_monotonic) * 1000.0 > stale_after_ms + + return { + "connected": connected, + "updated_at": _utc_from_epoch(received_wall), + "received_at_monotonic": received_monotonic, + "stale": stale, + "peer_id": str(cfg.get("peer_id", "peer-a-telemetry")), + "snapshot": snapshot or {"sessions": []}, + "last_error": last_error, + } + + class NetworkTelemetryService: def __init__( self, @@ -63,13 +349,45 @@ class NetworkTelemetryService: control_sender: OmniSocketControlSender, control_arbiter: ControlArbiter, native_ingress: NativeUdpControlIngress, + hub_receiver: HubTelemetryReceiver, ) -> None: self._video_receiver = video_receiver self._control_sender = control_sender self._control_arbiter = control_arbiter self._native_ingress = native_ingress + self._hub_receiver = hub_receiver + self._trend_tracker = KcpTrendTracker() self._rate_lock = threading.Lock() self._last_rate_sample: tuple[float, int, int] | None = None + self._sample_thread: threading.Thread | None = None + self._sample_started = False + self._last_remote_snapshot_at = 0.0 + + def _ensure_started(self) -> None: + self._video_receiver.ensure_started() + self._control_arbiter.ensure_started() + self._native_ingress.ensure_started() + self._hub_receiver.ensure_started() + with self._rate_lock: + if self._sample_started: + return + self._sample_started = True + self._sample_thread = threading.Thread( + target=self._sample_loop, + name="network-telemetry-sampler", + daemon=True, + ) + self._sample_thread.start() + + def _sample_loop(self) -> None: + interval_seconds = LOCAL_SAMPLE_INTERVAL_MS / 1000.0 + while True: + try: + self._trend_tracker.add_sample("a_to_d.video", self._video_receiver.session_kcp_stats()) + self._trend_tracker.add_sample("a_to_d.control", self._control_sender.session_kcp_stats()) + except Exception: + pass + time.sleep(interval_seconds) def _compute_rates(self, send_bytes: int, recv_bytes: int) -> tuple[float, float]: now = time.monotonic() @@ -89,10 +407,85 @@ class NetworkTelemetryService: rx_kbps = max(0.0, ((recv_bytes - prev_recv) * 8.0) / elapsed / 1000.0) return tx_kbps, rx_kbps + def _ingest_remote_snapshot(self, telemetry_state: dict[str, Any]) -> None: + received_at = float(telemetry_state.get("received_at_monotonic") or 0.0) + if received_at <= 0.0 or received_at <= self._last_remote_snapshot_at: + return + + snapshot = telemetry_state.get("snapshot") or {} + sessions = snapshot.get("sessions") or [] + for session in sessions: + peer_id = str(session.get("peer_id", "")).strip() + if not peer_id: + continue + self._trend_tracker.add_sample(f"hub::{peer_id}", session) + self._last_remote_snapshot_at = received_at + + def _build_session_payload( + self, + trend_key: str, + peer_id: str, + app_stats: dict[str, Any] | None, + current_kcp: dict[str, Any] | None, + updated_at: str | None, + stale: bool, + ) -> dict[str, Any]: + described = self._trend_tracker.describe(trend_key, current_kcp) + return { + "peer_id": peer_id, + "connected": bool(described["kcp"].get("connected")), + "updated_at": updated_at, + "stale": stale, + "app": app_stats, + "kcp": described["kcp"], + "trend": described["trend"], + } + + def _build_link(self, source: str, updated_at: str | None, stale: bool, sessions: dict[str, dict[str, Any]]) -> dict[str, Any]: + session_items = list(sessions.values()) + active_sessions = [session for session in session_items if session.get("connected") and not session.get("stale")] + retrans_sum = sum(_coerce_int(session.get("trend", {}).get("retrans_delta")) for session in active_sessions) + out_segs_sum = sum(_coerce_int(session.get("trend", {}).get("out_segs_delta")) for session in active_sessions) + repair_rate_pct = round((retrans_sum / out_segs_sum) * 100.0, 3) if out_segs_sum > 0 else 0.0 + + return { + "source": source, + "updated_at": updated_at, + "stale": stale, + "aggregate": { + "online_sessions": len(active_sessions), + "max_window_pressure_pct": max( + (_coerce_float(session.get("kcp", {}).get("window_pressure_pct")) for session in active_sessions), + default=0.0, + ), + "sum_snd_queue": sum(_coerce_int(session.get("kcp", {}).get("snd_queue")) for session in active_sessions), + "sum_snd_buffer": sum(_coerce_int(session.get("kcp", {}).get("snd_buffer")) for session in active_sessions), + "sum_retrans_delta": retrans_sum, + "sum_out_segs_delta": out_segs_sum, + "repair_rate_pct": repair_rate_pct, + }, + "sessions": sessions, + } + + def _pick_primary_session(self, links: dict[str, dict[str, Any]]) -> dict[str, Any] | None: + candidates = ( + links["a_to_d"]["sessions"]["control"], + links["a_to_d"]["sessions"]["video"], + links["d_to_b"]["sessions"]["control"], + links["d_to_b"]["sessions"]["video"], + ) + for session in candidates: + if session.get("connected") and not session.get("stale"): + return session + return None + def get_latest(self) -> dict[str, Any]: - self._video_receiver.ensure_started() - self._control_arbiter.ensure_started() - self._native_ingress.ensure_started() + self._ensure_started() + + config = load_omnisocket_config() + video_receiver_cfg = config.get("video_receiver", {}) + control_sender_cfg = config.get("control_sender", {}) + video_sender_cfg = config.get("video_sender", {}) video_app = self._video_receiver.session_stats() control_app = self._control_sender.session_stats() @@ -101,20 +494,75 @@ class NetworkTelemetryService: arbiter_status = self._control_arbiter.get_status() ingress_status = self._native_ingress.get_status() sender_status = self._control_sender.get_status() + telemetry_state = self._hub_receiver.get_snapshot() total_send_bytes = int(video_app.get("send_bytes", 0)) + int(control_app.get("send_bytes", 0)) total_recv_bytes = int(video_app.get("recv_bytes", 0)) + int(control_app.get("recv_bytes", 0)) tx_kbps, rx_kbps = self._compute_rates(total_send_bytes, total_recv_bytes) - video_connected = int(video_app.get("connected", 0)) - control_connected = int(control_app.get("connected", 0)) - connected_sessions = video_connected + control_connected + local_updated_at = utc_iso_now() + local_sessions = { + "video": self._build_session_payload( + "a_to_d.video", + str(video_receiver_cfg.get("peer_id", "peer-a-video")), + video_app, + video_kcp, + local_updated_at, + False, + ), + "control": self._build_session_payload( + "a_to_d.control", + str(control_sender_cfg.get("peer_id", "peer-a-ctrl")), + control_app, + control_kcp, + local_updated_at, + False, + ), + } - primary_kcp = control_kcp if control_connected else video_kcp - latency_ms = primary_kcp.get("srtt_ms") - jitter_ms = primary_kcp.get("srttvar_ms") + remote_snapshot = telemetry_state.get("snapshot") or {} + remote_sessions_by_peer = { + str(session.get("peer_id", "")).strip(): session + for session in remote_snapshot.get("sessions", []) or [] + if str(session.get("peer_id", "")).strip() + } + remote_updated_at = telemetry_state.get("updated_at") + remote_stale = bool(telemetry_state.get("stale", True)) + remote_sessions = { + "video": self._build_session_payload( + f"hub::{str(video_sender_cfg.get('peer_id', 'peer-b-video'))}", + str(video_sender_cfg.get("peer_id", "peer-b-video")), + None, + remote_sessions_by_peer.get(str(video_sender_cfg.get("peer_id", "peer-b-video")), {}), + remote_updated_at, + remote_stale, + ), + "control": self._build_session_payload( + f"hub::{str(control_sender_cfg.get('target_peer', 'peer-b-ctrl'))}", + str(control_sender_cfg.get("target_peer", "peer-b-ctrl")), + None, + remote_sessions_by_peer.get(str(control_sender_cfg.get("target_peer", "peer-b-ctrl")), {}), + remote_updated_at, + remote_stale, + ), + } - if connected_sessions > 0: + links = { + "a_to_d": self._build_link("local-a-side", local_updated_at, False, local_sessions), + "d_to_b": self._build_link("hub-telemetry", remote_updated_at, remote_stale, remote_sessions), + } + + primary_session = self._pick_primary_session(links) + primary_kcp = dict(primary_session.get("kcp", {})) if primary_session is not None else {} + self._ingest_remote_snapshot(telemetry_state) + + fresh_connected_sessions = ( + links["a_to_d"]["aggregate"]["online_sessions"] + links["d_to_b"]["aggregate"]["online_sessions"] + ) + latency_ms = primary_kcp.get("srtt_ms") if primary_session is not None else None + jitter_ms = primary_kcp.get("srttvar_ms") if primary_session is not None else None + + if fresh_connected_sessions > 0: peer_status = "online" elif sender_status.get("backend_ready"): peer_status = "idle" @@ -129,12 +577,12 @@ class NetworkTelemetryService: "tx_kbps": round(tx_kbps, 3), "rx_kbps": round(rx_kbps, 3), "transport": "OmniSocket / kcp", - "source_mode": "omnisocket-live" if connected_sessions > 0 else "omnisocket-idle", + "source_mode": "omnisocket-live" if fresh_connected_sessions > 0 else "omnisocket-idle", "updated_at": utc_iso_now(), "active_control_source": arbiter_status["active_source"], "control_lease_remaining_ms": arbiter_status["control_lease_remaining_ms"], "combined": { - "connected_sessions": connected_sessions, + "connected_sessions": fresh_connected_sessions, "send_bytes": total_send_bytes, "recv_bytes": total_recv_bytes, "tx_kbps": round(tx_kbps, 3), @@ -143,13 +591,21 @@ class NetworkTelemetryService: "sessions": { "video": { "app": video_app, - "kcp": video_kcp, + "kcp": local_sessions["video"]["kcp"], }, "control": { "app": control_app, - "kcp": control_kcp, + "kcp": local_sessions["control"]["kcp"], }, }, + "links": links, + "telemetry_receiver": { + "hub_connected": bool(telemetry_state.get("connected")), + "hub_updated_at": telemetry_state.get("updated_at"), + "hub_stale": remote_stale, + "last_error": telemetry_state.get("last_error", ""), + "peer_id": telemetry_state.get("peer_id", ""), + }, "ingress": { "native_udp": ingress_status, }, @@ -158,4 +614,3 @@ class NetworkTelemetryService: "sender": sender_status, }, } - diff --git a/config/omnisocket_demo.yaml b/config/omnisocket_demo.yaml index 7c7d152..793a1eb 100644 --- a/config/omnisocket_demo.yaml +++ b/config/omnisocket_demo.yaml @@ -18,6 +18,11 @@ control_ingress: send_rate_hz: 20.0 zero_burst_packets: 3 +telemetry_receiver: + peer_id: "peer-a-telemetry" + interval_ms: 500 + stale_after_ms: 1500 + video_sender: peer_id: "peer-b-video" target_peer: "peer-a-video" diff --git a/frontend/src/components/NetworkPanel.vue b/frontend/src/components/NetworkPanel.vue index 69a1f39..d4b584a 100644 --- a/frontend/src/components/NetworkPanel.vue +++ b/frontend/src/components/NetworkPanel.vue @@ -1,20 +1,47 @@