from __future__ import annotations from collections import deque from dataclasses import dataclass import math import struct import sys import threading import time from typing import Any, Iterator from .common import ( JPEG_FRAME_DIR, OMNISOCKET_CONFIG_PATH, OMNISOCKET_FRAME_FRESH_SECONDS, VIDEO_SOURCE_MODE, VIDEO_TIMESTAMP_SAMPLE_SIZE, VIDEO_TRAILER_BYTES, VIDEO_TRAILER_ENDIANNESS, VIDEO_TRAILER_STRUCT, VIDEO_TRAILER_TIMESTAMP_MAX_SKEW_NS, VIDEO_TRAILER_TIMESTAMP_MULTIPLIER_NS, VIDEO_TRAILER_TIMESTAMP_UNIT, WORKSPACE_ROOT, load_omnisocket_config, ) def safe_kcp_stats(session: Any) -> dict[str, Any]: if session is None or not hasattr(session, "kcp_stats"): return {} try: return dict(session.kcp_stats()) except Exception: return {} @dataclass(frozen=True) class FrameTrailerMetadata: timestamp_ns: int latitude: float longitude: float received_at: float class OmniSocketVideoReceiver: def __init__(self) -> None: self._lock = threading.Lock() self._thread: threading.Thread | None = None self._started = False self._session = None self._session_cls = None self._binary_msg_type = None self._video_defaults: dict[str, Any] = {} self._latest_frame: bytes | None = None self._latest_received_at = 0.0 self._latest_sequence: int | None = None self._latest_metadata: FrameTrailerMetadata | None = None self._latest_latency_ms: float | None = None self._latest_timestamp_unit: str | None = None self._latest_timestamp_endianness: str | None = None self._latency_samples_ms: deque[float] = deque(maxlen=VIDEO_TIMESTAMP_SAMPLE_SIZE) self._frames_received = 0 self._last_error = "" self._reconnect_count = 0 self._ever_connected = False self._closing = threading.Event() self._load_backend() def _load_backend(self) -> None: try: self._import_backend() except Exception as error: # pragma: no cover - optional runtime dependency self._last_error = f"omnisocket import failed: {error}" def _import_backend(self) -> None: try: from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS # type: ignore except ImportError: python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python" if python_dir.exists(): sys.path.insert(0, str(python_dir)) from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS # type: ignore self._binary_msg_type = MSG_TYPE_BINARY self._session_cls = Session self._video_defaults = dict(VIDEO_DEFAULTS) def ensure_started(self) -> None: if self._session_cls is None or self._binary_msg_type is None: return with self._lock: if self._started or self._closing.is_set(): return self._started = True self._thread = threading.Thread( target=self._run, name="omnisocket-video-receiver", daemon=True, ) self._thread.start() def _connect_session(self): assert self._session_cls is not None config = load_omnisocket_config() transport_cfg = config.get("transport", {}) video_cfg = config.get("video_receiver", {}) session = self._session_cls() session.connect( server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")), peer_id=str(video_cfg.get("peer_id", "peer-a-video")), relay_via=str(transport_cfg.get("relay_via", "")), bind_ip=str(transport_cfg.get("bind_ip", "")), bind_device=str(transport_cfg.get("bind_device", "")), **self._video_defaults, ) return session, int(video_cfg.get("buffer_bytes", 1024 * 1024)) def _extract_jpeg_payload(self, frame: bytes) -> bytes | None: if frame.startswith(b"\xff\xd8"): return frame if len(frame) > 8 and frame[8:10] == b"\xff\xd8": return frame[8:] return None def _split_jpeg_frame_and_trailer(self, frame: bytes) -> tuple[bytes, bytes] | None: jpeg_payload = self._extract_jpeg_payload(frame) if jpeg_payload is None: return None if jpeg_payload.endswith(b"\xff\xd9"): return jpeg_payload, b"" if ( len(jpeg_payload) >= VIDEO_TRAILER_BYTES + 2 and jpeg_payload[-(VIDEO_TRAILER_BYTES + 2) : -VIDEO_TRAILER_BYTES] == b"\xff\xd9" ): return jpeg_payload[:-VIDEO_TRAILER_BYTES], jpeg_payload[-VIDEO_TRAILER_BYTES:] eoi_index = jpeg_payload.rfind(b"\xff\xd9") if eoi_index < 0: return jpeg_payload, b"" trailer_start = eoi_index + 2 return jpeg_payload[:trailer_start], jpeg_payload[trailer_start:] def _extract_jpeg_frame(self, frame: bytes) -> bytes | None: split_payload = self._split_jpeg_frame_and_trailer(frame) if split_payload is None: return None jpeg_frame, _ = split_payload return jpeg_frame def _extract_sequence(self, frame: bytes) -> int | None: if len(frame) >= 8 and not frame.startswith(b"\xff\xd8"): return int.from_bytes(frame[:8], "big") return None def _extract_frame_tail(self, frame: bytes) -> bytes: split_payload = self._split_jpeg_frame_and_trailer(frame) if split_payload is None: return b"" _, trailer = split_payload return trailer def _extract_frame_metadata(self, frame: bytes, received_at: float | None = None) -> FrameTrailerMetadata | None: trailer = self._extract_frame_tail(frame) if len(trailer) != VIDEO_TRAILER_BYTES: return None try: timestamp_ms, latitude, longitude = VIDEO_TRAILER_STRUCT.unpack(trailer) except struct.error: return None if timestamp_ms <= 0: return None if not math.isfinite(latitude) or not math.isfinite(longitude): return None if not (-90.0 <= latitude <= 90.0) or not (-180.0 <= longitude <= 180.0): return None if abs(latitude) < 1e-9 and abs(longitude) < 1e-9: return None timestamp_ns = timestamp_ms * VIDEO_TRAILER_TIMESTAMP_MULTIPLIER_NS if abs(time.time_ns() - timestamp_ns) > VIDEO_TRAILER_TIMESTAMP_MAX_SKEW_NS: return None return FrameTrailerMetadata( timestamp_ns=timestamp_ns, latitude=latitude, longitude=longitude, raw_latitude_hex=trailer[8:16].hex(), raw_longitude_hex=trailer[16:24].hex(), received_at=received_at if received_at is not None else time.time(), ) def _has_fresh_frame_locked(self) -> bool: return self._latest_frame is not None and ( time.time() - self._latest_received_at <= OMNISOCKET_FRAME_FRESH_SECONDS ) def _run(self) -> None: while not self._closing.is_set(): try: session, buffer_bytes = self._connect_session() with self._lock: self._session = session self._last_error = "" if self._ever_connected: self._reconnect_count += 1 else: self._ever_connected = True buffer = bytearray(buffer_bytes) while not self._closing.is_set(): meta = session.recv_into(buffer, timeout_ms=1000) if meta is None: continue if meta.get("msg_type") != self._binary_msg_type: continue frame = bytes(buffer[: meta["body_len"]]) jpeg_frame = self._extract_jpeg_frame(frame) if jpeg_frame is None: self._last_error = "received non-JPEG binary frame" continue received_at = time.time() frame_metadata = self._extract_frame_metadata(frame, received_at=received_at) latency_ms = None if frame_metadata is not None: latency_ms = round((time.time_ns() - frame_metadata.timestamp_ns) / 1_000_000, 3) unit = VIDEO_TRAILER_TIMESTAMP_UNIT endianness = VIDEO_TRAILER_ENDIANNESS else: unit = None endianness = None with self._lock: self._latest_frame = jpeg_frame self._latest_received_at = received_at self._latest_sequence = self._extract_sequence(frame) self._latest_metadata = frame_metadata self._latest_latency_ms = latency_ms self._latest_timestamp_unit = unit self._latest_timestamp_endianness = endianness if latency_ms is not None: self._latency_samples_ms.append(latency_ms) self._frames_received += 1 except Exception as error: # pragma: no cover - runtime integration path if not self._closing.is_set(): session_error = "" if self._session is not None: try: session_error = str(dict(self._session.stats()).get("last_server_error", "") or "") except Exception: session_error = "" self._last_error = session_error or str(error) time.sleep(2) finally: if self._session is not None: try: self._session.close() except Exception: pass with self._lock: self._session = None if self._closing.is_set(): self._started = False def get_latest_frame(self) -> bytes | None: self.ensure_started() with self._lock: if not self._has_fresh_frame_locked(): return None return self._latest_frame def get_latest_frame_metadata(self) -> FrameTrailerMetadata | None: self.ensure_started() with self._lock: if not self._has_fresh_frame_locked(): return None return self._latest_metadata def session_stats(self) -> dict[str, Any]: self.ensure_started() with self._lock: session = self._session if session is None: return {"connected": 0, "registered": 0, "last_server_error": self._last_error} try: return dict(session.stats()) except Exception: return {"connected": 0, "registered": 0, "last_server_error": self._last_error} def session_kcp_stats(self) -> dict[str, Any]: self.ensure_started() with self._lock: session = self._session return safe_kcp_stats(session) def get_status(self) -> dict[str, Any]: self.ensure_started() config = load_omnisocket_config() transport_cfg = config.get("transport", {}) video_cfg = config.get("video_receiver", {}) session_stats = self.session_stats() with self._lock: has_recent_frame = self._has_fresh_frame_locked() if has_recent_frame and self._latest_latency_ms is not None: timing_status = { "available": True, "latest_delta_ms": self._latest_latency_ms, "delta_samples_ms": list(reversed(self._latency_samples_ms)), "sample_count": len(self._latency_samples_ms), "sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE, "timestamp_unit": self._latest_timestamp_unit, "timestamp_endianness": self._latest_timestamp_endianness, } else: timing_status = { "available": False, "latest_delta_ms": None, "delta_samples_ms": [], "sample_count": 0, "sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE, "timestamp_unit": None, "timestamp_endianness": None, } return { "backend_ready": self._session_cls is not None, "mode": VIDEO_SOURCE_MODE, "connected": self._session is not None, "registered": bool(session_stats.get("registered", 0)), "has_recent_frame": has_recent_frame, "frames_received": self._frames_received, "latest_sequence": self._latest_sequence, "reconnect_count": self._reconnect_count, "last_server_error": str(session_stats.get("last_server_error", "") or ""), "last_error": self._last_error, "config_path": str(OMNISOCKET_CONFIG_PATH), "server_addr": str(transport_cfg.get("server_addr", "")), "relay_via": str(transport_cfg.get("relay_via", "")), "peer_id": str(video_cfg.get("peer_id", "")), "buffer_bytes": int(video_cfg.get("buffer_bytes", 0)), "timing": timing_status, } def close(self) -> None: self._closing.set() with self._lock: session = self._session if session is not None: try: session.close() except Exception: pass thread = self._thread if thread is not None and thread.is_alive(): thread.join(timeout=0.5) class VideoFrameService: def __init__(self, receiver: OmniSocketVideoReceiver) -> None: self._receiver = receiver def get_status(self) -> dict[str, Any]: receiver_status = self._receiver.get_status() receiver_frame = self._receiver.get_latest_frame() if receiver_frame is not None: return { "available": True, "source_mode": "omnisocket-jpeg-live", "frame_count": receiver_status["frames_received"], "fps": 30, "frame_dir": str(JPEG_FRAME_DIR), "source_detail": f"peer stream active, frames={receiver_status['frames_received']}", "receiver": receiver_status, "timing": receiver_status["timing"], } wait_detail = receiver_status["last_error"] or ( "waiting for live OmniSocket JPEG frames; check the hub, sender, and receiver configuration" ) return { "available": False, "source_mode": "omnisocket-waiting", "frame_count": receiver_status["frames_received"], "fps": 30, "frame_dir": str(JPEG_FRAME_DIR), "source_detail": wait_detail, "receiver": receiver_status, "timing": receiver_status["timing"], } def get_next_frame(self) -> bytes: receiver_frame = self._receiver.get_latest_frame() if receiver_frame is not None: return receiver_frame raise RuntimeError("no live OmniSocket JPEG frame is currently available") def iter_mjpeg(self, fps: float = 6.0) -> Iterator[bytes]: frame_interval = 1.0 / max(1.0, min(fps, 30.0)) while True: frame = self.get_next_frame() header = ( b"--frame\r\n" b"Content-Type: image/jpeg\r\n" + f"Content-Length: {len(frame)}\r\n\r\n".encode("ascii") ) yield header + frame + b"\r\n" time.sleep(frame_interval)