from __future__ import annotations import json import math import sys import time from datetime import UTC, datetime from pathlib import Path from typing import Any, Iterator PROJECT_ROOT = Path(__file__).resolve().parents[2] WORKSPACE_ROOT = PROJECT_ROOT.parent GEOSTREAM_JSON_PATH = WORKSPACE_ROOT / "GeoStream" / "gps_latest.json" GEOSTREAM_STALE_SECONDS = 15 DAEMON_FRAME_URI = "omni-daemon://latest-frame" def utc_iso_now() -> str: return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") def _load_daemon_client_api(): try: from omnisocket_a_side.client import OmniDaemonClient, OmniDaemonError except ImportError: python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python" if python_dir.exists(): sys.path.insert(0, str(python_dir)) from omnisocket_a_side.client import OmniDaemonClient, OmniDaemonError return OmniDaemonClient, OmniDaemonError _OmniDaemonClient, OmniDaemonError = _load_daemon_client_api() _daemon_client = _OmniDaemonClient() def get_daemon_client(): return _daemon_client def _default_receiver(error_message: str) -> dict[str, Any]: return { "backend_ready": False, "mode": "daemon", "connected": False, "has_recent_frame": False, "frames_received": 0, "latest_sequence": None, "last_error": error_message, "config_path": "", "server_addr": "", "relay_via": "", "peer_id": "", "buffer_bytes": 0, } class VideoFrameService: def get_status(self) -> dict[str, Any]: try: state = get_daemon_client().get_state() except OmniDaemonError as error: return { "available": False, "source_mode": "daemon-unavailable", "frame_count": 0, "fps": 5, "frame_dir": DAEMON_FRAME_URI, "source_detail": str(error), "receiver": _default_receiver(str(error)), } video = dict(state.get("video") or {}) receiver = dict(video.get("receiver") or {}) profile = dict((state.get("policy") or {}).get("recommended_video_profile") or {}) return { "available": bool(video.get("available", False)), "source_mode": str(video.get("source_mode") or "omnisocket-waiting"), "frame_count": int( video.get("frame_count", receiver.get("frames_received", 0)) or 0 ), "fps": int(video.get("fps", profile.get("fps", 5)) or 5), "frame_dir": str(video.get("frame_dir") or DAEMON_FRAME_URI), "source_detail": str( video.get("source_detail") or receiver.get("last_error") or "waiting for latest JPEG frame from daemon" ), "receiver": { "backend_ready": bool(receiver.get("backend_ready", True)), "mode": str(receiver.get("mode") or "daemon"), "connected": bool(receiver.get("connected", False)), "has_recent_frame": bool(receiver.get("has_recent_frame", False)), "frames_received": int(receiver.get("frames_received", 0) or 0), "latest_sequence": receiver.get("latest_sequence"), "last_error": str(receiver.get("last_error") or ""), "config_path": str(receiver.get("config_path") or ""), "server_addr": str(receiver.get("server_addr") or ""), "relay_via": str(receiver.get("relay_via") or ""), "peer_id": str(receiver.get("peer_id") or ""), "buffer_bytes": int(receiver.get("buffer_bytes", 0) or 0), }, } def get_next_frame(self) -> bytes: try: return get_daemon_client().get_video_frame() except OmniDaemonError as error: raise RuntimeError(str(error)) from error def iter_mjpeg(self, fps: float = 6.0) -> Iterator[bytes]: frame_interval = 1.0 / max(1.0, min(fps, 30.0)) while True: frame = self.get_next_frame() header = ( b"--frame\r\n" b"Content-Type: image/jpeg\r\n" + f"Content-Length: {len(frame)}\r\n\r\n".encode("ascii") ) yield header + frame + b"\r\n" time.sleep(frame_interval) class GpsDataService: def get_latest(self) -> dict[str, Any]: payload = self._read_geostream_payload() if payload is not None: payload["source_mode"] = "geostream-json" payload["updated_at"] = utc_iso_now() return payload return self._build_simulated_payload() def _read_geostream_payload(self) -> dict[str, Any] | None: if not GEOSTREAM_JSON_PATH.exists(): return None age_seconds = time.time() - GEOSTREAM_JSON_PATH.stat().st_mtime if age_seconds > GEOSTREAM_STALE_SECONDS: return None try: with GEOSTREAM_JSON_PATH.open("r", encoding="utf-8") as file: return json.load(file) except (OSError, json.JSONDecodeError): return None def _build_simulated_payload(self) -> dict[str, Any]: tick = time.time() / 12.0 latitude = 31.2304 + math.sin(tick) * 0.0014 longitude = 121.4737 + math.cos(tick) * 0.0018 return { "has_fix": True, "utc_time": datetime.now(UTC).strftime("%H:%M:%S"), "latitude": round(latitude, 6), "longitude": round(longitude, 6), "satellites": 14 + int((math.sin(tick * 0.7) + 1.0) * 2), "altitude_m": round(6.5 + math.cos(tick * 0.5) * 1.2, 2), "coordinate_system": "WGS84", "source_sentence": "SIMULATED", "raw_coordinate_format": "decimal degrees", "source_mode": "simulated", "updated_at": utc_iso_now(), } class NetworkTelemetryService: def get_latest(self) -> dict[str, Any]: try: state = get_daemon_client().get_state() except OmniDaemonError as error: return { "peer_status": "offline", "latency_ms": 0.0, "jitter_ms": 0.0, "retrans_pct": 0.0, "packet_loss_pct": 0.0, "tx_kbps": 0, "rx_kbps": 0, "signal_dbm": None, "transport": "OmniSocket / daemon", "source_mode": "daemon-unavailable", "updated_at": utc_iso_now(), "error": str(error), } network = dict(state.get("network") or {}) return { "peer_status": str(network.get("peer_status") or "offline"), "latency_ms": float(network.get("latency_ms", 0.0) or 0.0), "jitter_ms": float(network.get("jitter_ms", 0.0) or 0.0), "retrans_pct": float( network.get("retrans_pct", network.get("packet_loss_pct", 0.0)) or 0.0 ), "packet_loss_pct": float( network.get("packet_loss_pct", network.get("retrans_pct", 0.0)) or 0.0 ), "tx_kbps": int(network.get("tx_kbps", 0) or 0), "rx_kbps": int(network.get("rx_kbps", 0) or 0), "signal_dbm": network.get("signal_dbm"), "transport": str(network.get("transport") or "OmniSocket / daemon"), "source_mode": str(network.get("source_mode") or "daemon-live"), "updated_at": str(network.get("updated_at") or utc_iso_now()), } video_service = VideoFrameService() gps_service = GpsDataService() network_service = NetworkTelemetryService()