Files

213 lines
7.6 KiB
Python

from __future__ import annotations
import json
import math
import sys
import time
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Iterator
PROJECT_ROOT = Path(__file__).resolve().parents[2]
WORKSPACE_ROOT = PROJECT_ROOT.parent
GEOSTREAM_JSON_PATH = WORKSPACE_ROOT / "GeoStream" / "gps_latest.json"
GEOSTREAM_STALE_SECONDS = 15
DAEMON_FRAME_URI = "omni-daemon://latest-frame"
def utc_iso_now() -> str:
return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
def _load_daemon_client_api():
try:
from omnisocket_a_side.client import OmniDaemonClient, OmniDaemonError
except ImportError:
python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python"
if python_dir.exists():
sys.path.insert(0, str(python_dir))
from omnisocket_a_side.client import OmniDaemonClient, OmniDaemonError
return OmniDaemonClient, OmniDaemonError
_OmniDaemonClient, OmniDaemonError = _load_daemon_client_api()
_daemon_client = _OmniDaemonClient()
def get_daemon_client():
return _daemon_client
def _default_receiver(error_message: str) -> dict[str, Any]:
return {
"backend_ready": False,
"mode": "daemon",
"connected": False,
"has_recent_frame": False,
"frames_received": 0,
"latest_sequence": None,
"last_error": error_message,
"config_path": "",
"server_addr": "",
"relay_via": "",
"peer_id": "",
"buffer_bytes": 0,
}
class VideoFrameService:
def get_status(self) -> dict[str, Any]:
try:
state = get_daemon_client().get_state()
except OmniDaemonError as error:
return {
"available": False,
"source_mode": "daemon-unavailable",
"frame_count": 0,
"fps": 5,
"frame_dir": DAEMON_FRAME_URI,
"source_detail": str(error),
"receiver": _default_receiver(str(error)),
}
video = dict(state.get("video") or {})
receiver = dict(video.get("receiver") or {})
profile = dict((state.get("policy") or {}).get("recommended_video_profile") or {})
return {
"available": bool(video.get("available", False)),
"source_mode": str(video.get("source_mode") or "omnisocket-waiting"),
"frame_count": int(
video.get("frame_count", receiver.get("frames_received", 0)) or 0
),
"fps": int(video.get("fps", profile.get("fps", 5)) or 5),
"frame_dir": str(video.get("frame_dir") or DAEMON_FRAME_URI),
"source_detail": str(
video.get("source_detail")
or receiver.get("last_error")
or "waiting for latest JPEG frame from daemon"
),
"receiver": {
"backend_ready": bool(receiver.get("backend_ready", True)),
"mode": str(receiver.get("mode") or "daemon"),
"connected": bool(receiver.get("connected", False)),
"has_recent_frame": bool(receiver.get("has_recent_frame", False)),
"frames_received": int(receiver.get("frames_received", 0) or 0),
"latest_sequence": receiver.get("latest_sequence"),
"last_error": str(receiver.get("last_error") or ""),
"config_path": str(receiver.get("config_path") or ""),
"server_addr": str(receiver.get("server_addr") or ""),
"relay_via": str(receiver.get("relay_via") or ""),
"peer_id": str(receiver.get("peer_id") or ""),
"buffer_bytes": int(receiver.get("buffer_bytes", 0) or 0),
},
}
def get_next_frame(self) -> bytes:
try:
return get_daemon_client().get_video_frame()
except OmniDaemonError as error:
raise RuntimeError(str(error)) from error
def iter_mjpeg(self, fps: float = 6.0) -> Iterator[bytes]:
frame_interval = 1.0 / max(1.0, min(fps, 30.0))
while True:
frame = self.get_next_frame()
header = (
b"--frame\r\n"
b"Content-Type: image/jpeg\r\n"
+ f"Content-Length: {len(frame)}\r\n\r\n".encode("ascii")
)
yield header + frame + b"\r\n"
time.sleep(frame_interval)
class GpsDataService:
def get_latest(self) -> dict[str, Any]:
payload = self._read_geostream_payload()
if payload is not None:
payload["source_mode"] = "geostream-json"
payload["updated_at"] = utc_iso_now()
return payload
return self._build_simulated_payload()
def _read_geostream_payload(self) -> dict[str, Any] | None:
if not GEOSTREAM_JSON_PATH.exists():
return None
age_seconds = time.time() - GEOSTREAM_JSON_PATH.stat().st_mtime
if age_seconds > GEOSTREAM_STALE_SECONDS:
return None
try:
with GEOSTREAM_JSON_PATH.open("r", encoding="utf-8") as file:
return json.load(file)
except (OSError, json.JSONDecodeError):
return None
def _build_simulated_payload(self) -> dict[str, Any]:
tick = time.time() / 12.0
latitude = 31.2304 + math.sin(tick) * 0.0014
longitude = 121.4737 + math.cos(tick) * 0.0018
return {
"has_fix": True,
"utc_time": datetime.now(UTC).strftime("%H:%M:%S"),
"latitude": round(latitude, 6),
"longitude": round(longitude, 6),
"satellites": 14 + int((math.sin(tick * 0.7) + 1.0) * 2),
"altitude_m": round(6.5 + math.cos(tick * 0.5) * 1.2, 2),
"coordinate_system": "WGS84",
"source_sentence": "SIMULATED",
"raw_coordinate_format": "decimal degrees",
"source_mode": "simulated",
"updated_at": utc_iso_now(),
}
class NetworkTelemetryService:
def get_latest(self) -> dict[str, Any]:
try:
state = get_daemon_client().get_state()
except OmniDaemonError as error:
return {
"peer_status": "offline",
"latency_ms": 0.0,
"jitter_ms": 0.0,
"retrans_pct": 0.0,
"packet_loss_pct": 0.0,
"tx_kbps": 0,
"rx_kbps": 0,
"signal_dbm": None,
"transport": "OmniSocket / daemon",
"source_mode": "daemon-unavailable",
"updated_at": utc_iso_now(),
"error": str(error),
}
network = dict(state.get("network") or {})
return {
"peer_status": str(network.get("peer_status") or "offline"),
"latency_ms": float(network.get("latency_ms", 0.0) or 0.0),
"jitter_ms": float(network.get("jitter_ms", 0.0) or 0.0),
"retrans_pct": float(
network.get("retrans_pct", network.get("packet_loss_pct", 0.0)) or 0.0
),
"packet_loss_pct": float(
network.get("packet_loss_pct", network.get("retrans_pct", 0.0)) or 0.0
),
"tx_kbps": int(network.get("tx_kbps", 0) or 0),
"rx_kbps": int(network.get("rx_kbps", 0) or 0),
"signal_dbm": network.get("signal_dbm"),
"transport": str(network.get("transport") or "OmniSocket / daemon"),
"source_mode": str(network.get("source_mode") or "daemon-live"),
"updated_at": str(network.get("updated_at") or utc_iso_now()),
}
video_service = VideoFrameService()
gps_service = GpsDataService()
network_service = NetworkTelemetryService()