From 2ca70d556b2d72d81c4214db3a8fedf5da3febd7 Mon Sep 17 00:00:00 2001 From: Mock Date: Sat, 18 Apr 2026 12:52:32 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/monitoring/common.py | 141 +++++- backend/monitoring/control.py | 292 +++++++++++- backend/monitoring/services.py | 17 +- backend/monitoring/telemetry.py | 99 +++- backend/monitoring/urls.py | 1 + backend/monitoring/video.py | 264 ++++++++++- backend/monitoring/views.py | 33 +- config/omnisocket_demo.yaml | 4 + frontend/src/components/NetworkPanel.vue | 27 +- frontend/src/components/VideoPanel.vue | 446 +++++++++++++----- .../src/composables/useControlInterface.ts | 23 +- frontend/src/lib/api.ts | 13 + frontend/src/types.ts | 83 +++- frontend/src/views/DashboardView.vue | 2 +- frontend/src/views/VideoView.vue | 4 +- 15 files changed, 1263 insertions(+), 186 deletions(-) diff --git a/backend/monitoring/common.py b/backend/monitoring/common.py index a64ceaa..dd3c4b2 100644 --- a/backend/monitoring/common.py +++ b/backend/monitoring/common.py @@ -1,9 +1,12 @@ from __future__ import annotations import os +import json import struct from datetime import datetime, timezone from pathlib import Path +import threading +import time from typing import Any @@ -19,8 +22,10 @@ VIDEO_TRAILER_ENDIANNESS = "little" VIDEO_TRAILER_TIMESTAMP_UNIT = "ms" VIDEO_TRAILER_TIMESTAMP_MULTIPLIER_NS = 1_000_000 VIDEO_TRAILER_TIMESTAMP_MAX_SKEW_NS = 7 * 24 * 60 * 60 * 1_000_000_000 -VIDEO_TRAILER_COORDINATE_FORMAT = "uint64 timestamp_ms + float64 latitude + float64 longitude (little-endian)" -VIDEO_TRAILER_STRUCT = struct.Struct(" str: @@ -96,6 +104,7 @@ def load_omnisocket_config() -> dict[str, Any]: transport_cfg = dict(config.get("transport", {})) video_receiver_cfg = dict(config.get("video_receiver", {})) control_sender_cfg = dict(config.get("control_sender", {})) + control_ack_receiver_cfg = dict(config.get("control_ack_receiver", {})) control_ingress_cfg = dict(config.get("control_ingress", {})) video_sender_cfg = dict(config.get("video_sender", {})) telemetry_receiver_cfg = dict(config.get("telemetry_receiver", {})) @@ -137,6 +146,15 @@ def load_omnisocket_config() -> dict[str, Any]: str(control_sender_cfg.get("target_peer", "peer-b-ctrl")), ) + control_ack_receiver_cfg["peer_id"] = os.getenv( + "OMNISOCKET_CONTROL_ACK_RECEIVER_PEER_ID", + str(control_ack_receiver_cfg.get("peer_id", "peer-a-ctrl-ack")), + ) + control_ack_receiver_cfg["expected_sender"] = os.getenv( + "OMNISOCKET_CONTROL_ACK_EXPECTED_SENDER", + str(control_ack_receiver_cfg.get("expected_sender", "peer-b-ctrl-ack")), + ) + video_sender_cfg["peer_id"] = os.getenv( "OMNISOCKET_VIDEO_SENDER_PEER_ID", str(video_sender_cfg.get("peer_id", "peer-b-video")), @@ -190,12 +208,131 @@ def load_omnisocket_config() -> dict[str, Any]: "transport": transport_cfg, "video_receiver": video_receiver_cfg, "control_sender": control_sender_cfg, + "control_ack_receiver": control_ack_receiver_cfg, "control_ingress": control_ingress_cfg, "video_sender": video_sender_cfg, "telemetry_receiver": telemetry_receiver_cfg, } +class JsonlRunLogger: + def __init__(self, stem_env: str, default_stem: str) -> None: + explicit_path = os.getenv(stem_env, "").strip() + self._path = Path(explicit_path) if explicit_path else ( + BLITZ_RUN_DIR / f"{default_stem}.{BLITZ_INSTANCE_ID}.jsonl" if BLITZ_RUN_DIR is not None else None + ) + self._lock = threading.Lock() + self._file = None + self._buffered_bytes = 0 + self._current_bytes = 0 + self._flush_bytes = self._positive_int_env("BLITZ_JSONL_FLUSH_BYTES", 262144) + self._flush_interval_ms = self._positive_int_env("BLITZ_JSONL_FLUSH_INTERVAL_MS", 1000) + self._max_bytes = self._positive_int_env("BLITZ_JSONL_ROTATE_BYTES", 134217728) + self._max_files = self._positive_int_env("BLITZ_JSONL_ROTATE_FILES", 8) + self._last_flush_monotonic_ms = self._now_ms() + + if self._path is not None: + try: + self._path.parent.mkdir(parents=True, exist_ok=True) + self._file = self._path.open("a", encoding="utf-8") + self._current_bytes = self._path.stat().st_size if self._path.exists() else 0 + except OSError: + self._file = None + + @property + def path(self) -> str | None: + return str(self._path) if self._path is not None else None + + @property + def enabled(self) -> bool: + return self._file is not None + + def write(self, payload: dict[str, Any]) -> None: + if self._file is None: + return + line = json.dumps(payload, ensure_ascii=False, separators=(",", ":")) + line_bytes = len(line.encode("utf-8")) + 1 + with self._lock: + if self._file is None: + return + try: + self._file.write(line) + self._file.write("\n") + self._buffered_bytes += line_bytes + self._current_bytes += line_bytes + now_ms = self._now_ms() + if ( + self._buffered_bytes >= self._flush_bytes + or (self._flush_interval_ms > 0 and now_ms - self._last_flush_monotonic_ms >= self._flush_interval_ms) + ): + self._flush_locked(now_ms) + if self._max_bytes > 0 and self._max_files > 0 and self._current_bytes >= self._max_bytes: + self._rotate_locked() + except OSError: + if self._file is not None: + try: + self._file.close() + except OSError: + pass + self._file = None + + def close(self) -> None: + with self._lock: + if self._file is not None: + try: + self._flush_locked(self._now_ms()) + except OSError: + pass + self._file.close() + self._file = None + + def _flush_locked(self, now_ms: int) -> None: + if self._file is None: + return + self._file.flush() + self._buffered_bytes = 0 + self._last_flush_monotonic_ms = now_ms + + def _rotate_locked(self) -> None: + if self._path is None or self._file is None or self._max_files <= 0: + return + self._flush_locked(self._now_ms()) + self._file.close() + self._file = None + + oldest = self._path.with_name(f"{self._path.name}.{self._max_files}") + if oldest.exists(): + oldest.unlink() + + for index in range(self._max_files - 1, 0, -1): + src = self._path.with_name(f"{self._path.name}.{index}") + if src.exists(): + dst = self._path.with_name(f"{self._path.name}.{index + 1}") + src.replace(dst) + + if self._path.exists(): + rotated = self._path.with_name(f"{self._path.name}.1") + self._path.replace(rotated) + + self._file = self._path.open("a", encoding="utf-8") + self._buffered_bytes = 0 + self._current_bytes = self._path.stat().st_size if self._path.exists() else 0 + self._last_flush_monotonic_ms = self._now_ms() + + @staticmethod + def _now_ms() -> int: + return int(time.monotonic() * 1000) + + @staticmethod + def _positive_int_env(name: str, default: int) -> int: + raw = os.getenv(name, "").strip() + try: + value = int(raw) + except ValueError: + return default + return value if value > 0 else default + + def parse_host_port(bind_addr: str) -> tuple[str, int]: host, port_text = bind_addr.rsplit(":", 1) host = host.strip() or "127.0.0.1" diff --git a/backend/monitoring/control.py b/backend/monitoring/control.py index 0b8e505..81122f6 100644 --- a/backend/monitoring/control.py +++ b/backend/monitoring/control.py @@ -1,5 +1,6 @@ from __future__ import annotations +import json import socket import sys import threading @@ -10,6 +11,7 @@ from .common import ( CONTROL_PACKET_SIZE, CONTROL_SOURCE_NATIVE_UDP, CONTROL_SOURCE_PRIORITY, + JsonlRunLogger, ZERO_CONTROL_PAYLOAD, WORKSPACE_ROOT, load_omnisocket_config, @@ -18,9 +20,129 @@ from .common import ( from .video import safe_kcp_stats -class OmniSocketControlSender: +class ControlAckTracker: def __init__(self) -> None: self._lock = threading.Lock() + self._event_logger = JsonlRunLogger("BLITZ_A_CONTROL_EVENTS_LOG_PATH", "a-control-events") + self._ack_logger = JsonlRunLogger("BLITZ_A_CONTROL_ACKS_LOG_PATH", "a-control-acks") + self._pending: dict[int, dict[str, Any]] = {} + self._latest_estimate: dict[str, Any] = { + "ack_available": False, + "updated_at": None, + "received_mono_ns": 0, + "control_loop_rtt_ms": None, + "b_recv_to_persist_ms": None, + "control_oneway_network_est_ms": None, + "control_to_persist_est_ms": None, + "sample_reason": None, + } + + def register_send( + self, + *, + message_id: int, + issued_at_unix_ns: int, + issued_at_mono_ns: int, + source: str, + payload: bytes, + send_call_latency_us: int, + ) -> None: + event = { + "ts_unix_nano": issued_at_unix_ns, + "message_id": message_id, + "issued_at_unix_ns": issued_at_unix_ns, + "issued_at_mono_ns": issued_at_mono_ns, + "source": source, + "command_signature": payload.hex(), + "payload_size": len(payload), + "send_call_latency_us": send_call_latency_us, + } + with self._lock: + self._pending[message_id] = event + self._prune_locked(issued_at_mono_ns) + self._event_logger.write(event) + + def handle_ack(self, ack_payload: dict[str, Any], received_unix_ns: int, received_mono_ns: int) -> None: + try: + message_id = int(ack_payload["message_id"]) + except (KeyError, TypeError, ValueError): + return + + with self._lock: + event = self._pending.pop(message_id, None) + self._prune_locked(received_mono_ns) + + if event is None: + return + + try: + control_loop_rtt_ms = round((received_unix_ns - int(event["issued_at_unix_ns"])) / 1_000_000.0, 3) + b_recv_to_persist_ms = round(float(ack_payload.get("b_recv_to_persist_us", 0)) / 1000.0, 3) + except (TypeError, ValueError): + return + + control_oneway_network_est_ms = round(max(0.0, (control_loop_rtt_ms - b_recv_to_persist_ms) / 2.0), 3) + control_to_persist_est_ms = round(control_oneway_network_est_ms + b_recv_to_persist_ms, 3) + ack_record = { + "ts_unix_nano": received_unix_ns, + "received_unix_ns": received_unix_ns, + "received_mono_ns": received_mono_ns, + "message_id": message_id, + "ack_phase": str(ack_payload.get("ack_phase") or "persist_end"), + "sample_reason": str(ack_payload.get("sample_reason") or ""), + "b_recv_to_persist_us": ack_payload.get("b_recv_to_persist_us"), + "unix_send_ok": bool(ack_payload.get("unix_send_ok", False)), + "issued_at_unix_ns": event["issued_at_unix_ns"], + "source": event["source"], + "control_loop_rtt_ms": control_loop_rtt_ms, + "b_recv_to_persist_ms": b_recv_to_persist_ms, + "control_oneway_network_est_ms": control_oneway_network_est_ms, + "control_to_persist_est_ms": control_to_persist_est_ms, + } + self._ack_logger.write(ack_record) + with self._lock: + self._latest_estimate = { + "ack_available": True, + "updated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(received_unix_ns / 1_000_000_000)), + "received_mono_ns": received_mono_ns, + "control_loop_rtt_ms": control_loop_rtt_ms, + "b_recv_to_persist_ms": b_recv_to_persist_ms, + "control_oneway_network_est_ms": control_oneway_network_est_ms, + "control_to_persist_est_ms": control_to_persist_est_ms, + "sample_reason": ack_record["sample_reason"], + } + + def get_latest_estimate(self) -> dict[str, Any]: + with self._lock: + estimate = dict(self._latest_estimate) + if int(estimate.get("received_mono_ns", 0) or 0) > 0 and time.monotonic_ns() - int(estimate["received_mono_ns"]) > 10_000_000_000: + estimate["ack_available"] = False + estimate["control_loop_rtt_ms"] = None + estimate["b_recv_to_persist_ms"] = None + estimate["control_oneway_network_est_ms"] = None + estimate["control_to_persist_est_ms"] = None + estimate["sample_reason"] = None + estimate.pop("received_mono_ns", None) + return estimate + + def close(self) -> None: + self._event_logger.close() + self._ack_logger.close() + + def _prune_locked(self, now_mono_ns: int) -> None: + stale_ids = [ + message_id + for message_id, event in self._pending.items() + if now_mono_ns - int(event.get("issued_at_mono_ns", 0)) > 60_000_000_000 + ] + for message_id in stale_ids: + self._pending.pop(message_id, None) + + +class OmniSocketControlSender: + def __init__(self, ack_tracker: ControlAckTracker) -> None: + self._lock = threading.Lock() + self._ack_tracker = ack_tracker self._session = None self._session_cls = None self._msg_type_error = None @@ -36,6 +158,7 @@ class OmniSocketControlSender: self._reconnect_count = 0 self._ever_connected = False self._registered = False + self._supports_send_with_id = False self._load_backend() def _load_backend(self) -> None: @@ -92,6 +215,7 @@ class OmniSocketControlSender: self._started = True self._last_error = "" self._registered = bool(dict(session.stats()).get("registered", 0)) + self._supports_send_with_id = hasattr(session, "send_with_id") if self._ever_connected: self._reconnect_count += 1 else: @@ -111,25 +235,35 @@ class OmniSocketControlSender: self._session = None self._started = False self._registered = False + self._supports_send_with_id = False if current is not None: try: current.close() except Exception: pass - def send_payload(self, payload: bytes) -> None: + def send_payload(self, payload: bytes, *, source: str) -> None: if len(payload) != CONTROL_PACKET_SIZE: raise ValueError(f"expected {CONTROL_PACKET_SIZE} bytes, got {len(payload)}") self.ensure_started() with self._lock: session = self._session target_peer = self._target_peer + supports_send_with_id = self._supports_send_with_id if session is None: raise RuntimeError("control session is not available") try: - session.send(to=target_peer, data=payload) + issued_at_unix_ns = time.time_ns() + issued_at_mono_ns = time.monotonic_ns() + send_started_ns = time.perf_counter_ns() + message_id: int | None = None + if supports_send_with_id: + message_id = int(session.send_with_id(to=target_peer, data=payload)) + else: + session.send(to=target_peer, data=payload) + send_call_latency_us = max(0, int((time.perf_counter_ns() - send_started_ns) / 1000)) except Exception as error: with self._lock: self._send_errors += 1 @@ -137,13 +271,22 @@ class OmniSocketControlSender: self._reset_session(session) raise + if message_id is not None: + self._ack_tracker.register_send( + message_id=message_id, + issued_at_unix_ns=issued_at_unix_ns, + issued_at_mono_ns=issued_at_mono_ns, + source=source, + payload=payload, + send_call_latency_us=send_call_latency_us, + ) with self._lock: self._send_count += 1 def send_zero_burst(self, count: int) -> None: for _ in range(max(0, count)): try: - self.send_payload(ZERO_CONTROL_PAYLOAD) + self.send_payload(ZERO_CONTROL_PAYLOAD, source="zero_burst") except Exception: return @@ -236,6 +379,145 @@ class OmniSocketControlSender: drain_thread.join(timeout=0.5) +class OmniSocketControlAckReceiver: + def __init__(self, ack_tracker: ControlAckTracker) -> None: + self._ack_tracker = ack_tracker + self._lock = threading.Lock() + self._thread: threading.Thread | None = None + self._started = False + self._session = None + self._session_cls = None + self._msg_type_text = None + self._msg_type_error = None + self._control_defaults: dict[str, Any] = {} + self._closing = threading.Event() + self._last_error = "" + self._reconnect_count = 0 + self._ever_connected = False + self._load_backend() + + def _load_backend(self) -> None: + try: + self._import_backend() + except Exception as error: # pragma: no cover + self._last_error = f"omnisocket import failed: {error}" + + def _import_backend(self) -> None: + try: + from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_ERROR, MSG_TYPE_TEXT, Session # type: ignore + except ImportError: + python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python" + if python_dir.exists(): + sys.path.insert(0, str(python_dir)) + from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_ERROR, MSG_TYPE_TEXT, Session # type: ignore + + self._session_cls = Session + self._msg_type_text = MSG_TYPE_TEXT + self._msg_type_error = MSG_TYPE_ERROR + self._control_defaults = dict(CONTROL_DEFAULTS) + + def _connect_session(self): + assert self._session_cls is not None + + config = load_omnisocket_config() + transport_cfg = config.get("transport", {}) + ack_cfg = config.get("control_ack_receiver", {}) + + session = self._session_cls() + session.connect( + server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")), + peer_id=str(ack_cfg.get("peer_id", "peer-a-ctrl-ack")), + relay_via=str(transport_cfg.get("relay_via", "")), + bind_ip=str(transport_cfg.get("bind_ip", "")), + bind_device=str(transport_cfg.get("bind_device", "")), + **self._control_defaults, + ) + return session, str(ack_cfg.get("expected_sender", "peer-b-ctrl-ack")) + + def ensure_started(self) -> None: + if self._session_cls is None: + return + with self._lock: + if self._started or self._closing.is_set(): + return + self._started = True + self._thread = threading.Thread(target=self._run, name="omnisocket-control-ack", daemon=True) + self._thread.start() + + def _run(self) -> None: + while not self._closing.is_set(): + expected_sender = "" + try: + session, expected_sender = self._connect_session() + with self._lock: + self._session = session + self._last_error = "" + if self._ever_connected: + self._reconnect_count += 1 + else: + self._ever_connected = True + + while not self._closing.is_set(): + result = session.recv(timeout_ms=1000) + if result is None: + continue + from_peer, msg_type, payload = result + if msg_type == self._msg_type_error: + with self._lock: + self._last_error = f"ack session error from {from_peer}: {payload.decode('utf-8', errors='replace')}" + continue + if msg_type != self._msg_type_text: + continue + if expected_sender and from_peer != expected_sender: + continue + try: + ack_payload = json.loads(payload.decode("utf-8")) + except (UnicodeDecodeError, json.JSONDecodeError): + continue + self._ack_tracker.handle_ack(ack_payload, time.time_ns(), time.monotonic_ns()) + except Exception as error: # pragma: no cover + if not self._closing.is_set(): + with self._lock: + self._last_error = str(error) + time.sleep(2) + finally: + if self._session is not None: + try: + self._session.close() + except Exception: + pass + with self._lock: + self._session = None + if self._closing.is_set(): + self._started = False + + def get_status(self) -> dict[str, Any]: + config = load_omnisocket_config().get("control_ack_receiver", {}) + with self._lock: + return { + "backend_ready": self._session_cls is not None, + "started": self._started, + "connected": self._session is not None, + "peer_id": str(config.get("peer_id", "")), + "expected_sender": str(config.get("expected_sender", "")), + "reconnect_count": self._reconnect_count, + "last_error": self._last_error, + } + + def close(self) -> None: + self._closing.set() + with self._lock: + session = self._session + if session is not None: + try: + session.close() + except Exception: + pass + thread = self._thread + if thread is not None and thread.is_alive(): + thread.join(timeout=0.5) + + class ControlArbiter: def __init__(self, sender: OmniSocketControlSender) -> None: self._sender = sender @@ -325,7 +607,7 @@ class ControlArbiter: self._last_error = str(error) elif active_source is not None: try: - self._sender.send_payload(payload) + self._sender.send_payload(payload, source=active_source) with self._lock: self._last_sent_at = time.monotonic() self._last_error = "" diff --git a/backend/monitoring/services.py b/backend/monitoring/services.py index 8055d02..171fc6d 100644 --- a/backend/monitoring/services.py +++ b/backend/monitoring/services.py @@ -2,26 +2,32 @@ from __future__ import annotations import atexit -from .control import ControlArbiter, NativeUdpControlIngress, OmniSocketControlSender +from .control import ControlAckTracker, ControlArbiter, NativeUdpControlIngress, OmniSocketControlAckReceiver, OmniSocketControlSender from .telemetry import GpsDataService, HubTelemetryReceiver, NetworkTelemetryService -from .video import OmniSocketVideoReceiver, VideoFrameService +from .video import OmniSocketVideoReceiver, VideoDisplayProbeStore, VideoFrameService _video_receiver = OmniSocketVideoReceiver() -_control_sender = OmniSocketControlSender() +_control_ack_tracker = ControlAckTracker() +_control_sender = OmniSocketControlSender(_control_ack_tracker) +_control_ack_receiver = OmniSocketControlAckReceiver(_control_ack_tracker) _hub_telemetry_receiver = HubTelemetryReceiver() +_video_display_probe_store = VideoDisplayProbeStore() control_arbiter = ControlArbiter(_control_sender) native_control_ingress = NativeUdpControlIngress(control_arbiter) -video_service = VideoFrameService(_video_receiver) +video_service = VideoFrameService(_video_receiver, _video_display_probe_store) gps_service = GpsDataService(_video_receiver) network_service = NetworkTelemetryService( _video_receiver, _control_sender, + _control_ack_tracker, + _control_ack_receiver, control_arbiter, native_control_ingress, _hub_telemetry_receiver, + _video_display_probe_store, ) @@ -30,7 +36,10 @@ def shutdown_monitoring_services() -> None: network_service.close, native_control_ingress.close, control_arbiter.close, + _control_ack_receiver.close, + _control_ack_tracker.close, _hub_telemetry_receiver.close, + _video_display_probe_store.close, _video_receiver.close, _control_sender.close, ): diff --git a/backend/monitoring/telemetry.py b/backend/monitoring/telemetry.py index e29cd72..65f4f8c 100644 --- a/backend/monitoring/telemetry.py +++ b/backend/monitoring/telemetry.py @@ -16,8 +16,8 @@ from .common import ( load_omnisocket_config, utc_iso_now, ) -from .control import ControlArbiter, NativeUdpControlIngress, OmniSocketControlSender -from .video import FrameTrailerMetadata, OmniSocketVideoReceiver +from .control import ControlAckTracker, ControlArbiter, NativeUdpControlIngress, OmniSocketControlAckReceiver, OmniSocketControlSender +from .video import FrameTrailerMetadata, OmniSocketVideoReceiver, VideoDisplayProbeStore LOCAL_SAMPLE_INTERVAL_MS = 500 @@ -140,7 +140,9 @@ class KcpTrendTracker: "conv": _coerce_int(raw.get("conv")), "rto_ms": _coerce_int(raw.get("rto_ms")), "srtt_ms": _coerce_int(raw.get("srtt_ms")), + "min_srtt_ms": _coerce_int(raw.get("min_srtt_ms")), "srttvar_ms": _coerce_int(raw.get("srttvar_ms")), + "last_feedback_age_ms": _coerce_int(raw.get("last_feedback_age_ms")), "snd_wnd": snd_wnd, "rmt_wnd": rmt_wnd, "inflight": inflight, @@ -419,15 +421,21 @@ class NetworkTelemetryService: self, video_receiver: OmniSocketVideoReceiver, control_sender: OmniSocketControlSender, + control_ack_tracker: ControlAckTracker, + control_ack_receiver: OmniSocketControlAckReceiver, control_arbiter: ControlArbiter, native_ingress: NativeUdpControlIngress, hub_receiver: HubTelemetryReceiver, + video_display_probe_store: VideoDisplayProbeStore, ) -> None: self._video_receiver = video_receiver self._control_sender = control_sender + self._control_ack_tracker = control_ack_tracker + self._control_ack_receiver = control_ack_receiver self._control_arbiter = control_arbiter self._native_ingress = native_ingress self._hub_receiver = hub_receiver + self._video_display_probe_store = video_display_probe_store self._trend_tracker = KcpTrendTracker() self._rate_lock = threading.Lock() self._last_rate_sample: tuple[float, int, int] | None = None @@ -439,6 +447,7 @@ class NetworkTelemetryService: def _ensure_started(self) -> None: self._video_receiver.ensure_started() self._control_arbiter.ensure_started() + self._control_ack_receiver.ensure_started() self._native_ingress.ensure_started() self._hub_receiver.ensure_started() with self._rate_lock: @@ -629,6 +638,74 @@ class NetworkTelemetryService: "updated_at": _utc_from_epoch(updated_at_epoch_ms / 1000.0) or utc_iso_now(), } + def _derive_latency_estimate( + self, + *, + links: dict[str, dict[str, Any]], + video_receiver_status: dict[str, Any], + display_probe_status: dict[str, Any], + ) -> dict[str, Any]: + a_to_d_control_raw = links["a_to_d"]["sessions"]["control"]["kcp"].get("srtt_ms") + d_to_b_control_raw = links["d_to_b"]["sessions"]["control"]["kcp"].get("srtt_ms") + a_to_d_control_min_raw = links["a_to_d"]["sessions"]["control"]["kcp"].get("min_srtt_ms") + d_to_b_control_min_raw = links["d_to_b"]["sessions"]["control"]["kcp"].get("min_srtt_ms") + a_to_d_video_raw = links["a_to_d"]["sessions"]["video"]["kcp"].get("srtt_ms") + d_to_b_video_raw = links["d_to_b"]["sessions"]["video"]["kcp"].get("srtt_ms") + + a_to_d_control = _coerce_float(a_to_d_control_raw) if a_to_d_control_raw is not None else None + d_to_b_control = _coerce_float(d_to_b_control_raw) if d_to_b_control_raw is not None else None + a_to_d_control_min = _coerce_float(a_to_d_control_min_raw) if a_to_d_control_min_raw is not None else None + d_to_b_control_min = _coerce_float(d_to_b_control_min_raw) if d_to_b_control_min_raw is not None else None + a_to_d_video = _coerce_float(a_to_d_video_raw) if a_to_d_video_raw is not None else None + d_to_b_video = _coerce_float(d_to_b_video_raw) if d_to_b_video_raw is not None else None + ack_estimate = self._control_ack_tracker.get_latest_estimate() + capture_to_send_raw = video_receiver_status.get("latest_capture_to_send_ms") + a_recv_to_paint_raw = display_probe_status.get("a_recv_to_paint_ms") + capture_to_send_ms = _coerce_float(capture_to_send_raw) if capture_to_send_raw is not None else None + a_recv_to_paint_ms = _coerce_float(a_recv_to_paint_raw) if a_recv_to_paint_raw is not None else None + video_network_oneway_est_ms = ( + round((a_to_d_video + d_to_b_video) / 2.0, 3) + if a_to_d_video is not None and d_to_b_video is not None + else None + ) + video_partial_est_ms = None + if capture_to_send_ms is not None and video_network_oneway_est_ms is not None: + video_partial_est_ms = round(capture_to_send_ms + video_network_oneway_est_ms, 3) + video_e2e_est_ms = None + if video_partial_est_ms is not None and a_recv_to_paint_ms is not None: + video_e2e_est_ms = round(video_partial_est_ms + a_recv_to_paint_ms, 3) + + return { + "control_loop_rtt_ms": ack_estimate.get("control_loop_rtt_ms"), + "control_to_persist_est_ms": ack_estimate.get("control_to_persist_est_ms"), + "control_oneway_srtt_est_ms": ( + round((a_to_d_control + d_to_b_control) / 2.0, 3) + if a_to_d_control is not None and d_to_b_control is not None + else None + ), + "control_oneway_bestcase_est_ms": ( + round((a_to_d_control_min + d_to_b_control_min) / 2.0, 3) + if a_to_d_control_min is not None and d_to_b_control_min is not None + else None + ), + "video_network_oneway_est_ms": video_network_oneway_est_ms, + "video_partial_est_ms": video_partial_est_ms, + "video_e2e_est_ms": video_e2e_est_ms, + "estimate_method": { + "control": "ack_loop" if ack_estimate.get("ack_available") else "srtt_fallback", + "video": "capture_to_send+srtt/2+recv_to_paint" if video_e2e_est_ms is not None else "capture_to_send+srtt/2", + }, + "clock_sync_required": False, + "assumptions": [ + "control one-way estimate uses ACK loop when available", + "video one-way estimate uses per-leg SRTT and local paint timing", + ], + "confidence": { + "control": "derived_ack" if ack_estimate.get("ack_available") else "fallback_srtt", + "video": "derived_local_probe" if video_e2e_est_ms is not None else "partial_without_probe", + }, + } + def get_latest(self) -> dict[str, Any]: self._ensure_started() @@ -645,7 +722,10 @@ class NetworkTelemetryService: arbiter_status = self._control_arbiter.get_status() ingress_status = self._native_ingress.get_status() sender_status = self._control_sender.get_status() + ack_receiver_status = self._control_ack_receiver.get_status() + ack_status = self._control_ack_tracker.get_latest_estimate() telemetry_state = self._hub_receiver.get_snapshot() + display_probe_status = self._video_display_probe_store.get_status() total_send_bytes = int(video_app.get("send_bytes", 0)) + int(control_app.get("send_bytes", 0)) total_recv_bytes = int(video_app.get("recv_bytes", 0)) + int(control_app.get("recv_bytes", 0)) @@ -697,6 +777,9 @@ class NetworkTelemetryService: remote_stale, ), } + self._video_receiver.update_remote_video_srtt( + _coerce_int(remote_sessions["video"]["kcp"].get("srtt_ms")) if remote_sessions["video"]["kcp"].get("srtt_ms") is not None else None + ) links = { "a_to_d": self._build_link("local-a-side", local_updated_at, False, local_sessions), @@ -724,6 +807,11 @@ class NetworkTelemetryService: telemetry_state=telemetry_state, watchdog_status=watchdog_status, ) + latency_estimate = self._derive_latency_estimate( + links=links, + video_receiver_status=video_receiver_status, + display_probe_status=display_probe_status, + ) if local_control_registered and remote_control_fresh: peer_status = "online" @@ -764,6 +852,12 @@ class NetworkTelemetryService: }, }, "links": links, + "latency_estimate": latency_estimate, + "video_freshness": video_receiver_status.get("freshness", {}), + "control_ack_status": { + **ack_status, + "receiver": ack_receiver_status, + }, "telemetry_receiver": { "hub_connected": bool(telemetry_state.get("connected")), "hub_updated_at": telemetry_state.get("updated_at"), @@ -781,6 +875,7 @@ class NetworkTelemetryService: "control": { "arbiter": arbiter_status, "sender": sender_status, + "ack_receiver": ack_receiver_status, }, } diff --git a/backend/monitoring/urls.py b/backend/monitoring/urls.py index fc23d15..66f0a4b 100644 --- a/backend/monitoring/urls.py +++ b/backend/monitoring/urls.py @@ -9,5 +9,6 @@ urlpatterns = [ path("network/latest/", views.network_latest, name="network-latest"), path("video/status/", views.video_status, name="video-status"), path("video/frame/", views.video_frame, name="video-frame"), + path("video/display-probe/", views.video_display_probe, name="video-display-probe"), path("video/stream/", views.video_stream, name="video-stream"), ] diff --git a/backend/monitoring/video.py b/backend/monitoring/video.py index c17cedc..b1b321e 100644 --- a/backend/monitoring/video.py +++ b/backend/monitoring/video.py @@ -2,6 +2,7 @@ from __future__ import annotations from collections import deque from dataclasses import dataclass +import hashlib import math import struct import sys @@ -13,6 +14,7 @@ from .common import ( JPEG_FRAME_DIR, OMNISOCKET_CONFIG_PATH, OMNISOCKET_FRAME_FRESH_SECONDS, + JsonlRunLogger, VIDEO_SOURCE_MODE, VIDEO_TIMESTAMP_SAMPLE_SIZE, VIDEO_TRAILER_BYTES, @@ -35,16 +37,91 @@ def safe_kcp_stats(session: Any) -> dict[str, Any]: return {} +class VideoDisplayProbeStore: + def __init__(self) -> None: + self._lock = threading.Lock() + self._logger = JsonlRunLogger("BLITZ_A_VIDEO_DISPLAY_PROBE_LOG_PATH", "a-video-display-probe") + self._latest: VideoDisplayProbeStatus = VideoDisplayProbeStatus( + updated_at=None, + frame_seq=None, + frame_hash="", + input_to_next_fresh_frame_ms=None, + input_to_next_changed_frame_ms=None, + input_to_next_paint_ms=None, + a_recv_to_paint_ms=None, + ) + + def record_event(self, payload: dict[str, Any]) -> None: + backend_received_unix_ns = payload.get("backend_received_unix_ns") + paint_unix_ms = payload.get("paint_unix_ms") + a_recv_to_paint_ms = None + try: + if backend_received_unix_ns is not None and paint_unix_ms is not None: + a_recv_to_paint_ms = round(float(paint_unix_ms) - (int(backend_received_unix_ns) / 1_000_000.0), 3) + except (TypeError, ValueError): + a_recv_to_paint_ms = None + + status = VideoDisplayProbeStatus( + updated_at=str(payload.get("updated_at") or ""), + frame_seq=int(payload["frame_seq"]) if payload.get("frame_seq") is not None else None, + frame_hash=str(payload.get("frame_hash") or ""), + input_to_next_fresh_frame_ms=self._coerce_float(payload.get("input_to_next_fresh_frame_ms")), + input_to_next_changed_frame_ms=self._coerce_float(payload.get("input_to_next_changed_frame_ms")), + input_to_next_paint_ms=self._coerce_float(payload.get("input_to_next_paint_ms")), + a_recv_to_paint_ms=a_recv_to_paint_ms, + ) + with self._lock: + self._latest = status + self._logger.write(payload) + + def get_status(self) -> dict[str, Any]: + with self._lock: + latest = self._latest + return { + "updated_at": latest.updated_at, + "frame_seq": latest.frame_seq, + "frame_hash": latest.frame_hash, + "input_to_next_fresh_frame_ms": latest.input_to_next_fresh_frame_ms, + "input_to_next_changed_frame_ms": latest.input_to_next_changed_frame_ms, + "input_to_next_paint_ms": latest.input_to_next_paint_ms, + "a_recv_to_paint_ms": latest.a_recv_to_paint_ms, + } + + def close(self) -> None: + self._logger.close() + + @staticmethod + def _coerce_float(value: Any) -> float | None: + try: + if value is None: + return None + return round(float(value), 3) + except (TypeError, ValueError): + return None + + @dataclass(frozen=True) class FrameTrailerMetadata: timestamp_ns: int latitude: float longitude: float + capture_to_send_ms: int raw_latitude_hex: str raw_longitude_hex: str received_at: float +@dataclass(frozen=True) +class VideoDisplayProbeStatus: + updated_at: str | None + frame_seq: int | None + frame_hash: str + input_to_next_fresh_frame_ms: float | None + input_to_next_changed_frame_ms: float | None + input_to_next_paint_ms: float | None + a_recv_to_paint_ms: float | None + + class OmniSocketVideoReceiver: def __init__(self) -> None: self._lock = threading.Lock() @@ -56,17 +133,31 @@ class OmniSocketVideoReceiver: self._video_defaults: dict[str, Any] = {} self._latest_frame: bytes | None = None self._latest_received_at = 0.0 + self._latest_backend_received_unix_ns = 0 + self._latest_backend_received_mono_ns = 0 self._latest_sequence: int | None = None self._latest_metadata: FrameTrailerMetadata | None = None - self._latest_latency_ms: float | None = None + self._latest_sender_clock_delta_ms_raw: float | None = None self._latest_timestamp_unit: str | None = None self._latest_timestamp_endianness: str | None = None - self._latency_samples_ms: deque[float] = deque(maxlen=VIDEO_TIMESTAMP_SAMPLE_SIZE) + self._sender_clock_delta_samples_ms_raw: deque[float] = deque(maxlen=VIDEO_TIMESTAMP_SAMPLE_SIZE) + self._latest_frame_hash = "" + self._latest_frame_bytes = 0 + self._last_frame_hash = "" + self._last_sequence: int | None = None + self._last_backend_received_mono_ns = 0 + self._interarrival_ms_samples: deque[float] = deque(maxlen=120) + self._repeat_samples: deque[int] = deque(maxlen=120) + self._skip_samples: deque[int] = deque(maxlen=120) + self._freeze_samples_ms: deque[float] = deque(maxlen=120) + self._current_stale_frame_run_length = 0 + self._latest_remote_video_srtt_ms: int | None = None self._frames_received = 0 self._last_error = "" self._reconnect_count = 0 self._ever_connected = False self._closing = threading.Event() + self._frame_recv_logger = JsonlRunLogger("BLITZ_A_VIDEO_FRAME_RECV_LOG_PATH", "a-video-frame-recv") self._load_backend() def _load_backend(self) -> None: @@ -174,7 +265,7 @@ class OmniSocketVideoReceiver: return None try: - timestamp_ms, latitude, longitude = VIDEO_TRAILER_STRUCT.unpack(trailer) + timestamp_ms, latitude, longitude, capture_to_send_ms = VIDEO_TRAILER_STRUCT.unpack(trailer) except struct.error: return None @@ -195,6 +286,7 @@ class OmniSocketVideoReceiver: timestamp_ns=timestamp_ns, latitude=latitude, longitude=longitude, + capture_to_send_ms=int(capture_to_send_ms), raw_latitude_hex=trailer[8:16].hex(), raw_longitude_hex=trailer[16:24].hex(), received_at=received_at if received_at is not None else time.time(), @@ -205,6 +297,56 @@ class OmniSocketVideoReceiver: time.time() - self._latest_received_at <= OMNISOCKET_FRAME_FRESH_SECONDS ) + def update_remote_video_srtt(self, srtt_ms: int | None) -> None: + with self._lock: + self._latest_remote_video_srtt_ms = srtt_ms + + def _freshness_payload_locked(self) -> dict[str, Any]: + interarrival_samples = list(self._interarrival_ms_samples) + repeat_samples = list(self._repeat_samples) + skip_samples = list(self._skip_samples) + freeze_samples_ms = list(self._freeze_samples_ms) + + inter_frame_avg_ms = round(sum(interarrival_samples) / len(interarrival_samples), 3) if interarrival_samples else None + if interarrival_samples: + ordered = sorted(interarrival_samples) + p95_index = min(len(ordered) - 1, max(0, math.ceil(len(ordered) * 0.95) - 1)) + inter_frame_p95_ms = round(ordered[p95_index], 3) + else: + inter_frame_p95_ms = None + + repeated_frame_ratio = round(sum(repeat_samples) / len(repeat_samples), 4) if repeat_samples else 0.0 + total_skip = sum(skip_samples) + expected_frames = len(skip_samples) + total_skip + skip_ratio = round(total_skip / expected_frames, 4) if expected_frames > 0 else 0.0 + longest_freeze_ms = round(max(freeze_samples_ms), 3) if freeze_samples_ms else 0.0 + return { + "inter_frame_avg_ms": inter_frame_avg_ms, + "inter_frame_p95_ms": inter_frame_p95_ms, + "repeated_frame_ratio": repeated_frame_ratio, + "skip_ratio": skip_ratio, + "longest_freeze_ms": longest_freeze_ms, + "stale_frame_run_length": self._current_stale_frame_run_length, + "relative_freshness_lag_frames": self._current_stale_frame_run_length + (skip_samples[-1] if skip_samples else 0), + } + + def _frame_headers_locked(self) -> dict[str, str]: + capture_to_send_ms = self._latest_metadata.capture_to_send_ms if self._latest_metadata is not None else None + headers: dict[str, str] = {} + if self._latest_sequence is not None: + headers["X-Blitz-Frame-Seq"] = str(self._latest_sequence) + if self._latest_backend_received_unix_ns > 0: + headers["X-Blitz-Backend-Received-Unix-Ns"] = str(self._latest_backend_received_unix_ns) + if self._latest_frame_hash: + headers["X-Blitz-Frame-Hash"] = self._latest_frame_hash + if capture_to_send_ms is not None: + headers["X-Blitz-BSide-Capture-To-Send-Ms"] = str(capture_to_send_ms) + return headers + + def get_latest_frame_headers(self) -> dict[str, str]: + snapshot = self.get_latest_frame_snapshot() + return snapshot[1] if snapshot is not None else {} + def _run(self) -> None: while not self._closing.is_set(): try: @@ -232,27 +374,81 @@ class OmniSocketVideoReceiver: continue received_at = time.time() + received_unix_ns = time.time_ns() + received_mono_ns = time.monotonic_ns() frame_metadata = self._extract_frame_metadata(frame, received_at=received_at) - latency_ms = None + sender_clock_delta_ms_raw = None if frame_metadata is not None: - latency_ms = round((time.time_ns() - frame_metadata.timestamp_ns) / 1_000_000, 3) + sender_clock_delta_ms_raw = round((received_unix_ns - frame_metadata.timestamp_ns) / 1_000_000, 3) unit = VIDEO_TRAILER_TIMESTAMP_UNIT endianness = VIDEO_TRAILER_ENDIANNESS else: unit = None endianness = None + frame_sequence = self._extract_sequence(frame) + frame_hash = hashlib.blake2s(jpeg_frame, digest_size=8).hexdigest() + local_kcp = safe_kcp_stats(session) if self._frame_recv_logger.enabled else {} + frame_log_record: dict[str, Any] | None = None with self._lock: + interarrival_ms = None + if self._last_backend_received_mono_ns > 0: + interarrival_ms = round((received_mono_ns - self._last_backend_received_mono_ns) / 1_000_000, 3) + self._interarrival_ms_samples.append(interarrival_ms) + + sequence_gap = 0 + if frame_sequence is not None and self._last_sequence is not None and frame_sequence > self._last_sequence: + sequence_gap = max(0, frame_sequence - self._last_sequence - 1) + + repeat_flag = bool(self._last_frame_hash) and frame_hash == self._last_frame_hash + self._repeat_samples.append(1 if repeat_flag else 0) + self._skip_samples.append(sequence_gap) + if repeat_flag: + self._current_stale_frame_run_length += 1 + else: + self._current_stale_frame_run_length = 0 + if interarrival_ms is not None and repeat_flag: + self._freeze_samples_ms.append(interarrival_ms) + elif interarrival_ms is not None: + self._freeze_samples_ms.append(0.0) + self._latest_frame = jpeg_frame self._latest_received_at = received_at - self._latest_sequence = self._extract_sequence(frame) + self._latest_backend_received_unix_ns = received_unix_ns + self._latest_backend_received_mono_ns = received_mono_ns + self._latest_sequence = frame_sequence + self._last_sequence = frame_sequence self._latest_metadata = frame_metadata - self._latest_latency_ms = latency_ms + self._latest_sender_clock_delta_ms_raw = sender_clock_delta_ms_raw self._latest_timestamp_unit = unit self._latest_timestamp_endianness = endianness - if latency_ms is not None: - self._latency_samples_ms.append(latency_ms) + self._latest_frame_hash = frame_hash + self._latest_frame_bytes = len(jpeg_frame) + self._last_frame_hash = frame_hash + self._last_backend_received_mono_ns = received_mono_ns + if sender_clock_delta_ms_raw is not None: + self._sender_clock_delta_samples_ms_raw.append(sender_clock_delta_ms_raw) self._frames_received += 1 + + if self._frame_recv_logger.enabled: + frame_log_record = { + "ts_unix_nano": received_unix_ns, + "frame_seq": frame_sequence, + "backend_received_unix_ns": received_unix_ns, + "backend_received_mono_ns": received_mono_ns, + "jpeg_bytes": len(jpeg_frame), + "interarrival_ms": interarrival_ms, + "sequence_gap": sequence_gap, + "repeat_flag": repeat_flag, + "skip_count": sequence_gap, + "frame_hash": frame_hash, + "a_to_d_video_srtt_ms": local_kcp.get("srtt_ms"), + "d_to_b_video_srtt_ms": self._latest_remote_video_srtt_ms, + "b_side_capture_to_send_ms": frame_metadata.capture_to_send_ms if frame_metadata is not None else None, + "sender_clock_delta_ms_raw": sender_clock_delta_ms_raw, + } + if frame_log_record is not None: + self._frame_recv_logger.write(frame_log_record) except Exception as error: # pragma: no cover - runtime integration path if not self._closing.is_set(): session_error = "" @@ -275,11 +471,17 @@ class OmniSocketVideoReceiver: self._started = False def get_latest_frame(self) -> bytes | None: + snapshot = self.get_latest_frame_snapshot() + return snapshot[0] if snapshot is not None else None + + def get_latest_frame_snapshot(self) -> tuple[bytes, dict[str, str]] | None: self.ensure_started() with self._lock: if not self._has_fresh_frame_locked(): return None - return self._latest_frame + if self._latest_frame is None: + return None + return self._latest_frame, self._frame_headers_locked() def get_latest_frame_metadata(self) -> FrameTrailerMetadata | None: self.ensure_started() @@ -313,25 +515,28 @@ class OmniSocketVideoReceiver: session_stats = self.session_stats() with self._lock: has_recent_frame = self._has_fresh_frame_locked() - if has_recent_frame and self._latest_latency_ms is not None: + freshness_status = self._freshness_payload_locked() + if has_recent_frame and self._latest_sender_clock_delta_ms_raw is not None: timing_status = { "available": True, - "latest_delta_ms": self._latest_latency_ms, - "delta_samples_ms": list(reversed(self._latency_samples_ms)), - "sample_count": len(self._latency_samples_ms), + "sender_clock_delta_ms_raw": self._latest_sender_clock_delta_ms_raw, + "sender_clock_delta_samples_ms_raw": list(reversed(self._sender_clock_delta_samples_ms_raw)), + "sample_count": len(self._sender_clock_delta_samples_ms_raw), "sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE, "timestamp_unit": self._latest_timestamp_unit, "timestamp_endianness": self._latest_timestamp_endianness, + "unsynced_clock": True, } else: timing_status = { "available": False, - "latest_delta_ms": None, - "delta_samples_ms": [], + "sender_clock_delta_ms_raw": None, + "sender_clock_delta_samples_ms_raw": [], "sample_count": 0, "sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE, "timestamp_unit": None, "timestamp_endianness": None, + "unsynced_clock": True, } return { "backend_ready": self._session_cls is not None, @@ -341,6 +546,11 @@ class OmniSocketVideoReceiver: "has_recent_frame": has_recent_frame, "frames_received": self._frames_received, "latest_sequence": self._latest_sequence, + "latest_frame_hash": self._latest_frame_hash, + "latest_backend_received_unix_ns": self._latest_backend_received_unix_ns or None, + "latest_backend_received_mono_ns": self._latest_backend_received_mono_ns or None, + "latest_frame_bytes": self._latest_frame_bytes, + "latest_capture_to_send_ms": self._latest_metadata.capture_to_send_ms if self._latest_metadata is not None else None, "reconnect_count": self._reconnect_count, "last_server_error": str(session_stats.get("last_server_error", "") or ""), "last_error": self._last_error, @@ -350,6 +560,7 @@ class OmniSocketVideoReceiver: "peer_id": str(video_cfg.get("peer_id", "")), "buffer_bytes": int(video_cfg.get("buffer_bytes", 0)), "timing": timing_status, + "freshness": freshness_status, } def close(self) -> None: @@ -364,15 +575,18 @@ class OmniSocketVideoReceiver: thread = self._thread if thread is not None and thread.is_alive(): thread.join(timeout=0.5) + self._frame_recv_logger.close() class VideoFrameService: - def __init__(self, receiver: OmniSocketVideoReceiver) -> None: + def __init__(self, receiver: OmniSocketVideoReceiver, display_probe_store: VideoDisplayProbeStore) -> None: self._receiver = receiver + self._display_probe_store = display_probe_store def get_status(self) -> dict[str, Any]: receiver_status = self._receiver.get_status() receiver_frame = self._receiver.get_latest_frame() + display_probe_status = self._display_probe_store.get_status() if receiver_frame is not None: return { @@ -384,6 +598,8 @@ class VideoFrameService: "source_detail": f"peer stream active, frames={receiver_status['frames_received']}", "receiver": receiver_status, "timing": receiver_status["timing"], + "freshness": receiver_status.get("freshness", {}), + "display_probe": display_probe_status, } wait_detail = receiver_status["last_error"] or ( @@ -398,6 +614,8 @@ class VideoFrameService: "source_detail": wait_detail, "receiver": receiver_status, "timing": receiver_status["timing"], + "freshness": receiver_status.get("freshness", {}), + "display_probe": display_probe_status, } def get_next_frame(self) -> bytes: @@ -406,6 +624,18 @@ class VideoFrameService: return receiver_frame raise RuntimeError("no live OmniSocket JPEG frame is currently available") + def get_next_frame_with_headers(self) -> tuple[bytes, dict[str, str]]: + snapshot = self._receiver.get_latest_frame_snapshot() + if snapshot is not None: + return snapshot + raise RuntimeError("no live OmniSocket JPEG frame is currently available") + + def get_latest_frame_headers(self) -> dict[str, str]: + return self._receiver.get_latest_frame_headers() + + def record_display_probe(self, payload: dict[str, Any]) -> None: + self._display_probe_store.record_event(payload) + def iter_mjpeg(self, fps: float = 6.0) -> Iterator[bytes]: frame_interval = 1.0 / max(1.0, min(fps, 30.0)) while True: diff --git a/backend/monitoring/views.py b/backend/monitoring/views.py index 86cbf12..3edb0f5 100644 --- a/backend/monitoring/views.py +++ b/backend/monitoring/views.py @@ -1,5 +1,8 @@ from __future__ import annotations +import json + +from django.views.decorators.csrf import csrf_exempt from django.http import HttpResponse, StreamingHttpResponse from rest_framework.decorators import api_view from rest_framework.response import Response @@ -34,25 +37,20 @@ def video_status(request): def video_frame(request): - status = video_service.get_status() - if not status["available"]: - return HttpResponse( - status.get("source_detail") or f"JPEG frame directory not found: {status['frame_dir']}", - status=503, - content_type="text/plain; charset=utf-8", - ) - try: - frame = video_service.get_next_frame() + frame, headers = video_service.get_next_frame_with_headers() except (FileNotFoundError, RuntimeError) as error: + status = video_service.get_status() return HttpResponse( - str(error), + status.get("source_detail") or str(error), status=503, content_type="text/plain; charset=utf-8", ) response = HttpResponse(frame, content_type="image/jpeg") response["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" + for key, value in headers.items(): + response[key] = value return response @@ -76,3 +74,18 @@ def video_stream(request): ) response["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" return response + + +@csrf_exempt +@api_view(["POST"]) +def video_display_probe(request): + try: + payload = json.loads(request.body.decode("utf-8")) + except (UnicodeDecodeError, json.JSONDecodeError): + return Response({"detail": "invalid json"}, status=400) + + if not isinstance(payload, dict): + return Response({"detail": "expected json object"}, status=400) + + video_service.record_display_probe(payload) + return Response({"ok": True}) diff --git a/config/omnisocket_demo.yaml b/config/omnisocket_demo.yaml index 793a1eb..27ff328 100644 --- a/config/omnisocket_demo.yaml +++ b/config/omnisocket_demo.yaml @@ -12,6 +12,10 @@ control_sender: peer_id: "peer-a-ctrl" target_peer: "peer-b-ctrl" +control_ack_receiver: + peer_id: "peer-a-ctrl-ack" + expected_sender: "peer-b-ctrl-ack" + control_ingress: native_udp_bind: "127.0.0.1:10921" source_lease_ms: 300 diff --git a/frontend/src/components/NetworkPanel.vue b/frontend/src/components/NetworkPanel.vue index 29e15f9..60d0ef1 100644 --- a/frontend/src/components/NetworkPanel.vue +++ b/frontend/src/components/NetworkPanel.vue @@ -33,6 +33,9 @@ function formatScalar(value?: number | string | null, suffix = '') { if (value === null || value === undefined || value === '') { return '--' } + if (typeof value === 'number') { + return `${value.toFixed(1)}${suffix}` + } return `${value}${suffix}` } @@ -58,20 +61,20 @@ function legSessions(link: LinkTelemetry | null): Array<{ name: string; data: Li
- Latency - {{ formatScalar(network?.latency_ms, ' ms') }} + Control Loop RTT + {{ formatScalar(network?.latency_estimate?.control_loop_rtt_ms, ' ms') }}
- Jitter - {{ formatScalar(network?.jitter_ms, ' ms') }} + Control to Persist + {{ formatScalar(network?.latency_estimate?.control_to_persist_est_ms, ' ms') }}
- Active Control - {{ activeSource }} + Control SRTT One-way + {{ formatScalar(network?.latency_estimate?.control_oneway_srtt_est_ms, ' ms') }}
- Lease - {{ formatScalar(network?.control_lease_remaining_ms, ' ms') }} + Video One-way Est. + {{ formatScalar(network?.latency_estimate?.video_network_oneway_est_ms, ' ms') }}
TX Rate @@ -89,6 +92,10 @@ function legSessions(link: LinkTelemetry | null): Array<{ name: string; data: Li

Health Confidence: {{ network?.robot_health?.confidence ?? 'n/a' }}

Health Updated: {{ formatTime(network?.robot_health?.updated_at) }}

Transport: {{ network?.transport ?? 'n/a' }} / {{ network?.source_mode ?? 'n/a' }}

+

Active Control: {{ activeSource }}

+

Lease: {{ formatScalar(network?.control_lease_remaining_ms, ' ms') }}

+

ACK Mode: {{ network?.control_ack_status?.ack_available ? 'ack-loop' : 'srtt-fallback' }}

+

ACK Updated: {{ formatTime(network?.control_ack_status?.updated_at) }}

Telemetry Peer: {{ network?.telemetry_receiver?.peer_id ?? 'n/a' }}

Telemetry Registered: {{ network?.telemetry_receiver?.registered ? 'yes' : 'no' }}

Hub Freshness: {{ formatTime(network?.telemetry_receiver?.hub_updated_at) }}

@@ -182,8 +189,12 @@ function legSessions(link: LinkTelemetry | null): Array<{ name: string; data: Li

Combined: sessions={{ network?.combined?.connected_sessions ?? 0 }} send={{ network?.combined?.send_bytes ?? 0 }}B recv={{ network?.combined?.recv_bytes ?? 0 }}B

+

Video E2E Est.: {{ formatScalar(network?.latency_estimate?.video_e2e_est_ms, ' ms') }} / confidence={{ network?.latency_estimate?.confidence?.video ?? 'n/a' }}

+

Control Estimate Confidence: {{ network?.latency_estimate?.confidence?.control ?? 'n/a' }}

+

Video Freshness: repeat={{ formatScalar((network?.video_freshness?.repeated_frame_ratio ?? 0) * 100, '%') }} skip={{ formatScalar((network?.video_freshness?.skip_ratio ?? 0) * 100, '%') }} freeze={{ formatScalar(network?.video_freshness?.longest_freeze_ms, ' ms') }}

Native UDP: {{ network?.ingress?.native_udp?.bind_addr ?? 'n/a' }} packets={{ network?.ingress?.native_udp?.packets_received ?? 0 }} invalid={{ network?.ingress?.native_udp?.invalid_packets ?? 0 }}

Control Sender: {{ network?.control?.sender?.peer_id ?? 'n/a' }} -> {{ network?.control?.sender?.target_peer ?? 'n/a' }} sends={{ network?.control?.sender?.send_count ?? 0 }} registered={{ network?.control?.sender?.registered ? 'yes' : 'no' }}

+

ACK Receiver: {{ network?.control?.ack_receiver?.peer_id ?? 'n/a' }} reconnects={{ network?.control?.ack_receiver?.reconnect_count ?? 0 }}

Control Reconnects: {{ network?.control?.sender?.reconnect_count ?? 0 }}

Control Session Error: {{ network?.control?.sender?.last_server_error }}

diff --git a/frontend/src/components/VideoPanel.vue b/frontend/src/components/VideoPanel.vue index bbd5365..8851d78 100644 --- a/frontend/src/components/VideoPanel.vue +++ b/frontend/src/components/VideoPanel.vue @@ -1,77 +1,101 @@ @@ -224,16 +413,24 @@ watch([currentFps, canRequestFrames], () => { margin: 0 0 4px; color: #5b7aff; text-transform: uppercase; - letter-spacing: 0.12em; + letter-spacing: 0.08em; font-size: 12px; font-weight: 700; } -h2 { +h2, +h3 { margin: 0; +} + +h2 { font-size: 24px; } +h3 { + font-size: 16px; +} + .badge { padding: 8px 12px; border-radius: 999px; @@ -249,11 +446,10 @@ h2 { } .video-shell { - position: relative; overflow: hidden; - border-radius: 20px; + border-radius: 8px; border: 1px solid rgba(133, 147, 169, 0.28); - background: linear-gradient(180deg, #09111f 0%, #050812 100%); + background: #050812; } .video-frame { @@ -261,115 +457,107 @@ h2 { width: 100%; aspect-ratio: 16 / 9; object-fit: cover; - background: #02050d; } .video-placeholder { display: grid; place-items: center; - width: 100%; aspect-ratio: 16 / 9; - padding: 24px; - color: #a8b4ce; - text-align: center; - line-height: 1.7; - background: - radial-gradient(circle at top, rgba(91, 122, 255, 0.14), transparent 42%), - #02050d; + color: #95a4c6; +} + +.stats, +.metric-grid { + display: grid; + gap: 10px; } .stats { - display: grid; - grid-template-columns: repeat(2, minmax(0, 1fr)); - gap: 12px; + grid-template-columns: repeat(4, minmax(0, 1fr)); } -.stat-card { - padding: 14px; - border-radius: 16px; - background: rgba(7, 14, 26, 0.78); - border: 1px solid rgba(133, 147, 169, 0.2); +.metric-grid { + grid-template-columns: repeat(3, minmax(0, 1fr)); +} + +.stat-card, +.metric-group, +.timing-panel { + padding: 12px; + border-radius: 8px; + border: 1px solid rgba(133, 147, 169, 0.18); + background: rgba(7, 14, 26, 0.86); +} + +.stat-card span, +.metric-group p, +.hint { + color: #d5dbee; } .stat-card span { display: block; - margin-bottom: 8px; - color: #8d99b3; + margin-bottom: 6px; font-size: 12px; + color: #9aaccc; + text-transform: uppercase; } .stat-card strong { font-size: 18px; } -.timing-panel { +.metric-group { display: grid; - gap: 12px; - padding: 14px; - border-radius: 18px; - background: rgba(7, 14, 26, 0.88); - border: 1px solid rgba(133, 147, 169, 0.18); + gap: 8px; +} + +.metric-group p { + margin: 0; + line-height: 1.6; } .timing-head { display: flex; justify-content: space-between; gap: 12px; - align-items: center; - color: #cfd7e6; -} - -.timing-head span { - color: #8d99b3; - font-size: 12px; -} - -.timing-head strong { - font-size: 16px; + margin-bottom: 10px; } .timing-grid { - display: grid; - grid-template-columns: repeat(5, minmax(0, 1fr)); + display: flex; + flex-wrap: wrap; gap: 8px; } .timing-label { - display: grid; - place-items: center; - min-height: 40px; - padding: 8px 10px; - border-radius: 12px; + padding: 6px 8px; + border-radius: 8px; background: rgba(91, 122, 255, 0.12); - border: 1px solid rgba(91, 122, 255, 0.28); - color: #dce4ff; + color: #dbe5ff; font-size: 12px; - font-weight: 700; -} - -.timing-label.empty { - background: rgba(133, 147, 169, 0.08); - border-color: rgba(133, 147, 169, 0.18); - color: #7e8aa5; } .hint { margin: 0; - color: #8d99b3; - line-height: 1.65; + line-height: 1.7; } .hint.subtle { - font-size: 13px; + color: #96a5c3; } -@media (max-width: 720px) { - .stats { - grid-template-columns: 1fr; - } - - .timing-grid { +@media (max-width: 1100px) { + .stats, + .metric-grid { grid-template-columns: repeat(2, minmax(0, 1fr)); } } + +@media (max-width: 720px) { + .stats, + .metric-grid { + grid-template-columns: 1fr; + } +} diff --git a/frontend/src/composables/useControlInterface.ts b/frontend/src/composables/useControlInterface.ts index 96a39ff..67fe2a0 100644 --- a/frontend/src/composables/useControlInterface.ts +++ b/frontend/src/composables/useControlInterface.ts @@ -66,6 +66,8 @@ const gamepadMapping = ref('') const gamepadAxes = ref([0, 0, 0, 0]) const gamepadButtonPressed = ref(Array.from({ length: GAMEPAD_BUTTON_LABELS.length }, () => false)) const activeSource = ref('idle') +const operatorInputSequence = ref(0) +const lastOperatorInputPerfMs = ref(0) function clampValue(value: number, min: number, max: number) { return Math.min(max, Math.max(min, value)) @@ -152,6 +154,11 @@ let consumerCount = 0 let lastGamepadSignature = '' let lastCommandSignature = '' +function noteOperatorInput() { + operatorInputSequence.value += 1 + lastOperatorInputPerfMs.value = performance.now() +} + function normalizeAxis(raw: number) { if (Math.abs(raw) < GAMEPAD_DEADZONE) { return 0 @@ -366,7 +373,7 @@ function sendCurrentCommand() { socket.send(packCommand(resolvedCommandValues())) } -function refreshSendLoop(force = false) { +function refreshSendLoop(force = false, noteInput = true) { const source = resolvedSource() const values = resolvedCommandValues() const signature = commandSignature(values, source) @@ -375,6 +382,9 @@ function refreshSendLoop(force = false) { return } lastCommandSignature = signature + if (noteInput) { + noteOperatorInput() + } stopSendLoop() if (socket == null || socket.readyState !== WebSocket.OPEN) { @@ -497,7 +507,7 @@ function connectSocket() { socket.onopen = () => { socketState.value = 'open' lastServerMessage.value = 'control link live' - refreshSendLoop(true) + refreshSendLoop(true, false) } socket.onmessage = (event) => { @@ -706,5 +716,14 @@ export function useControlInterface() { gamepadRightStick, gamepadAxes, gamepadActive: computed(() => gamepadActiveInternal()), + operatorInputSequence, + lastOperatorInputPerfMs, + } +} + +export function useOperatorInputTelemetry() { + return { + operatorInputSequence, + lastOperatorInputPerfMs, } } diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 6b166f9..d94c017 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -24,6 +24,19 @@ export function buildVideoFrameUrl(frameKey: number) { return `${API_BASE}/api/video/frame/?frame=${frameKey}&t=${Date.now()}` } +export async function postVideoDisplayProbe(payload: Record) { + const response = await fetch(`${API_BASE}/api/video/display-probe/`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify(payload), + }) + if (!response.ok) { + throw new Error(`display probe post failed: ${response.status} ${response.statusText}`) + } +} + export function buildControlWebSocketUrl() { const url = new URL(API_BASE, window.location.origin) const basePath = url.pathname.replace(/\/$/, '') diff --git a/frontend/src/types.ts b/frontend/src/types.ts index 55709c3..35758f8 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -32,7 +32,9 @@ export interface SessionKcpStats { conv?: number rto_ms?: number srtt_ms?: number + min_srtt_ms?: number srttvar_ms?: number + last_feedback_age_ms?: number snd_wnd?: number rmt_wnd?: number inflight?: number @@ -133,6 +135,16 @@ export interface ControlSenderStatus { last_error: string } +export interface ControlAckReceiverStatus { + backend_ready: boolean + started: boolean + connected: boolean + peer_id: string + expected_sender: string + reconnect_count: number + last_error: string +} + export interface TelemetryReceiverStatus { hub_connected: boolean hub_updated_at: string | null @@ -151,6 +163,47 @@ export interface RobotHealthStatus { updated_at: string } +export interface VideoFreshnessStatus { + inter_frame_avg_ms: number | null + inter_frame_p95_ms: number | null + repeated_frame_ratio: number + skip_ratio: number + longest_freeze_ms: number + stale_frame_run_length: number + relative_freshness_lag_frames: number +} + +export interface LatencyEstimateStatus { + control_loop_rtt_ms: number | null + control_to_persist_est_ms: number | null + control_oneway_srtt_est_ms: number | null + control_oneway_bestcase_est_ms: number | null + video_network_oneway_est_ms: number | null + video_partial_est_ms: number | null + video_e2e_est_ms: number | null + estimate_method: { + control: string + video: string + } + clock_sync_required: boolean + assumptions: string[] + confidence: { + control: string + video: string + } +} + +export interface ControlAckStatus { + ack_available: boolean + updated_at: string | null + control_loop_rtt_ms: number | null + b_recv_to_persist_ms: number | null + control_oneway_network_est_ms: number | null + control_to_persist_est_ms: number | null + sample_reason: string | null + receiver: ControlAckReceiverStatus +} + export interface NetworkTelemetry { peer_status: string latency_ms: number | null @@ -178,6 +231,9 @@ export interface NetworkTelemetry { a_to_d: LinkTelemetry d_to_b: LinkTelemetry } + latency_estimate: LatencyEstimateStatus + video_freshness: VideoFreshnessStatus + control_ack_status: ControlAckStatus telemetry_receiver: TelemetryReceiverStatus robot_health: RobotHealthStatus ingress: { @@ -186,6 +242,7 @@ export interface NetworkTelemetry { control: { arbiter: ControlArbiterStatus sender: ControlSenderStatus + ack_receiver: ControlAckReceiverStatus } } @@ -198,12 +255,23 @@ export interface VideoStatus { source_detail?: string timing?: { available: boolean - latest_delta_ms: number | null - delta_samples_ms: number[] + sender_clock_delta_ms_raw: number | null + sender_clock_delta_samples_ms_raw: number[] sample_count: number sample_window_size: number timestamp_unit: string | null timestamp_endianness: string | null + unsynced_clock: boolean + } + freshness?: VideoFreshnessStatus + display_probe?: { + updated_at: string | null + frame_seq: number | null + frame_hash: string + input_to_next_fresh_frame_ms: number | null + input_to_next_changed_frame_ms: number | null + input_to_next_paint_ms: number | null + a_recv_to_paint_ms: number | null } receiver?: { backend_ready: boolean @@ -213,6 +281,11 @@ export interface VideoStatus { has_recent_frame: boolean frames_received: number latest_sequence: number | null + latest_frame_hash?: string + latest_backend_received_unix_ns?: number | null + latest_backend_received_mono_ns?: number | null + latest_frame_bytes?: number + latest_capture_to_send_ms?: number | null reconnect_count: number last_server_error: string last_error: string @@ -223,13 +296,15 @@ export interface VideoStatus { buffer_bytes?: number timing?: { available: boolean - latest_delta_ms: number | null - delta_samples_ms: number[] + sender_clock_delta_ms_raw: number | null + sender_clock_delta_samples_ms_raw: number[] sample_count: number sample_window_size: number timestamp_unit: string | null timestamp_endianness: string | null + unsynced_clock: boolean } + freshness?: VideoFreshnessStatus } } diff --git a/frontend/src/views/DashboardView.vue b/frontend/src/views/DashboardView.vue index 0cecb52..f38dbef 100644 --- a/frontend/src/views/DashboardView.vue +++ b/frontend/src/views/DashboardView.vue @@ -27,7 +27,7 @@ const { gps, network, video, errorMessage, headerStatus } = useMonitoringData()
- +
diff --git a/frontend/src/views/VideoView.vue b/frontend/src/views/VideoView.vue index 7332bf6..0c21a1e 100644 --- a/frontend/src/views/VideoView.vue +++ b/frontend/src/views/VideoView.vue @@ -2,7 +2,7 @@ import VideoPanel from '@/components/VideoPanel.vue' import { useMonitoringData } from '@/composables/useMonitoringData' -const { video, errorMessage, headerStatus } = useMonitoringData() +const { video, network, errorMessage, headerStatus } = useMonitoringData()