904 lines
39 KiB
Python
904 lines
39 KiB
Python
from __future__ import annotations
|
|
|
|
from collections import deque
|
|
import json
|
|
import os
|
|
import sys
|
|
import threading
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from .common import (
|
|
VIDEO_TRAILER_COORDINATE_FORMAT,
|
|
WORKSPACE_ROOT,
|
|
load_omnisocket_config,
|
|
utc_iso_now,
|
|
)
|
|
from .control import ControlAckTracker, ControlArbiter, NativeUdpControlIngress, OmniSocketControlAckReceiver, OmniSocketControlSender
|
|
from .video import FrameTrailerMetadata, OmniSocketVideoReceiver, VideoDisplayProbeStore
|
|
|
|
|
|
LOCAL_SAMPLE_INTERVAL_MS = 500
|
|
TREND_HISTORY_SIZE = 10
|
|
TREND_WINDOW_SIZE = 5
|
|
BLITZ_RUNTIME_DIR = Path(os.getenv("BLITZ_RUNTIME_DIR", "/run/blitz-robot"))
|
|
WATCHDOG_STATUS_PATH = BLITZ_RUNTIME_DIR / "watchdog.status.json"
|
|
WATCHDOG_STATUS_STALE_MS = max(int(os.getenv("BLITZ_HEALTH_STALE_SEC", "15")), 1) * 1000
|
|
WATCHDOG_FAULT_REASON_MAP: dict[str, tuple[str, str | None]] = {
|
|
"": ("none", None),
|
|
"none": ("none", None),
|
|
"camera_missing": ("video_pipeline_stalled", "degraded"),
|
|
"camera_recovered": ("video_session_recovering", "recovering"),
|
|
"camera-reappeared-escalated": ("video_session_recovering", "recovering"),
|
|
"bside_status_stale": ("video_session_recovering", "recovering"),
|
|
"bside-unhealthy-escalated": ("video_session_recovering", "recovering"),
|
|
"ros_receiver_unhealthy": ("control_session_recovering", "recovering"),
|
|
"ros-unhealthy": ("control_session_recovering", "recovering"),
|
|
"network_or_robot_unreachable": ("network_or_robot_unreachable", "recovering"),
|
|
"network-recovered-ros-unhealthy": ("control_session_recovering", "recovering"),
|
|
"network-recovered-escalated": ("network_or_robot_unreachable", "recovering"),
|
|
}
|
|
|
|
|
|
def _utc_from_epoch(epoch_seconds: float | None) -> str | None:
|
|
if epoch_seconds is None or epoch_seconds <= 0.0:
|
|
return None
|
|
return datetime.fromtimestamp(epoch_seconds, timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")
|
|
|
|
|
|
def _coerce_int(value: Any, default: int = 0) -> int:
|
|
try:
|
|
if value is None:
|
|
return default
|
|
return int(value)
|
|
except (TypeError, ValueError):
|
|
return default
|
|
|
|
|
|
def _coerce_float(value: Any, default: float = 0.0) -> float:
|
|
try:
|
|
if value is None:
|
|
return default
|
|
return float(value)
|
|
except (TypeError, ValueError):
|
|
return default
|
|
|
|
|
|
def _load_optional_json(path: Path) -> dict[str, Any] | None:
|
|
try:
|
|
if not path.exists():
|
|
return None
|
|
with path.open("r", encoding="utf-8") as file:
|
|
payload = json.load(file)
|
|
if isinstance(payload, dict):
|
|
return payload
|
|
except Exception:
|
|
return None
|
|
return None
|
|
|
|
|
|
class GpsDataService:
    """Expose the GPS fields of the latest video frame trailer as a payload dict."""

    def __init__(self, receiver: OmniSocketVideoReceiver) -> None:
        self._receiver = receiver

    def get_latest(self) -> dict[str, Any]:
        """Return the payload for the newest frame metadata.

        When the receiver has no metadata yet, the "waiting" payload is returned.
        """
        latest = self._receiver.get_latest_frame_metadata()
        if latest is not None:
            return self._build_payload_from_metadata(latest)
        return self._build_waiting_payload()

    @staticmethod
    def _assemble(
        *,
        fixed: bool,
        clock: str,
        lat: float | None,
        lon: float | None,
        raw_hex: tuple[str, str] | None,
        mode: str,
        stamp: str,
    ) -> dict[str, Any]:
        """Build one payload; the raw hex keys are emitted only when *raw_hex* is given."""
        payload: dict[str, Any] = {
            "has_fix": fixed,
            "utc_time": clock,
            "latitude": lat,
            "longitude": lon,
        }
        if raw_hex is not None:
            payload["raw_latitude_hex"], payload["raw_longitude_hex"] = raw_hex
        # Satellites and altitude are always reported as None by this service.
        payload.update(
            satellites=None,
            altitude_m=None,
            coordinate_system="WGS84",
            source_sentence="VIDEO_TRAILER",
            raw_coordinate_format=VIDEO_TRAILER_COORDINATE_FORMAT,
            source_mode=mode,
            updated_at=stamp,
        )
        return payload

    def _build_waiting_payload(self) -> dict[str, Any]:
        """Payload used before any frame metadata has been received."""
        return self._assemble(
            fixed=False,
            clock="--:--:--",
            lat=None,
            lon=None,
            raw_hex=None,
            mode="video-frame-trailer-waiting",
            stamp="",
        )

    def _build_payload_from_metadata(self, metadata: FrameTrailerMetadata) -> dict[str, Any]:
        """Translate frame trailer metadata into the payload, with or without a GPS fix."""
        stamp = _utc_from_epoch(metadata.received_at) or ""

        if metadata.has_gps_fix:
            # The frame timestamp is in nanoseconds; only the time of day is shown.
            frame_time = datetime.fromtimestamp(metadata.timestamp_ns / 1_000_000_000, timezone.utc)
            fixed = True
            clock = frame_time.strftime("%H:%M:%S")
            lat = None if metadata.latitude is None else round(metadata.latitude, 6)
            lon = None if metadata.longitude is None else round(metadata.longitude, 6)
            mode = "video-frame-trailer"
        else:
            fixed = False
            clock = "--:--:--"
            lat = None
            lon = None
            mode = "video-frame-trailer-no-fix"

        raw_hex = (f"0x{metadata.raw_latitude_hex}", f"0x{metadata.raw_longitude_hex}")
        return self._assemble(
            fixed=fixed,
            clock=clock,
            lat=lat,
            lon=lon,
            raw_hex=raw_hex,
            mode=mode,
            stamp=stamp,
        )
|
|
|
|
|
|
class KcpTrendTracker:
    """Keep a short per-key history of KCP stats and derive deltas and trends from it."""

    # Integer fields copied after coercion, in the order they appear in the output.
    _HEAD_FIELDS = (
        "connected",
        "conv",
        "rto_ms",
        "srtt_ms",
        "min_srtt_ms",
        "srttvar_ms",
        "last_feedback_age_ms",
    )
    _TAIL_FIELDS = (
        "snd_queue",
        "rcv_queue",
        "snd_buffer",
        "out_segs_total",
        "retrans_total",
        "fast_retrans_total",
        "lost_total",
        "repeat_total",
        "xmit_total",
    )

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._samples: dict[str, deque[dict[str, Any]]] = {}

    def _normalize(self, stats: dict[str, Any] | None) -> dict[str, Any]:
        """Return a dict with every known KCP field present and coerced to a number."""
        source = dict(stats or {})
        send_window = _coerce_int(source.get("snd_wnd"))
        remote_window = _coerce_int(source.get("rmt_wnd"))
        in_flight = _coerce_int(source.get("inflight"))
        # Without an explicit limit, use the smaller window when both are non-zero.
        fallback_limit = min(send_window, remote_window) if send_window and remote_window else 0

        normalized: dict[str, Any] = {name: _coerce_int(source.get(name)) for name in self._HEAD_FIELDS}
        normalized["snd_wnd"] = send_window
        normalized["rmt_wnd"] = remote_window
        normalized["inflight"] = in_flight
        normalized["window_limit"] = _coerce_int(source.get("window_limit"), fallback_limit)
        normalized["window_pressure_pct"] = round(_coerce_float(source.get("window_pressure_pct")), 3)
        for name in self._TAIL_FIELDS:
            normalized[name] = _coerce_int(source.get(name))
        return normalized

    def add_sample(self, key: str, stats: dict[str, Any] | None) -> None:
        """Append a normalized, timestamped sample to the bounded history of *key*."""
        entry = {
            "ts_monotonic": time.monotonic(),
            "updated_at": utc_iso_now(),
            "stats": self._normalize(stats),
        }
        with self._lock:
            self._samples.setdefault(key, deque(maxlen=TREND_HISTORY_SIZE)).append(entry)

    def latest_updated_at(self, key: str) -> str | None:
        """Return the ``updated_at`` of the newest sample for *key*, or ``None`` without samples."""
        with self._lock:
            recorded = self._samples.get(key)
            if recorded:
                return str(recorded[-1].get("updated_at") or "")
            return None

    def describe(self, key: str, current_stats: dict[str, Any] | None) -> dict[str, Any]:
        """Compare *current_stats* with the stored history of *key*.

        Returns a dict with ``"kcp"`` (the normalized current stats) and
        ``"trend"`` (non-negative deltas against the newest stored sample,
        queue/buffer trend labels over the recent window, and the repair rate).
        """
        latest = self._normalize(current_stats)
        with self._lock:
            recorded = list(self._samples.get(key, ()))

        timeline = [*recorded, {"stats": latest, "updated_at": utc_iso_now()}]
        baseline = timeline[-2]["stats"] if len(timeline) > 1 else None
        window = [item["stats"] for item in timeline[-TREND_WINDOW_SIZE:]]
        # Changes smaller than this are reported as "stable".
        tolerance = max(2.0, 0.05 * float(max(latest.get("window_limit", 0), 1)))

        def growth(field: str) -> int:
            """Increase of *field* since the baseline sample, clamped at zero."""
            if baseline is None:
                return 0
            return max(0, latest[field] - _coerce_int(baseline.get(field)))

        def direction(field: str) -> str:
            """Label the change of *field* between the ends of the window."""
            if len(window) < 2:
                return "stable"
            first = float(_coerce_int(window[0].get(field)))
            last = float(_coerce_int(window[-1].get(field)))
            change = last - first
            if abs(change) < tolerance:
                return "stable"
            return "rising" if change > 0 else "falling"

        retransmitted = growth("retrans_total")
        sent_segments = growth("out_segs_total")
        if sent_segments > 0:
            repair_rate = round((retransmitted / sent_segments) * 100.0, 3)
        else:
            repair_rate = 0.0

        trend = {
            "snd_queue_delta": growth("snd_queue"),
            "snd_buffer_delta": growth("snd_buffer"),
            "snd_queue_trend": direction("snd_queue"),
            "snd_buffer_trend": direction("snd_buffer"),
            "retrans_delta": retransmitted,
            "fast_retrans_delta": growth("fast_retrans_total"),
            "lost_delta": growth("lost_total"),
            "repeat_delta": growth("repeat_total"),
            "out_segs_delta": sent_segments,
            "repair_rate_pct": repair_rate,
        }
        return {"kcp": latest, "trend": trend}
|
|
|
|
|
|
class HubTelemetryReceiver:
    """Background receiver for ``hub_kcp_snapshot`` telemetry messages.

    A daemon thread connects an OmniSocket session, keeps the most recent
    snapshot and reconnects after failures. The ``omnisocket`` package is
    optional: when it cannot be imported the receiver never starts, and
    ``get_snapshot`` reports a disconnected, stale state together with the
    import error.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._thread: threading.Thread | None = None
        self._started = False
        self._session = None
        self._session_cls = None
        self._msg_type_text = None
        self._msg_type_error = None
        self._telemetry_defaults: dict[str, Any] = {}
        self._latest_snapshot: dict[str, Any] | None = None
        self._last_error = ""
        self._last_received_wall = 0.0
        self._last_received_monotonic = 0.0
        self._reconnect_count = 0
        self._ever_connected = False
        self._closing = threading.Event()
        self._load_backend()

    def _load_backend(self) -> None:
        """Import the backend, recording the failure instead of raising it."""
        try:
            self._import_backend()
        except Exception as error:  # pragma: no cover - optional runtime dependency
            self._last_error = f"omnisocket import failed: {error}"

    def _import_backend(self) -> None:
        """Import ``omnisocket``, retrying from the workspace checkout if needed."""
        try:
            from omnisocket import MSG_TYPE_ERROR, MSG_TYPE_TEXT, Session, TELEMETRY_DEFAULTS  # type: ignore
        except ImportError:
            python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python"
            if python_dir.exists():
                sys.path.insert(0, str(python_dir))
            from omnisocket import MSG_TYPE_ERROR, MSG_TYPE_TEXT, Session, TELEMETRY_DEFAULTS  # type: ignore

        self._msg_type_error = MSG_TYPE_ERROR
        self._msg_type_text = MSG_TYPE_TEXT
        self._session_cls = Session
        self._telemetry_defaults = dict(TELEMETRY_DEFAULTS)

    def _connect_session(self):
        """Create and connect a new session from the current configuration.

        Raises:
            Exception: Whatever ``connect`` raised; the half-open session is
                closed first so that failed attempts do not leak sessions.
        """
        assert self._session_cls is not None

        config = load_omnisocket_config()
        transport_cfg = config.get("transport", {})
        telemetry_cfg = config.get("telemetry_receiver", {})

        session = self._session_cls()
        try:
            session.connect(
                server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")),
                peer_id=str(telemetry_cfg.get("peer_id", "peer-a-telemetry")),
                relay_via=str(transport_cfg.get("relay_via", "")),
                bind_ip=str(transport_cfg.get("bind_ip", "")),
                bind_device=str(transport_cfg.get("bind_device", "")),
                **self._telemetry_defaults,
            )
        except Exception:
            # The session was never published on self._session, so nothing
            # else would close it.
            try:
                session.close()
            except Exception:
                pass
            raise
        return session

    def ensure_started(self) -> None:
        """Start the receiver thread once; a no-op without a backend or after ``close``."""
        if self._session_cls is None:
            return

        with self._lock:
            if self._started or self._closing.is_set():
                return
            self._started = True
            self._thread = threading.Thread(
                target=self._run,
                name="hub-telemetry-receiver",
                daemon=True,
            )
            self._thread.start()

    def _handle_message(self, from_peer: Any, msg_type: Any, payload: bytes) -> None:
        """Process one received message.

        Error messages update ``_last_error``. Text messages carrying a
        ``hub_kcp_snapshot`` object become the latest snapshot. A payload that
        is not valid UTF-8 JSON, or not a JSON object, is recorded as an error
        and skipped so that one bad message does not tear down the session.
        """
        if msg_type == self._msg_type_error:
            with self._lock:
                self._last_error = f"hub error from {from_peer}: {payload.decode('utf-8', errors='replace')}"
            return
        if msg_type != self._msg_type_text:
            return

        try:
            snapshot = json.loads(payload.decode("utf-8"))
        except ValueError as error:  # includes UnicodeDecodeError and JSONDecodeError
            with self._lock:
                self._last_error = f"invalid telemetry payload from {from_peer}: {error}"
            return
        if not isinstance(snapshot, dict):
            with self._lock:
                self._last_error = f"invalid telemetry payload from {from_peer}: expected a JSON object"
            return
        if snapshot.get("type") != "hub_kcp_snapshot":
            return

        now_wall = time.time()
        now_mono = time.monotonic()
        with self._lock:
            self._latest_snapshot = snapshot
            self._last_received_wall = now_wall
            self._last_received_monotonic = now_mono
            self._last_error = ""

    def _run(self) -> None:
        """Thread body: connect, receive until failure or close, then back off and retry."""
        while not self._closing.is_set():
            try:
                session = self._connect_session()
                with self._lock:
                    self._session = session
                    self._last_error = ""
                    if self._ever_connected:
                        self._reconnect_count += 1
                    else:
                        self._ever_connected = True

                while not self._closing.is_set():
                    result = session.recv(timeout_ms=1000)
                    if result is None:
                        continue
                    from_peer, msg_type, payload = result
                    self._handle_message(from_peer, msg_type, payload)
            except Exception as error:  # pragma: no cover - runtime integration path
                if not self._closing.is_set():
                    with self._lock:
                        failed_session = self._session
                    session_error = ""
                    if failed_session is not None:
                        try:
                            session_error = str(dict(failed_session.stats()).get("last_server_error", "") or "")
                        except Exception:
                            session_error = ""
                    with self._lock:
                        # Prefer the server-reported reason over the local exception text.
                        self._last_error = session_error or str(error)
            finally:
                with self._lock:
                    session = self._session
                    self._session = None
                    if self._closing.is_set():
                        self._started = False
                if session is not None:
                    try:
                        session.close()
                    except Exception:
                        pass
                # Back off before reconnecting; returns at once when close() is called.
                self._closing.wait(2)

    def get_snapshot(self) -> dict[str, Any]:
        """Return the receiver state and the latest snapshot.

        The result is marked ``stale`` when no snapshot has been received yet
        or the newest one is older than ``telemetry_receiver.stale_after_ms``
        (default 1500, never below 500).
        """
        self.ensure_started()
        cfg = load_omnisocket_config().get("telemetry_receiver", {})
        stale_after_ms = max(500, int(cfg.get("stale_after_ms", 1500)))

        with self._lock:
            received_monotonic = self._last_received_monotonic
            received_wall = self._last_received_wall
            snapshot = self._latest_snapshot
            connected = self._session is not None
            last_error = self._last_error
            reconnect_count = self._reconnect_count
            if self._session is not None:
                try:
                    session_stats = dict(self._session.stats())
                except Exception:
                    session_stats = {}
            else:
                session_stats = {}

        stale = True
        if received_monotonic > 0.0:
            stale = (time.monotonic() - received_monotonic) * 1000.0 > stale_after_ms

        return {
            "connected": connected,
            "updated_at": _utc_from_epoch(received_wall),
            "received_at_monotonic": received_monotonic,
            "stale": stale,
            "peer_id": str(cfg.get("peer_id", "peer-a-telemetry")),
            "snapshot": snapshot or {"sessions": []},
            "last_error": last_error,
            "registered": bool(session_stats.get("registered", 0)),
            "last_server_error": str(session_stats.get("last_server_error", "") or ""),
            "reconnect_count": reconnect_count,
        }

    def close(self) -> None:
        """Signal shutdown, close the active session and briefly wait for the thread."""
        self._closing.set()
        with self._lock:
            session = self._session
        if session is not None:
            try:
                session.close()
            except Exception:
                pass
        thread = self._thread
        if thread is not None and thread.is_alive():
            thread.join(timeout=0.5)
|
|
|
|
|
|
class NetworkTelemetryService:
    """Combine local session stats, hub telemetry and watchdog status into one report.

    ``get_latest`` is the entry point. It starts the collaborating components
    and a background sampler on first use, then builds a dict describing both
    links (``a_to_d`` from local sessions, ``d_to_b`` from hub telemetry),
    throughput, latency estimates and a derived robot health state.
    """

    def __init__(
        self,
        video_receiver: OmniSocketVideoReceiver,
        control_sender: OmniSocketControlSender,
        control_ack_tracker: ControlAckTracker,
        control_ack_receiver: OmniSocketControlAckReceiver,
        control_arbiter: ControlArbiter,
        native_ingress: NativeUdpControlIngress,
        hub_receiver: HubTelemetryReceiver,
        video_display_probe_store: VideoDisplayProbeStore,
    ) -> None:
        self._video_receiver = video_receiver
        self._control_sender = control_sender
        self._control_ack_tracker = control_ack_tracker
        self._control_ack_receiver = control_ack_receiver
        self._control_arbiter = control_arbiter
        self._native_ingress = native_ingress
        self._hub_receiver = hub_receiver
        self._video_display_probe_store = video_display_probe_store
        self._trend_tracker = KcpTrendTracker()
        self._rate_lock = threading.Lock()
        # (monotonic time, send bytes, recv bytes) of the previous rate computation.
        self._last_rate_sample: tuple[float, int, int] | None = None
        self._sample_thread: threading.Thread | None = None
        self._sample_started = False
        # Monotonic receive time of the last hub snapshot fed into the trend tracker.
        self._last_remote_snapshot_at = 0.0
        self._closing = threading.Event()

    def _ensure_started(self) -> None:
        """Start every collaborator and, once, the local sampler thread."""
        self._video_receiver.ensure_started()
        self._control_arbiter.ensure_started()
        self._control_ack_receiver.ensure_started()
        self._native_ingress.ensure_started()
        self._hub_receiver.ensure_started()
        with self._rate_lock:
            if self._sample_started or self._closing.is_set():
                return
            self._sample_started = True
            self._sample_thread = threading.Thread(
                target=self._sample_loop,
                name="network-telemetry-sampler",
                daemon=True,
            )
            self._sample_thread.start()

    def _sample_loop(self) -> None:
        """Record local KCP stats every ``LOCAL_SAMPLE_INTERVAL_MS`` until closed."""
        interval_seconds = LOCAL_SAMPLE_INTERVAL_MS / 1000.0
        while not self._closing.is_set():
            try:
                self._trend_tracker.add_sample("a_to_d.video", self._video_receiver.session_kcp_stats())
                self._trend_tracker.add_sample("a_to_d.control", self._control_sender.session_kcp_stats())
            except Exception:
                # Best-effort sampling: a failed sample is skipped, the loop goes on.
                pass
            # Event.wait instead of time.sleep so close() ends the pause at once.
            self._closing.wait(interval_seconds)

    def _compute_rates(self, send_bytes: int, recv_bytes: int) -> tuple[float, float]:
        """Return ``(tx_kbps, rx_kbps)`` since the previous call.

        The first call, a non-positive elapsed time, and counters that moved
        backwards all yield ``0.0`` for the affected rate.
        """
        now = time.monotonic()
        with self._rate_lock:
            previous = self._last_rate_sample
            self._last_rate_sample = (now, send_bytes, recv_bytes)

        if previous is None:
            return 0.0, 0.0

        prev_time, prev_send, prev_recv = previous
        elapsed = now - prev_time
        if elapsed <= 0.0:
            return 0.0, 0.0

        tx_kbps = max(0.0, ((send_bytes - prev_send) * 8.0) / elapsed / 1000.0)
        rx_kbps = max(0.0, ((recv_bytes - prev_recv) * 8.0) / elapsed / 1000.0)
        return tx_kbps, rx_kbps

    def _ingest_remote_snapshot(self, telemetry_state: dict[str, Any]) -> None:
        """Feed a hub snapshot into the trend tracker, once per received snapshot."""
        received_at = float(telemetry_state.get("received_at_monotonic") or 0.0)
        if received_at <= 0.0 or received_at <= self._last_remote_snapshot_at:
            return

        snapshot = telemetry_state.get("snapshot") or {}
        sessions = snapshot.get("sessions") or []
        for session in sessions:
            if not isinstance(session, dict):
                # Remote data: skip malformed entries instead of failing the report.
                continue
            peer_id = str(session.get("peer_id", "")).strip()
            if not peer_id:
                continue
            self._trend_tracker.add_sample(f"hub::{peer_id}", session)
        self._last_remote_snapshot_at = received_at

    def _build_session_payload(
        self,
        trend_key: str,
        peer_id: str,
        app_stats: dict[str, Any] | None,
        current_kcp: dict[str, Any] | None,
        updated_at: str | None,
        stale: bool,
    ) -> dict[str, Any]:
        """Describe one session; ``registered`` in *app_stats* overrides the KCP connected flag."""
        described = self._trend_tracker.describe(trend_key, current_kcp)
        connected = bool(described["kcp"].get("connected"))
        if app_stats is not None and "registered" in app_stats:
            connected = bool(app_stats.get("registered"))
        return {
            "peer_id": peer_id,
            "connected": connected,
            "updated_at": updated_at,
            "stale": stale,
            "app": app_stats,
            "kcp": described["kcp"],
            "trend": described["trend"],
        }

    def _build_link(self, source: str, updated_at: str | None, stale: bool, sessions: dict[str, dict[str, Any]]) -> dict[str, Any]:
        """Summarise a link; aggregates count only sessions that are connected and not stale."""
        session_items = list(sessions.values())
        active_sessions = [session for session in session_items if session.get("connected") and not session.get("stale")]
        retrans_sum = sum(_coerce_int(session.get("trend", {}).get("retrans_delta")) for session in active_sessions)
        out_segs_sum = sum(_coerce_int(session.get("trend", {}).get("out_segs_delta")) for session in active_sessions)
        repair_rate_pct = round((retrans_sum / out_segs_sum) * 100.0, 3) if out_segs_sum > 0 else 0.0

        return {
            "source": source,
            "updated_at": updated_at,
            "stale": stale,
            "aggregate": {
                "online_sessions": len(active_sessions),
                "max_window_pressure_pct": max(
                    (_coerce_float(session.get("kcp", {}).get("window_pressure_pct")) for session in active_sessions),
                    default=0.0,
                ),
                "sum_snd_queue": sum(_coerce_int(session.get("kcp", {}).get("snd_queue")) for session in active_sessions),
                "sum_snd_buffer": sum(_coerce_int(session.get("kcp", {}).get("snd_buffer")) for session in active_sessions),
                "sum_retrans_delta": retrans_sum,
                "sum_out_segs_delta": out_segs_sum,
                "repair_rate_pct": repair_rate_pct,
            },
            "sessions": sessions,
        }

    def _pick_primary_session(self, links: dict[str, dict[str, Any]]) -> dict[str, Any] | None:
        """Return the first connected, non-stale session, preferring local over remote and control over video."""
        candidates = (
            links["a_to_d"]["sessions"]["control"],
            links["a_to_d"]["sessions"]["video"],
            links["d_to_b"]["sessions"]["control"],
            links["d_to_b"]["sessions"]["video"],
        )
        for session in candidates:
            if session.get("connected") and not session.get("stale"):
                return session
        return None

    def _derive_robot_health(
        self,
        *,
        video_receiver_status: dict[str, Any],
        local_control_registered: bool,
        remote_control_fresh: bool,
        remote_video_fresh: bool,
        telemetry_state: dict[str, Any],
        watchdog_status: dict[str, Any] | None,
    ) -> dict[str, Any]:
        """Return the robot health, preferring a fresh watchdog status over local inference."""
        if watchdog_status is not None:
            explicit_health = self._derive_robot_health_from_watchdog(watchdog_status)
            if explicit_health is not None:
                return explicit_health

        has_recent_frame = bool(video_receiver_status.get("has_recent_frame"))
        telemetry_connected = bool(telemetry_state.get("connected"))
        telemetry_stale = bool(telemetry_state.get("stale", True))

        # The order of these branches matters: the first match wins.
        if has_recent_frame and remote_control_fresh and remote_video_fresh:
            fault_reason = "none"
            recovery_state = "ok"
        elif not remote_control_fresh and not remote_video_fresh and not has_recent_frame:
            fault_reason = "network_or_robot_unreachable"
            recovery_state = "recovering" if telemetry_connected and not telemetry_stale else "degraded"
        elif remote_control_fresh and not remote_video_fresh:
            fault_reason = "video_session_recovering"
            recovery_state = "recovering"
        elif not remote_control_fresh and local_control_registered:
            fault_reason = "control_session_recovering"
            recovery_state = "recovering"
        elif remote_control_fresh and not has_recent_frame:
            fault_reason = "video_pipeline_stalled"
            recovery_state = "degraded"
        else:
            fault_reason = "unknown"
            recovery_state = "degraded"

        return {
            "fault_reason": fault_reason,
            "recovery_state": recovery_state,
            "confidence": "derived",
            "updated_at": utc_iso_now(),
        }

    def _derive_robot_health_from_watchdog(self, watchdog_status: dict[str, Any]) -> dict[str, Any] | None:
        """Translate a watchdog status into a health dict.

        Returns ``None`` when the status has no usable timestamp or is older
        than ``WATCHDOG_STATUS_STALE_MS``, so the caller falls back to inference.
        """
        updated_at_epoch_ms = _coerce_int(watchdog_status.get("updated_at_epoch_ms"))
        if updated_at_epoch_ms <= 0:
            return None

        now_epoch_ms = int(time.time() * 1000)
        if now_epoch_ms - updated_at_epoch_ms > WATCHDOG_STATUS_STALE_MS:
            return None

        raw_fault_reason = str(watchdog_status.get("fault_reason", "") or "")
        raw_recovery_state = str(watchdog_status.get("recovery_state", "") or "")
        normalized_fault_reason = "unknown"
        normalized_recovery_state = raw_recovery_state or "degraded"
        mapped_health = WATCHDOG_FAULT_REASON_MAP.get(raw_fault_reason)

        if mapped_health is not None:
            normalized_fault_reason, recovery_override = mapped_health
            if recovery_override is not None:
                normalized_recovery_state = recovery_override
            # A reported back-off always wins over the mapped recovery state.
            if raw_recovery_state == "backoff":
                normalized_recovery_state = "backoff"

        return {
            "fault_reason": normalized_fault_reason,
            "recovery_state": normalized_recovery_state,
            "confidence": "derived",
            "updated_at": _utc_from_epoch(updated_at_epoch_ms / 1000.0) or utc_iso_now(),
        }

    @staticmethod
    def _session_metric(session: dict[str, Any], field: str) -> float | None:
        """Return a KCP metric of *session* as a float, or ``None`` when unavailable.

        Normalized KCP stats always contain every field, with 0 standing in for
        missing data, so checking for ``None`` alone can never detect a missing
        leg. A session that is not connected is therefore treated as having no
        measurement.
        """
        if not session.get("connected"):
            return None
        raw_value = (session.get("kcp") or {}).get(field)
        if raw_value is None:
            return None
        return _coerce_float(raw_value)

    def _derive_latency_estimate(
        self,
        *,
        links: dict[str, dict[str, Any]],
        video_receiver_status: dict[str, Any],
        display_probe_status: dict[str, Any],
    ) -> dict[str, Any]:
        """Estimate control and video latency from per-leg SRTT, ACK data and local probes.

        Any estimate whose inputs are unavailable is reported as ``None``.
        """
        local_sessions = links["a_to_d"]["sessions"]
        remote_sessions = links["d_to_b"]["sessions"]

        a_to_d_control = self._session_metric(local_sessions["control"], "srtt_ms")
        d_to_b_control = self._session_metric(remote_sessions["control"], "srtt_ms")
        a_to_d_control_min = self._session_metric(local_sessions["control"], "min_srtt_ms")
        d_to_b_control_min = self._session_metric(remote_sessions["control"], "min_srtt_ms")
        a_to_d_video = self._session_metric(local_sessions["video"], "srtt_ms")
        d_to_b_video = self._session_metric(remote_sessions["video"], "srtt_ms")

        ack_estimate = self._control_ack_tracker.get_latest_estimate()
        capture_to_send_raw = video_receiver_status.get("latest_capture_to_send_ms")
        request_to_paint_raw = display_probe_status.get("request_to_paint_ms")
        capture_to_send_ms = _coerce_float(capture_to_send_raw) if capture_to_send_raw is not None else None
        request_to_paint_ms = _coerce_float(request_to_paint_raw) if request_to_paint_raw is not None else None

        # Half of the summed round-trip times approximates the one-way network delay.
        video_network_oneway_est_ms = (
            round((a_to_d_video + d_to_b_video) / 2.0, 3)
            if a_to_d_video is not None and d_to_b_video is not None
            else None
        )
        video_partial_est_ms = None
        if capture_to_send_ms is not None and video_network_oneway_est_ms is not None:
            video_partial_est_ms = round(capture_to_send_ms + video_network_oneway_est_ms, 3)
        video_e2e_est_ms = None
        if video_partial_est_ms is not None and request_to_paint_ms is not None:
            video_e2e_est_ms = round(video_partial_est_ms + request_to_paint_ms, 3)

        return {
            "control_loop_rtt_ms": ack_estimate.get("control_loop_rtt_ms"),
            "control_to_persist_est_ms": ack_estimate.get("control_to_persist_est_ms"),
            "control_oneway_srtt_est_ms": (
                round((a_to_d_control + d_to_b_control) / 2.0, 3)
                if a_to_d_control is not None and d_to_b_control is not None
                else None
            ),
            "control_oneway_bestcase_est_ms": (
                round((a_to_d_control_min + d_to_b_control_min) / 2.0, 3)
                if a_to_d_control_min is not None and d_to_b_control_min is not None
                else None
            ),
            "video_network_oneway_est_ms": video_network_oneway_est_ms,
            "video_partial_est_ms": video_partial_est_ms,
            "video_e2e_est_ms": video_e2e_est_ms,
            "estimate_method": {
                "control": "ack_loop" if ack_estimate.get("ack_available") else "srtt_fallback",
                "video": "capture_to_send+srtt/2+request_to_paint" if video_e2e_est_ms is not None else "capture_to_send+srtt/2",
            },
            "clock_sync_required": False,
            "assumptions": [
                "control one-way estimate uses ACK loop when available",
                "video one-way estimate uses per-leg SRTT and local paint timing",
            ],
            "confidence": {
                "control": "derived_ack" if ack_estimate.get("ack_available") else "fallback_srtt",
                "video": "derived_local_probe" if video_e2e_est_ms is not None else "partial_without_probe",
            },
        }

    def get_latest(self) -> dict[str, Any]:
        """Collect all sources and return the full network telemetry report."""
        self._ensure_started()

        config = load_omnisocket_config()
        video_receiver_cfg = config.get("video_receiver", {})
        control_sender_cfg = config.get("control_sender", {})
        video_sender_cfg = config.get("video_sender", {})

        video_app = self._video_receiver.session_stats()
        control_app = self._control_sender.session_stats()
        video_kcp = self._video_receiver.session_kcp_stats()
        control_kcp = self._control_sender.session_kcp_stats()
        video_receiver_status = self._video_receiver.get_status()
        arbiter_status = self._control_arbiter.get_status()
        ingress_status = self._native_ingress.get_status()
        sender_status = self._control_sender.get_status()
        ack_receiver_status = self._control_ack_receiver.get_status()
        ack_status = self._control_ack_tracker.get_latest_estimate()
        telemetry_state = self._hub_receiver.get_snapshot()
        display_probe_status = self._video_display_probe_store.get_status()

        # _coerce_int tolerates missing or malformed counters, matching the rest of the class.
        total_send_bytes = _coerce_int(video_app.get("send_bytes")) + _coerce_int(control_app.get("send_bytes"))
        total_recv_bytes = _coerce_int(video_app.get("recv_bytes")) + _coerce_int(control_app.get("recv_bytes"))
        tx_kbps, rx_kbps = self._compute_rates(total_send_bytes, total_recv_bytes)

        local_updated_at = utc_iso_now()
        local_sessions = {
            "video": self._build_session_payload(
                "a_to_d.video",
                str(video_receiver_cfg.get("peer_id", "peer-a-video")),
                video_app,
                video_kcp,
                local_updated_at,
                False,
            ),
            "control": self._build_session_payload(
                "a_to_d.control",
                str(control_sender_cfg.get("peer_id", "peer-a-ctrl")),
                control_app,
                control_kcp,
                local_updated_at,
                False,
            ),
        }

        remote_snapshot = telemetry_state.get("snapshot") or {}
        remote_sessions_by_peer: dict[str, dict[str, Any]] = {}
        for session in remote_snapshot.get("sessions", []) or []:
            if not isinstance(session, dict):
                # Remote data: skip malformed entries instead of failing the report.
                continue
            session_peer_id = str(session.get("peer_id", "")).strip()
            if session_peer_id:
                remote_sessions_by_peer[session_peer_id] = session

        remote_updated_at = telemetry_state.get("updated_at")
        remote_stale = bool(telemetry_state.get("stale", True))
        remote_video_peer = str(video_sender_cfg.get("peer_id", "peer-b-video"))
        remote_control_peer = str(control_sender_cfg.get("target_peer", "peer-b-ctrl"))
        remote_sessions = {
            "video": self._build_session_payload(
                f"hub::{remote_video_peer}",
                remote_video_peer,
                None,
                remote_sessions_by_peer.get(remote_video_peer, {}),
                remote_updated_at,
                remote_stale,
            ),
            "control": self._build_session_payload(
                f"hub::{remote_control_peer}",
                remote_control_peer,
                None,
                remote_sessions_by_peer.get(remote_control_peer, {}),
                remote_updated_at,
                remote_stale,
            ),
        }
        # Report None rather than a placeholder 0 when the remote video leg has no data.
        remote_video_srtt = self._session_metric(remote_sessions["video"], "srtt_ms")
        self._video_receiver.update_remote_video_srtt(
            _coerce_int(remote_video_srtt) if remote_video_srtt is not None else None
        )

        links = {
            "a_to_d": self._build_link("local-a-side", local_updated_at, False, local_sessions),
            "d_to_b": self._build_link("hub-telemetry", remote_updated_at, remote_stale, remote_sessions),
        }

        primary_session = self._pick_primary_session(links)
        primary_kcp = dict(primary_session.get("kcp", {})) if primary_session is not None else {}
        # Ingest only after the sessions were described, so their deltas compare
        # against the previously stored snapshot rather than this one.
        self._ingest_remote_snapshot(telemetry_state)

        fresh_connected_sessions = (
            links["a_to_d"]["aggregate"]["online_sessions"] + links["d_to_b"]["aggregate"]["online_sessions"]
        )
        latency_ms = primary_kcp.get("srtt_ms") if primary_session is not None else None
        jitter_ms = primary_kcp.get("srttvar_ms") if primary_session is not None else None
        local_control_registered = bool(control_app.get("registered", 0))
        remote_control_fresh = bool(remote_sessions["control"].get("connected")) and not bool(remote_sessions["control"].get("stale"))
        remote_video_fresh = bool(remote_sessions["video"].get("connected")) and not bool(remote_sessions["video"].get("stale"))
        watchdog_status = _load_optional_json(WATCHDOG_STATUS_PATH)
        robot_health = self._derive_robot_health(
            video_receiver_status=video_receiver_status,
            local_control_registered=local_control_registered,
            remote_control_fresh=remote_control_fresh,
            remote_video_fresh=remote_video_fresh,
            telemetry_state=telemetry_state,
            watchdog_status=watchdog_status,
        )
        latency_estimate = self._derive_latency_estimate(
            links=links,
            video_receiver_status=video_receiver_status,
            display_probe_status=display_probe_status,
        )

        if local_control_registered and remote_control_fresh:
            peer_status = "online"
        elif local_control_registered or bool(local_sessions["video"].get("connected")):
            peer_status = "degraded"
        elif sender_status.get("backend_ready"):
            peer_status = "idle"
        else:
            peer_status = "backend-unavailable"

        return {
            "peer_status": peer_status,
            "latency_ms": latency_ms,
            "jitter_ms": jitter_ms,
            "packet_loss_pct": None,
            "tx_kbps": round(tx_kbps, 3),
            "rx_kbps": round(rx_kbps, 3),
            "transport": "OmniSocket / kcp",
            "source_mode": "omnisocket-live" if fresh_connected_sessions > 0 else "omnisocket-idle",
            "updated_at": utc_iso_now(),
            "active_control_source": arbiter_status["active_source"],
            "control_lease_remaining_ms": arbiter_status["control_lease_remaining_ms"],
            "combined": {
                "connected_sessions": fresh_connected_sessions,
                "send_bytes": total_send_bytes,
                "recv_bytes": total_recv_bytes,
                "tx_kbps": round(tx_kbps, 3),
                "rx_kbps": round(rx_kbps, 3),
            },
            "sessions": {
                "video": {
                    "app": video_app,
                    "kcp": local_sessions["video"]["kcp"],
                },
                "control": {
                    "app": control_app,
                    "kcp": local_sessions["control"]["kcp"],
                },
            },
            "links": links,
            "latency_estimate": latency_estimate,
            "video_freshness": video_receiver_status.get("freshness", {}),
            "control_ack_status": {
                **ack_status,
                "receiver": ack_receiver_status,
            },
            "telemetry_receiver": {
                "hub_connected": bool(telemetry_state.get("connected")),
                "hub_updated_at": telemetry_state.get("updated_at"),
                "hub_stale": remote_stale,
                "last_error": telemetry_state.get("last_error", ""),
                "peer_id": telemetry_state.get("peer_id", ""),
                "registered": bool(telemetry_state.get("registered", False)),
                "last_server_error": str(telemetry_state.get("last_server_error", "") or ""),
                "reconnect_count": _coerce_int(telemetry_state.get("reconnect_count")),
            },
            "robot_health": robot_health,
            "ingress": {
                "native_udp": ingress_status,
            },
            "control": {
                "arbiter": arbiter_status,
                "sender": sender_status,
                "ack_receiver": ack_receiver_status,
            },
        }

    def close(self) -> None:
        """Signal the sampler thread to stop and briefly wait for it."""
        self._closing.set()
        thread = self._sample_thread
        if thread is not None and thread.is_alive():
            thread.join(timeout=0.5)
|