from __future__ import annotations

from collections import deque
from dataclasses import dataclass
import hashlib
import math
import struct
import sys
import threading
import time
from typing import Any, Iterator

from .common import (
    JPEG_FRAME_DIR,
    OMNISOCKET_CONFIG_PATH,
    OMNISOCKET_FRAME_FRESH_SECONDS,
    JsonlRunLogger,
    VIDEO_SOURCE_MODE,
    VIDEO_TIMESTAMP_SAMPLE_SIZE,
    VIDEO_TRAILER_BYTES,
    VIDEO_TRAILER_ENDIANNESS,
    VIDEO_TRAILER_STRUCT,
    VIDEO_TRAILER_TIMESTAMP_MAX_SKEW_NS,
    VIDEO_TRAILER_TIMESTAMP_MULTIPLIER_NS,
    VIDEO_TRAILER_TIMESTAMP_UNIT,
    WORKSPACE_ROOT,
    load_omnisocket_config,
)


def safe_kcp_stats(session: Any) -> dict[str, Any]:
    """Return ``session.kcp_stats()`` as a plain dict, or ``{}`` if unavailable.

    This is best-effort by design: a missing session, a session type without
    ``kcp_stats``, or any exception raised by the call all yield ``{}``.
    """
    if session is None or not hasattr(session, "kcp_stats"):
        return {}
    try:
        return dict(session.kcp_stats())
    except Exception:
        return {}


class VideoDisplayProbeStore:
    """Keep the most recent display-probe event and append every event to a JSONL log."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._logger = JsonlRunLogger("BLITZ_A_VIDEO_DISPLAY_PROBE_LOG_PATH", "a-video-display-probe")
        self._latest: VideoDisplayProbeStatus = VideoDisplayProbeStatus(
            updated_at=None,
            frame_seq=None,
            frame_hash="",
            input_to_next_fresh_frame_ms=None,
            input_to_next_changed_frame_ms=None,
            input_to_next_paint_ms=None,
            a_recv_to_paint_ms=None,
        )

    def record_event(self, payload: dict[str, Any]) -> None:
        """Store ``payload`` as the latest probe status and write it to the log.

        Fields that are missing or cannot be converted to a number are stored
        as ``None`` instead of raising, so one malformed field never prevents
        the event from being recorded and logged.
        """
        backend_received_unix_ns = payload.get("backend_received_unix_ns")
        paint_unix_ms = payload.get("paint_unix_ms")
        a_recv_to_paint_ms = None
        try:
            if backend_received_unix_ns is not None and paint_unix_ms is not None:
                # Paint time is in milliseconds, backend receive time in
                # nanoseconds; convert the latter before subtracting.
                a_recv_to_paint_ms = round(
                    float(paint_unix_ms) - (int(backend_received_unix_ns) / 1_000_000.0),
                    3,
                )
        except (TypeError, ValueError, OverflowError):
            a_recv_to_paint_ms = None

        status = VideoDisplayProbeStatus(
            updated_at=str(payload.get("updated_at") or ""),
            frame_seq=self._coerce_int(payload.get("frame_seq")),
            frame_hash=str(payload.get("frame_hash") or ""),
            input_to_next_fresh_frame_ms=self._coerce_float(payload.get("input_to_next_fresh_frame_ms")),
            input_to_next_changed_frame_ms=self._coerce_float(payload.get("input_to_next_changed_frame_ms")),
            input_to_next_paint_ms=self._coerce_float(payload.get("input_to_next_paint_ms")),
            a_recv_to_paint_ms=a_recv_to_paint_ms,
        )
        with self._lock:
            self._latest = status
        self._logger.write(payload)

    def get_status(self) -> dict[str, Any]:
        """Return the latest recorded probe status as a plain dict."""
        with self._lock:
            latest = self._latest
        return {
            "updated_at": latest.updated_at,
            "frame_seq": latest.frame_seq,
            "frame_hash": latest.frame_hash,
            "input_to_next_fresh_frame_ms": latest.input_to_next_fresh_frame_ms,
            "input_to_next_changed_frame_ms": latest.input_to_next_changed_frame_ms,
            "input_to_next_paint_ms": latest.input_to_next_paint_ms,
            "a_recv_to_paint_ms": latest.a_recv_to_paint_ms,
        }

    def close(self) -> None:
        """Close the underlying JSONL logger."""
        self._logger.close()

    @staticmethod
    def _coerce_float(value: Any) -> float | None:
        """Return ``value`` as a float rounded to 3 decimals, or ``None`` if not convertible."""
        try:
            if value is None:
                return None
            return round(float(value), 3)
        except (TypeError, ValueError, OverflowError):
            return None

    @staticmethod
    def _coerce_int(value: Any) -> int | None:
        """Return ``int(value)``, or ``None`` when ``value`` is missing or not convertible."""
        if value is None:
            return None
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return None


@dataclass(frozen=True)
class FrameTrailerMetadata:
    """Values decoded from the fixed-size trailer appended after a JPEG frame."""

    timestamp_ns: int  # trailer timestamp scaled by VIDEO_TRAILER_TIMESTAMP_MULTIPLIER_NS
    latitude: float
    longitude: float
    capture_to_send_ms: int
    raw_latitude_hex: str  # hex of trailer bytes 8..16
    raw_longitude_hex: str  # hex of trailer bytes 16..24
    received_at: float  # time.time() value associated with the frame's arrival


@dataclass(frozen=True)
class VideoDisplayProbeStatus:
    """Snapshot of the latest display-probe event held by VideoDisplayProbeStore."""

    updated_at: str | None
    frame_seq: int | None
    frame_hash: str
    input_to_next_fresh_frame_ms: float | None
    input_to_next_changed_frame_ms: float | None
    input_to_next_paint_ms: float | None
    a_recv_to_paint_ms: float | None


class OmniSocketVideoReceiver:
    """Receive JPEG frames over an OmniSocket session on a background thread.

    The receiver keeps the most recent frame plus rolling arrival statistics.
    State shared with the background thread is guarded by ``self._lock``;
    methods whose name ends in ``_locked`` expect the caller to hold it.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._thread: threading.Thread | None = None
        self._started = False
        self._session = None
        self._session_cls = None
        self._binary_msg_type = None
        self._video_defaults: dict[str, Any] = {}
        self._latest_frame: bytes | None = None
        self._latest_received_at = 0.0
        self._latest_backend_received_unix_ns = 0
        self._latest_backend_received_mono_ns = 0
        self._latest_sequence: int | None = None
        self._latest_metadata: FrameTrailerMetadata | None = None
        self._latest_sender_clock_delta_ms_raw: float | None = None
        self._latest_timestamp_unit: str | None = None
        self._latest_timestamp_endianness: str | None = None
        self._sender_clock_delta_samples_ms_raw: deque[float] = deque(maxlen=VIDEO_TIMESTAMP_SAMPLE_SIZE)
        self._latest_frame_hash = ""
        self._latest_frame_bytes = 0
        self._last_frame_hash = ""
        self._last_sequence: int | None = None
        self._last_backend_received_mono_ns = 0
        self._interarrival_ms_samples: deque[float] = deque(maxlen=120)
        self._repeat_samples: deque[int] = deque(maxlen=120)
        self._skip_samples: deque[int] = deque(maxlen=120)
        self._freeze_samples_ms: deque[float] = deque(maxlen=120)
        self._current_stale_frame_run_length = 0
        self._latest_remote_video_srtt_ms: int | None = None
        self._frames_received = 0
        self._last_error = ""
        self._reconnect_count = 0
        self._ever_connected = False
        self._closing = threading.Event()
        self._frame_recv_logger = JsonlRunLogger("BLITZ_A_VIDEO_FRAME_RECV_LOG_PATH", "a-video-frame-recv")
        self._load_backend()

    def _load_backend(self) -> None:
        """Import the omnisocket backend, recording any failure in ``_last_error``."""
        try:
            self._import_backend()
        except Exception as error:  # pragma: no cover - optional runtime dependency
            self._last_error = f"omnisocket import failed: {error}"

    def _import_backend(self) -> None:
        """Import ``omnisocket``, retrying with the workspace's bundled python dir on ``sys.path``."""
        try:
            from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS  # type: ignore
        except ImportError:
            python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python"
            # Skip the insert when the entry is already present so repeated
            # instantiation does not keep growing sys.path.
            if python_dir.exists() and str(python_dir) not in sys.path:
                sys.path.insert(0, str(python_dir))
            from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS  # type: ignore
        self._binary_msg_type = MSG_TYPE_BINARY
        self._session_cls = Session
        self._video_defaults = dict(VIDEO_DEFAULTS)

    def ensure_started(self) -> None:
        """Start the background receive thread once; no-op if the backend is missing or closing."""
        if self._session_cls is None or self._binary_msg_type is None:
            return
        with self._lock:
            if self._started or self._closing.is_set():
                return
            self._started = True
            self._thread = threading.Thread(
                target=self._run,
                name="omnisocket-video-receiver",
                daemon=True,
            )
            self._thread.start()

    def _connect_session(self) -> tuple[Any, int]:
        """Create and connect a session from the current config.

        Returns:
            ``(session, buffer_bytes)`` where ``buffer_bytes`` is the receive
            buffer size taken from the ``video_receiver`` config section.

        Raises:
            Whatever config parsing or ``session.connect`` raises. The
            half-initialised session is closed first so it does not leak.
        """
        assert self._session_cls is not None
        config = load_omnisocket_config()
        transport_cfg = config.get("transport", {})
        video_cfg = config.get("video_receiver", {})
        # Parse the buffer size before opening the session so a bad config
        # value cannot strand a connected session.
        buffer_bytes = int(video_cfg.get("buffer_bytes", 1024 * 1024))
        session = self._session_cls()
        try:
            session.connect(
                server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")),
                peer_id=str(video_cfg.get("peer_id", "peer-a-video")),
                relay_via=str(transport_cfg.get("relay_via", "")),
                bind_ip=str(transport_cfg.get("bind_ip", "")),
                bind_device=str(transport_cfg.get("bind_device", "")),
                **self._video_defaults,
            )
        except Exception:
            try:
                session.close()
            except Exception:
                pass
            raise
        return session, buffer_bytes

    def _extract_jpeg_payload(self, frame: bytes) -> bytes | None:
        """Return ``frame`` from its JPEG start marker onward, or ``None`` if none is found.

        The start marker (``FF D8``) is accepted either at offset 0 or right
        after an 8-byte prefix.
        """
        if frame.startswith(b"\xff\xd8"):
            return frame
        if len(frame) > 8 and frame[8:10] == b"\xff\xd8":
            return frame[8:]
        return None

    def _split_jpeg_frame_and_trailer(self, frame: bytes) -> tuple[bytes, bytes] | None:
        """Split ``frame`` into ``(jpeg_bytes, trailer_bytes)``.

        Returns ``None`` when no JPEG start marker is found. The trailer is
        empty when the payload ends with the JPEG end marker (``FF D9``) or
        contains no end marker at all.
        """
        jpeg_payload = self._extract_jpeg_payload(frame)
        if jpeg_payload is None:
            return None
        if jpeg_payload.endswith(b"\xff\xd9"):
            return jpeg_payload, b""
        # Preferred layout: end marker immediately followed by a trailer of
        # exactly VIDEO_TRAILER_BYTES bytes.
        if (
            len(jpeg_payload) >= VIDEO_TRAILER_BYTES + 2
            and jpeg_payload[-(VIDEO_TRAILER_BYTES + 2) : -VIDEO_TRAILER_BYTES] == b"\xff\xd9"
        ):
            return jpeg_payload[:-VIDEO_TRAILER_BYTES], jpeg_payload[-VIDEO_TRAILER_BYTES:]
        # Fallback: treat everything after the last end marker as the trailer.
        eoi_index = jpeg_payload.rfind(b"\xff\xd9")
        if eoi_index < 0:
            return jpeg_payload, b""
        trailer_start = eoi_index + 2
        return jpeg_payload[:trailer_start], jpeg_payload[trailer_start:]

    def _extract_jpeg_frame(self, frame: bytes) -> bytes | None:
        """Return only the JPEG part of ``frame``, or ``None`` if it is not a JPEG frame."""
        split_payload = self._split_jpeg_frame_and_trailer(frame)
        if split_payload is None:
            return None
        jpeg_frame, _ = split_payload
        return jpeg_frame

    def _extract_sequence(self, frame: bytes) -> int | None:
        """Return the first 8 bytes as a big-endian integer when the frame does not start with ``FF D8``."""
        if len(frame) >= 8 and not frame.startswith(b"\xff\xd8"):
            return int.from_bytes(frame[:8], "big")
        return None

    def _extract_frame_tail(self, frame: bytes) -> bytes:
        """Return the trailer bytes following the JPEG data, or ``b""`` if there are none."""
        split_payload = self._split_jpeg_frame_and_trailer(frame)
        if split_payload is None:
            return b""
        _, trailer = split_payload
        return trailer

    def _extract_frame_metadata(self, frame: bytes, received_at: float | None = None) -> FrameTrailerMetadata | None:
        """Decode and validate the trailer of ``frame``; return ``None`` if absent or implausible."""
        return self._parse_trailer_metadata(self._extract_frame_tail(frame), received_at)

    def _parse_trailer_metadata(self, trailer: bytes, received_at: float | None = None) -> FrameTrailerMetadata | None:
        """Decode already-split trailer bytes into ``FrameTrailerMetadata``.

        Returns ``None`` when the trailer has the wrong length, fails to
        unpack, or fails any plausibility check (non-positive timestamp,
        non-finite or out-of-range coordinates, coordinates at (0, 0), or a
        timestamp further than the configured skew from the local clock).
        """
        if len(trailer) != VIDEO_TRAILER_BYTES:
            return None
        try:
            timestamp_ms, latitude, longitude, capture_to_send_ms = VIDEO_TRAILER_STRUCT.unpack(trailer)
        except struct.error:
            return None
        if timestamp_ms <= 0:
            return None
        if not math.isfinite(latitude) or not math.isfinite(longitude):
            return None
        if not (-90.0 <= latitude <= 90.0) or not (-180.0 <= longitude <= 180.0):
            return None
        if abs(latitude) < 1e-9 and abs(longitude) < 1e-9:
            return None
        timestamp_ns = timestamp_ms * VIDEO_TRAILER_TIMESTAMP_MULTIPLIER_NS
        if abs(time.time_ns() - timestamp_ns) > VIDEO_TRAILER_TIMESTAMP_MAX_SKEW_NS:
            return None
        return FrameTrailerMetadata(
            timestamp_ns=timestamp_ns,
            latitude=latitude,
            longitude=longitude,
            capture_to_send_ms=int(capture_to_send_ms),
            raw_latitude_hex=trailer[8:16].hex(),
            raw_longitude_hex=trailer[16:24].hex(),
            received_at=received_at if received_at is not None else time.time(),
        )

    def _has_fresh_frame_locked(self) -> bool:
        """Return True when a frame exists and arrived within OMNISOCKET_FRAME_FRESH_SECONDS."""
        return self._latest_frame is not None and (
            time.time() - self._latest_received_at <= OMNISOCKET_FRAME_FRESH_SECONDS
        )

    def update_remote_video_srtt(self, srtt_ms: int | None) -> None:
        """Record the most recently reported remote video SRTT, in milliseconds."""
        with self._lock:
            self._latest_remote_video_srtt_ms = srtt_ms

    def _freshness_payload_locked(self) -> dict[str, Any]:
        """Summarise the rolling arrival, repeat, skip and freeze samples."""
        interarrival_samples = list(self._interarrival_ms_samples)
        repeat_samples = list(self._repeat_samples)
        skip_samples = list(self._skip_samples)
        freeze_samples_ms = list(self._freeze_samples_ms)

        if interarrival_samples:
            inter_frame_avg_ms = round(sum(interarrival_samples) / len(interarrival_samples), 3)
            ordered = sorted(interarrival_samples)
            # Nearest-rank 95th percentile, clamped to a valid index.
            p95_index = min(len(ordered) - 1, max(0, math.ceil(len(ordered) * 0.95) - 1))
            inter_frame_p95_ms = round(ordered[p95_index], 3)
        else:
            inter_frame_avg_ms = None
            inter_frame_p95_ms = None

        repeated_frame_ratio = round(sum(repeat_samples) / len(repeat_samples), 4) if repeat_samples else 0.0
        total_skip = sum(skip_samples)
        # Frames expected = frames seen in the window + frames skipped between them.
        expected_frames = len(skip_samples) + total_skip
        skip_ratio = round(total_skip / expected_frames, 4) if expected_frames > 0 else 0.0
        longest_freeze_ms = round(max(freeze_samples_ms), 3) if freeze_samples_ms else 0.0
        return {
            "inter_frame_avg_ms": inter_frame_avg_ms,
            "inter_frame_p95_ms": inter_frame_p95_ms,
            "repeated_frame_ratio": repeated_frame_ratio,
            "skip_ratio": skip_ratio,
            "longest_freeze_ms": longest_freeze_ms,
            "stale_frame_run_length": self._current_stale_frame_run_length,
            "relative_freshness_lag_frames": self._current_stale_frame_run_length
            + (skip_samples[-1] if skip_samples else 0),
        }

    def _frame_headers_locked(self) -> dict[str, str]:
        """Build the ``X-Blitz-*`` headers describing the latest frame; unknown values are omitted."""
        capture_to_send_ms = self._latest_metadata.capture_to_send_ms if self._latest_metadata is not None else None
        headers: dict[str, str] = {}
        if self._latest_sequence is not None:
            headers["X-Blitz-Frame-Seq"] = str(self._latest_sequence)
        if self._latest_backend_received_unix_ns > 0:
            headers["X-Blitz-Backend-Received-Unix-Ns"] = str(self._latest_backend_received_unix_ns)
        if self._latest_frame_hash:
            headers["X-Blitz-Frame-Hash"] = self._latest_frame_hash
        if capture_to_send_ms is not None:
            headers["X-Blitz-BSide-Capture-To-Send-Ms"] = str(capture_to_send_ms)
        return headers

    def get_latest_frame_headers(self) -> dict[str, str]:
        """Return the headers for the latest fresh frame, or ``{}`` when there is none."""
        snapshot = self.get_latest_frame_snapshot()
        return snapshot[1] if snapshot is not None else {}

    def _run(self) -> None:
        """Background loop: connect, receive frames, and reconnect after errors until closed."""
        while not self._closing.is_set():
            # A local reference keeps the error and cleanup paths from reading
            # self._session without the lock.
            session = None
            try:
                session, buffer_bytes = self._connect_session()
                with self._lock:
                    self._session = session
                    self._last_error = ""
                    if self._ever_connected:
                        self._reconnect_count += 1
                    else:
                        self._ever_connected = True
                buffer = bytearray(buffer_bytes)
                while not self._closing.is_set():
                    meta = session.recv_into(buffer, timeout_ms=1000)
                    if meta is None:
                        continue
                    if meta.get("msg_type") != self._binary_msg_type:
                        continue
                    self._process_frame(session, bytes(buffer[: meta["body_len"]]))
            except Exception as error:  # pragma: no cover - runtime integration path
                if not self._closing.is_set():
                    session_error = ""
                    if session is not None:
                        try:
                            session_error = str(dict(session.stats()).get("last_server_error", "") or "")
                        except Exception:
                            session_error = ""
                    self._last_error = session_error or str(error)
                    # Back off before reconnecting; waiting on the event
                    # (rather than sleeping) lets close() interrupt the pause.
                    self._closing.wait(2)
            finally:
                if session is not None:
                    try:
                        session.close()
                    except Exception:
                        pass
                with self._lock:
                    self._session = None
                    if self._closing.is_set():
                        self._started = False

    def _process_frame(self, session: Any, frame: bytes) -> None:
        """Validate one received binary message, update latest-frame state, and log it."""
        # Split once and reuse both halves for the JPEG data and the trailer.
        split_payload = self._split_jpeg_frame_and_trailer(frame)
        if split_payload is None:
            self._last_error = "received non-JPEG binary frame"
            return
        jpeg_frame, trailer = split_payload

        received_at = time.time()
        received_unix_ns = time.time_ns()
        received_mono_ns = time.monotonic_ns()
        frame_metadata = self._parse_trailer_metadata(trailer, received_at)
        sender_clock_delta_ms_raw = None
        if frame_metadata is not None:
            sender_clock_delta_ms_raw = round((received_unix_ns - frame_metadata.timestamp_ns) / 1_000_000, 3)
            unit = VIDEO_TRAILER_TIMESTAMP_UNIT
            endianness = VIDEO_TRAILER_ENDIANNESS
        else:
            unit = None
            endianness = None
        frame_sequence = self._extract_sequence(frame)
        frame_hash = hashlib.blake2s(jpeg_frame, digest_size=8).hexdigest()
        local_kcp = safe_kcp_stats(session) if self._frame_recv_logger.enabled else {}
        frame_log_record: dict[str, Any] | None = None

        with self._lock:
            interarrival_ms = None
            if self._last_backend_received_mono_ns > 0:
                interarrival_ms = round((received_mono_ns - self._last_backend_received_mono_ns) / 1_000_000, 3)
                self._interarrival_ms_samples.append(interarrival_ms)

            sequence_gap = 0
            if (
                frame_sequence is not None
                and self._last_sequence is not None
                and frame_sequence > self._last_sequence
            ):
                sequence_gap = max(0, frame_sequence - self._last_sequence - 1)

            # A frame counts as a repeat when its hash equals the previous one.
            repeat_flag = bool(self._last_frame_hash) and frame_hash == self._last_frame_hash
            self._repeat_samples.append(1 if repeat_flag else 0)
            self._skip_samples.append(sequence_gap)
            if repeat_flag:
                self._current_stale_frame_run_length += 1
            else:
                self._current_stale_frame_run_length = 0
            if interarrival_ms is not None and repeat_flag:
                self._freeze_samples_ms.append(interarrival_ms)
            elif interarrival_ms is not None:
                self._freeze_samples_ms.append(0.0)

            self._latest_frame = jpeg_frame
            self._latest_received_at = received_at
            self._latest_backend_received_unix_ns = received_unix_ns
            self._latest_backend_received_mono_ns = received_mono_ns
            self._latest_sequence = frame_sequence
            self._last_sequence = frame_sequence
            self._latest_metadata = frame_metadata
            self._latest_sender_clock_delta_ms_raw = sender_clock_delta_ms_raw
            self._latest_timestamp_unit = unit
            self._latest_timestamp_endianness = endianness
            self._latest_frame_hash = frame_hash
            self._latest_frame_bytes = len(jpeg_frame)
            self._last_frame_hash = frame_hash
            self._last_backend_received_mono_ns = received_mono_ns
            if sender_clock_delta_ms_raw is not None:
                self._sender_clock_delta_samples_ms_raw.append(sender_clock_delta_ms_raw)
            self._frames_received += 1

            if self._frame_recv_logger.enabled:
                frame_log_record = {
                    "ts_unix_nano": received_unix_ns,
                    "frame_seq": frame_sequence,
                    "backend_received_unix_ns": received_unix_ns,
                    "backend_received_mono_ns": received_mono_ns,
                    "jpeg_bytes": len(jpeg_frame),
                    "interarrival_ms": interarrival_ms,
                    "sequence_gap": sequence_gap,
                    "repeat_flag": repeat_flag,
                    "skip_count": sequence_gap,
                    "frame_hash": frame_hash,
                    "a_to_d_video_srtt_ms": local_kcp.get("srtt_ms"),
                    "d_to_b_video_srtt_ms": self._latest_remote_video_srtt_ms,
                    "b_side_capture_to_send_ms": frame_metadata.capture_to_send_ms
                    if frame_metadata is not None
                    else None,
                    "sender_clock_delta_ms_raw": sender_clock_delta_ms_raw,
                }

        # The record is built under the lock but written after releasing it.
        if frame_log_record is not None:
            self._frame_recv_logger.write(frame_log_record)

    def get_latest_frame(self) -> bytes | None:
        """Return the latest fresh JPEG frame, or ``None`` when there is none."""
        snapshot = self.get_latest_frame_snapshot()
        return snapshot[0] if snapshot is not None else None

    def get_latest_frame_snapshot(self) -> tuple[bytes, dict[str, str]] | None:
        """Return ``(frame, headers)`` for the latest fresh frame, starting the receiver if needed."""
        self.ensure_started()
        with self._lock:
            if not self._has_fresh_frame_locked():
                return None
            if self._latest_frame is None:
                return None
            return self._latest_frame, self._frame_headers_locked()

    def get_latest_frame_metadata(self) -> FrameTrailerMetadata | None:
        """Return the latest frame's trailer metadata, or ``None`` when no fresh frame exists."""
        self.ensure_started()
        with self._lock:
            if not self._has_fresh_frame_locked():
                return None
            return self._latest_metadata

    def session_stats(self) -> dict[str, Any]:
        """Return ``session.stats()`` as a dict, or a disconnected placeholder on failure."""
        self.ensure_started()
        with self._lock:
            session = self._session
        if session is None:
            return {"connected": 0, "registered": 0, "last_server_error": self._last_error}
        try:
            return dict(session.stats())
        except Exception:
            return {"connected": 0, "registered": 0, "last_server_error": self._last_error}

    def session_kcp_stats(self) -> dict[str, Any]:
        """Return the current session's KCP stats, or ``{}`` when unavailable."""
        self.ensure_started()
        with self._lock:
            session = self._session
        return safe_kcp_stats(session)

    def get_status(self) -> dict[str, Any]:
        """Return a status dict covering connection, latest frame, timing and freshness."""
        self.ensure_started()
        config = load_omnisocket_config()
        transport_cfg = config.get("transport", {})
        video_cfg = config.get("video_receiver", {})
        session_stats = self.session_stats()
        with self._lock:
            has_recent_frame = self._has_fresh_frame_locked()
            freshness_status = self._freshness_payload_locked()
            if has_recent_frame and self._latest_sender_clock_delta_ms_raw is not None:
                timing_status = {
                    "available": True,
                    "sender_clock_delta_ms_raw": self._latest_sender_clock_delta_ms_raw,
                    # Samples are appended oldest-to-newest; report newest first.
                    "sender_clock_delta_samples_ms_raw": list(reversed(self._sender_clock_delta_samples_ms_raw)),
                    "sample_count": len(self._sender_clock_delta_samples_ms_raw),
                    "sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE,
                    "timestamp_unit": self._latest_timestamp_unit,
                    "timestamp_endianness": self._latest_timestamp_endianness,
                    "unsynced_clock": True,
                }
            else:
                timing_status = {
                    "available": False,
                    "sender_clock_delta_ms_raw": None,
                    "sender_clock_delta_samples_ms_raw": [],
                    "sample_count": 0,
                    "sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE,
                    "timestamp_unit": None,
                    "timestamp_endianness": None,
                    "unsynced_clock": True,
                }
            return {
                "backend_ready": self._session_cls is not None,
                "mode": VIDEO_SOURCE_MODE,
                "connected": self._session is not None,
                "registered": bool(session_stats.get("registered", 0)),
                "has_recent_frame": has_recent_frame,
                "frames_received": self._frames_received,
                "latest_sequence": self._latest_sequence,
                "latest_frame_hash": self._latest_frame_hash,
                "latest_backend_received_unix_ns": self._latest_backend_received_unix_ns or None,
                "latest_backend_received_mono_ns": self._latest_backend_received_mono_ns or None,
                "latest_frame_bytes": self._latest_frame_bytes,
                "latest_capture_to_send_ms": self._latest_metadata.capture_to_send_ms
                if self._latest_metadata is not None
                else None,
                "reconnect_count": self._reconnect_count,
                "last_server_error": str(session_stats.get("last_server_error", "") or ""),
                "last_error": self._last_error,
                "config_path": str(OMNISOCKET_CONFIG_PATH),
                "server_addr": str(transport_cfg.get("server_addr", "")),
                "relay_via": str(transport_cfg.get("relay_via", "")),
                "peer_id": str(video_cfg.get("peer_id", "")),
                "buffer_bytes": int(video_cfg.get("buffer_bytes", 0)),
                "timing": timing_status,
                "freshness": freshness_status,
            }

    def close(self) -> None:
        """Signal shutdown, close the session, briefly wait for the thread, and close the logger."""
        self._closing.set()
        with self._lock:
            session = self._session
        if session is not None:
            try:
                session.close()
            except Exception:
                pass
        thread = self._thread
        if thread is not None and thread.is_alive():
            thread.join(timeout=0.5)
        self._frame_recv_logger.close()


class VideoFrameService:
    """Facade over the receiver and the display-probe store."""

    def __init__(self, receiver: OmniSocketVideoReceiver, display_probe_store: VideoDisplayProbeStore) -> None:
        self._receiver = receiver
        self._display_probe_store = display_probe_store

    def get_status(self) -> dict[str, Any]:
        """Return combined receiver and display-probe status.

        ``available`` is True only when the receiver currently has a fresh
        frame. ``fps`` is reported as the constant 30.
        """
        receiver_status = self._receiver.get_status()
        receiver_frame = self._receiver.get_latest_frame()
        display_probe_status = self._display_probe_store.get_status()
        frames_received = receiver_status["frames_received"]

        if receiver_frame is not None:
            available = True
            source_mode = "omnisocket-jpeg-live"
            source_detail = f"peer stream active, frames={frames_received}"
        else:
            available = False
            source_mode = "omnisocket-waiting"
            source_detail = receiver_status["last_error"] or (
                "waiting for live OmniSocket JPEG frames; check the hub, sender, and receiver configuration"
            )

        return {
            "available": available,
            "source_mode": source_mode,
            "frame_count": frames_received,
            "fps": 30,
            "frame_dir": str(JPEG_FRAME_DIR),
            "source_detail": source_detail,
            "receiver": receiver_status,
            "timing": receiver_status["timing"],
            "freshness": receiver_status.get("freshness", {}),
            "display_probe": display_probe_status,
        }

    def get_next_frame(self) -> bytes:
        """Return the latest fresh JPEG frame.

        Raises:
            RuntimeError: when no fresh frame is available.
        """
        receiver_frame = self._receiver.get_latest_frame()
        if receiver_frame is not None:
            return receiver_frame
        raise RuntimeError("no live OmniSocket JPEG frame is currently available")

    def get_next_frame_with_headers(self) -> tuple[bytes, dict[str, str]]:
        """Return ``(frame, headers)`` for the latest fresh frame.

        Raises:
            RuntimeError: when no fresh frame is available.
        """
        snapshot = self._receiver.get_latest_frame_snapshot()
        if snapshot is not None:
            return snapshot
        raise RuntimeError("no live OmniSocket JPEG frame is currently available")

    def get_latest_frame_headers(self) -> dict[str, str]:
        """Return the headers for the latest fresh frame, or ``{}`` when there is none."""
        return self._receiver.get_latest_frame_headers()

    def record_display_probe(self, payload: dict[str, Any]) -> None:
        """Forward a display-probe event to the probe store."""
        self._display_probe_store.record_event(payload)

    def iter_mjpeg(self, fps: float = 6.0) -> Iterator[bytes]:
        """Yield multipart MJPEG chunks indefinitely, pausing between frames.

        ``fps`` is clamped to the range 1..30. ``RuntimeError`` from
        ``get_next_frame`` propagates when no fresh frame is available.
        """
        frame_interval = 1.0 / max(1.0, min(fps, 30.0))
        while True:
            frame = self.get_next_frame()
            header = (
                b"--frame\r\n"
                b"Content-Type: image/jpeg\r\n"
                + f"Content-Length: {len(frame)}\r\n\r\n".encode("ascii")
            )
            yield header + frame + b"\r\n"
            time.sleep(frame_interval)