diff --git a/backend/monitoring/common.py b/backend/monitoring/common.py index 63d88cc..9989012 100644 --- a/backend/monitoring/common.py +++ b/backend/monitoring/common.py @@ -10,18 +10,18 @@ from typing import Any PROJECT_ROOT = Path(__file__).resolve().parents[2] WORKSPACE_ROOT = PROJECT_ROOT.parent JPEG_FRAME_DIR = WORKSPACE_ROOT / "RobotDataShow" / "jpeg-frames" -GEOSTREAM_JSON_PATH = WORKSPACE_ROOT / "GeoStream" / "gps_latest.json" -GEOSTREAM_STALE_SECONDS = 15 OMNISOCKET_CONFIG_PATH = PROJECT_ROOT / "config" / "omnisocket_demo.yaml" VIDEO_SOURCE_MODE = os.getenv("VIDEO_SOURCE_MODE", "auto").strip().lower() OMNISOCKET_FRAME_FRESH_SECONDS = 2.0 VIDEO_TIMESTAMP_SAMPLE_SIZE = 10 -VIDEO_TIMESTAMP_TRAILER_BYTES = 8 -VIDEO_TIMESTAMP_ENDIANNESS = "little" -VIDEO_TIMESTAMP_UNIT = "ms" -VIDEO_TIMESTAMP_MULTIPLIER_NS = 1_000_000 -VIDEO_TIMESTAMP_MAX_SKEW_NS = 7 * 24 * 60 * 60 * 1_000_000_000 +VIDEO_TRAILER_ENDIANNESS = "little" +VIDEO_TRAILER_TIMESTAMP_UNIT = "ms" +VIDEO_TRAILER_TIMESTAMP_MULTIPLIER_NS = 1_000_000 +VIDEO_TRAILER_TIMESTAMP_MAX_SKEW_NS = 7 * 24 * 60 * 60 * 1_000_000_000 +VIDEO_TRAILER_COORDINATE_FORMAT = "float32 little-endian" +VIDEO_TRAILER_STRUCT = struct.Struct(" tuple[str, int]: if port <= 0 or port > 65535: raise ValueError(f"invalid port in bind address: {bind_addr}") return host, port - diff --git a/backend/monitoring/services.py b/backend/monitoring/services.py index 677029a..8055d02 100644 --- a/backend/monitoring/services.py +++ b/backend/monitoring/services.py @@ -15,7 +15,7 @@ control_arbiter = ControlArbiter(_control_sender) native_control_ingress = NativeUdpControlIngress(control_arbiter) video_service = VideoFrameService(_video_receiver) -gps_service = GpsDataService() +gps_service = GpsDataService(_video_receiver) network_service = NetworkTelemetryService( _video_receiver, _control_sender, @@ -41,4 +41,3 @@ def shutdown_monitoring_services() -> None: atexit.register(shutdown_monitoring_services) - diff --git a/backend/monitoring/telemetry.py b/backend/monitoring/telemetry.py index 64a7a48..247dc73 100644 --- a/backend/monitoring/telemetry.py +++ b/backend/monitoring/telemetry.py @@ -2,7 +2,6 @@ from __future__ import annotations from collections import deque import json -import math import sys import threading import time @@ -10,14 +9,13 @@ from datetime import datetime, timezone from typing import Any from .common import ( - GEOSTREAM_JSON_PATH, - GEOSTREAM_STALE_SECONDS, + VIDEO_TRAILER_COORDINATE_FORMAT, WORKSPACE_ROOT, load_omnisocket_config, utc_iso_now, ) from .control import ControlArbiter, NativeUdpControlIngress, OmniSocketControlSender -from .video import OmniSocketVideoReceiver +from .video import FrameTrailerMetadata, OmniSocketVideoReceiver LOCAL_SAMPLE_INTERVAL_MS = 500 @@ -50,46 +48,45 @@ def _coerce_float(value: Any, default: float = 0.0) -> float: class GpsDataService: + def __init__(self, receiver: OmniSocketVideoReceiver) -> None: + self._receiver = receiver + def get_latest(self) -> dict[str, Any]: - payload = self._read_geostream_payload() - if payload is not None: - payload["source_mode"] = "geostream-json" - payload["updated_at"] = utc_iso_now() - return payload + metadata = self._receiver.get_latest_frame_metadata() + if metadata is None: + return self._build_waiting_payload() + return self._build_payload_from_metadata(metadata) - return self._build_simulated_payload() - - def _read_geostream_payload(self) -> dict[str, Any] | None: - if not GEOSTREAM_JSON_PATH.exists(): - return None - - age_seconds = time.time() - GEOSTREAM_JSON_PATH.stat().st_mtime - if age_seconds > GEOSTREAM_STALE_SECONDS: - return None - - try: - with GEOSTREAM_JSON_PATH.open("r", encoding="utf-8") as file: - return json.load(file) - except (OSError, json.JSONDecodeError): - return None - - def _build_simulated_payload(self) -> dict[str, Any]: - tick = time.time() / 12.0 - latitude = 31.2304 + math.sin(tick) * 0.0014 - longitude = 121.4737 + math.cos(tick) * 0.0018 + def _build_waiting_payload(self) -> dict[str, Any]: + return { + "has_fix": False, + "utc_time": "--:--:--", + "latitude": None, + "longitude": None, + "satellites": None, + "altitude_m": None, + "coordinate_system": "WGS84", + "source_sentence": "VIDEO_TRAILER", + "raw_coordinate_format": VIDEO_TRAILER_COORDINATE_FORMAT, + "source_mode": "video-frame-trailer-waiting", + "updated_at": "", + } + def _build_payload_from_metadata(self, metadata: FrameTrailerMetadata) -> dict[str, Any]: + timestamp_seconds = metadata.timestamp_ns / 1_000_000_000 + updated_at = _utc_from_epoch(metadata.received_at) or "" return { "has_fix": True, - "utc_time": datetime.now(timezone.utc).strftime("%H:%M:%S"), - "latitude": round(latitude, 6), - "longitude": round(longitude, 6), - "satellites": 14 + int((math.sin(tick * 0.7) + 1.0) * 2), - "altitude_m": round(6.5 + math.cos(tick * 0.5) * 1.2, 2), + "utc_time": datetime.fromtimestamp(timestamp_seconds, timezone.utc).strftime("%H:%M:%S"), + "latitude": round(metadata.latitude, 6), + "longitude": round(metadata.longitude, 6), + "satellites": None, + "altitude_m": None, "coordinate_system": "WGS84", - "source_sentence": "SIMULATED", - "raw_coordinate_format": "decimal degrees", - "source_mode": "simulated", - "updated_at": utc_iso_now(), + "source_sentence": "VIDEO_TRAILER", + "raw_coordinate_format": VIDEO_TRAILER_COORDINATE_FORMAT, + "source_mode": "video-frame-trailer", + "updated_at": updated_at, } diff --git a/backend/monitoring/video.py b/backend/monitoring/video.py index e183fc7..c33d463 100644 --- a/backend/monitoring/video.py +++ b/backend/monitoring/video.py @@ -1,6 +1,9 @@ from __future__ import annotations from collections import deque +from dataclasses import dataclass +import math +import struct import sys import threading import time @@ -10,14 +13,14 @@ from .common import ( JPEG_FRAME_DIR, OMNISOCKET_CONFIG_PATH, OMNISOCKET_FRAME_FRESH_SECONDS, - PROJECT_ROOT, VIDEO_SOURCE_MODE, - VIDEO_TIMESTAMP_ENDIANNESS, - VIDEO_TIMESTAMP_MAX_SKEW_NS, - VIDEO_TIMESTAMP_MULTIPLIER_NS, VIDEO_TIMESTAMP_SAMPLE_SIZE, - VIDEO_TIMESTAMP_TRAILER_BYTES, - VIDEO_TIMESTAMP_UNIT, + VIDEO_TRAILER_BYTES, + VIDEO_TRAILER_ENDIANNESS, + VIDEO_TRAILER_STRUCT, + VIDEO_TRAILER_TIMESTAMP_MAX_SKEW_NS, + VIDEO_TRAILER_TIMESTAMP_MULTIPLIER_NS, + VIDEO_TRAILER_TIMESTAMP_UNIT, WORKSPACE_ROOT, load_omnisocket_config, ) @@ -32,6 +35,14 @@ def safe_kcp_stats(session: Any) -> dict[str, Any]: return {} +@dataclass(frozen=True) +class FrameTrailerMetadata: + timestamp_ns: int + latitude: float + longitude: float + received_at: float + + class OmniSocketVideoReceiver: def __init__(self) -> None: self._lock = threading.Lock() @@ -44,6 +55,7 @@ class OmniSocketVideoReceiver: self._latest_frame: bytes | None = None self._latest_received_at = 0.0 self._latest_sequence: int | None = None + self._latest_metadata: FrameTrailerMetadata | None = None self._latest_latency_ms: float | None = None self._latest_timestamp_unit: str | None = None self._latest_timestamp_endianness: str | None = None @@ -123,10 +135,10 @@ class OmniSocketVideoReceiver: return jpeg_payload, b"" if ( - len(jpeg_payload) >= VIDEO_TIMESTAMP_TRAILER_BYTES + 2 - and jpeg_payload[-(VIDEO_TIMESTAMP_TRAILER_BYTES + 2) : -VIDEO_TIMESTAMP_TRAILER_BYTES] == b"\xff\xd9" + len(jpeg_payload) >= VIDEO_TRAILER_BYTES + 2 + and jpeg_payload[-(VIDEO_TRAILER_BYTES + 2) : -VIDEO_TRAILER_BYTES] == b"\xff\xd9" ): - return jpeg_payload[:-VIDEO_TIMESTAMP_TRAILER_BYTES], jpeg_payload[-VIDEO_TIMESTAMP_TRAILER_BYTES:] + return jpeg_payload[:-VIDEO_TRAILER_BYTES], jpeg_payload[-VIDEO_TRAILER_BYTES:] eoi_index = jpeg_payload.rfind(b"\xff\xd9") if eoi_index < 0: @@ -154,20 +166,38 @@ class OmniSocketVideoReceiver: _, trailer = split_payload return trailer - def _extract_frame_timestamp(self, frame: bytes) -> tuple[int, str, str] | None: + def _extract_frame_metadata(self, frame: bytes, received_at: float | None = None) -> FrameTrailerMetadata | None: trailer = self._extract_frame_tail(frame) - if len(trailer) != VIDEO_TIMESTAMP_TRAILER_BYTES: + if len(trailer) != VIDEO_TRAILER_BYTES: return None - value = int.from_bytes(trailer, VIDEO_TIMESTAMP_ENDIANNESS, signed=False) - if value <= 0: + try: + timestamp_ms, latitude, longitude = VIDEO_TRAILER_STRUCT.unpack(trailer) + except struct.error: return None - timestamp_ns = value * VIDEO_TIMESTAMP_MULTIPLIER_NS - if abs(time.time_ns() - timestamp_ns) > VIDEO_TIMESTAMP_MAX_SKEW_NS: + if timestamp_ms <= 0: + return None + if not math.isfinite(latitude) or not math.isfinite(longitude): + return None + if not (-90.0 <= latitude <= 90.0) or not (-180.0 <= longitude <= 180.0): return None - return timestamp_ns, VIDEO_TIMESTAMP_UNIT, VIDEO_TIMESTAMP_ENDIANNESS + timestamp_ns = timestamp_ms * VIDEO_TRAILER_TIMESTAMP_MULTIPLIER_NS + if abs(time.time_ns() - timestamp_ns) > VIDEO_TRAILER_TIMESTAMP_MAX_SKEW_NS: + return None + + return FrameTrailerMetadata( + timestamp_ns=timestamp_ns, + latitude=latitude, + longitude=longitude, + received_at=received_at if received_at is not None else time.time(), + ) + + def _has_fresh_frame_locked(self) -> bool: + return self._latest_frame is not None and ( + time.time() - self._latest_received_at <= OMNISOCKET_FRAME_FRESH_SECONDS + ) def _run(self) -> None: while not self._closing.is_set(): @@ -195,19 +225,22 @@ class OmniSocketVideoReceiver: self._last_error = "received non-JPEG binary frame" continue - timestamp_meta = self._extract_frame_timestamp(frame) + received_at = time.time() + frame_metadata = self._extract_frame_metadata(frame, received_at=received_at) latency_ms = None - if timestamp_meta is not None: - timestamp_ns, unit, endianness = timestamp_meta - latency_ms = round((time.time_ns() - timestamp_ns) / 1_000_000, 3) + if frame_metadata is not None: + latency_ms = round((time.time_ns() - frame_metadata.timestamp_ns) / 1_000_000, 3) + unit = VIDEO_TRAILER_TIMESTAMP_UNIT + endianness = VIDEO_TRAILER_ENDIANNESS else: unit = None endianness = None with self._lock: self._latest_frame = jpeg_frame - self._latest_received_at = time.time() + self._latest_received_at = received_at self._latest_sequence = self._extract_sequence(frame) + self._latest_metadata = frame_metadata self._latest_latency_ms = latency_ms self._latest_timestamp_unit = unit self._latest_timestamp_endianness = endianness @@ -238,12 +271,17 @@ class OmniSocketVideoReceiver: def get_latest_frame(self) -> bytes | None: self.ensure_started() with self._lock: - if self._latest_frame is None: - return None - if time.time() - self._latest_received_at > OMNISOCKET_FRAME_FRESH_SECONDS: + if not self._has_fresh_frame_locked(): return None return self._latest_frame + def get_latest_frame_metadata(self) -> FrameTrailerMetadata | None: + self.ensure_started() + with self._lock: + if not self._has_fresh_frame_locked(): + return None + return self._latest_metadata + def session_stats(self) -> dict[str, Any]: self.ensure_started() with self._lock: @@ -268,9 +306,7 @@ class OmniSocketVideoReceiver: video_cfg = config.get("video_receiver", {}) session_stats = self.session_stats() with self._lock: - has_recent_frame = self._latest_frame is not None and ( - time.time() - self._latest_received_at <= OMNISOCKET_FRAME_FRESH_SECONDS - ) + has_recent_frame = self._has_fresh_frame_locked() if has_recent_frame and self._latest_latency_ms is not None: timing_status = { "available": True, @@ -375,4 +411,3 @@ class VideoFrameService: ) yield header + frame + b"\r\n" time.sleep(frame_interval) -