Files
robot-command-center/backend/monitoring/telemetry.py
2026-04-18 16:07:28 +08:00

904 lines
39 KiB
Python

from __future__ import annotations
from collections import deque
import json
import os
import sys
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from .common import (
VIDEO_TRAILER_COORDINATE_FORMAT,
WORKSPACE_ROOT,
load_omnisocket_config,
utc_iso_now,
)
from .control import ControlAckTracker, ControlArbiter, NativeUdpControlIngress, OmniSocketControlAckReceiver, OmniSocketControlSender
from .video import FrameTrailerMetadata, OmniSocketVideoReceiver, VideoDisplayProbeStore
# Sampling cadence (ms) of the background local-KCP sampler thread.
LOCAL_SAMPLE_INTERVAL_MS = 500
# Maximum number of KCP samples retained per trend key.
TREND_HISTORY_SIZE = 10
# Number of most recent samples spanned when labelling a trend.
TREND_WINDOW_SIZE = 5

# Directory holding runtime status files; overridable with BLITZ_RUNTIME_DIR.
BLITZ_RUNTIME_DIR = Path(os.getenv("BLITZ_RUNTIME_DIR", "/run/blitz-robot"))
WATCHDOG_STATUS_PATH = BLITZ_RUNTIME_DIR / "watchdog.status.json"

# Watchdog reports older than this are ignored. Configured in whole seconds via
# BLITZ_HEALTH_STALE_SEC (default 15) and clamped to at least one second.
_WATCHDOG_STALE_SECONDS = max(1, int(os.getenv("BLITZ_HEALTH_STALE_SEC", "15")))
WATCHDOG_STATUS_STALE_MS = _WATCHDOG_STALE_SECONDS * 1000

# Shared (normalized fault_reason, recovery_state override) pairs. A None
# override means the watchdog-reported recovery state is kept as-is.
_HEALTH_NONE = ("none", None)
_HEALTH_VIDEO_RECOVERING = ("video_session_recovering", "recovering")
_HEALTH_CONTROL_RECOVERING = ("control_session_recovering", "recovering")
_HEALTH_UNREACHABLE = ("network_or_robot_unreachable", "recovering")

# Translates raw watchdog fault reasons into the normalized vocabulary exposed
# by the telemetry payload.
WATCHDOG_FAULT_REASON_MAP: dict[str, tuple[str, str | None]] = {
    "": _HEALTH_NONE,
    "none": _HEALTH_NONE,
    "camera_missing": ("video_pipeline_stalled", "degraded"),
    "camera_recovered": _HEALTH_VIDEO_RECOVERING,
    "camera-reappeared-escalated": _HEALTH_VIDEO_RECOVERING,
    "bside_status_stale": _HEALTH_VIDEO_RECOVERING,
    "bside-unhealthy-escalated": _HEALTH_VIDEO_RECOVERING,
    "ros_receiver_unhealthy": _HEALTH_CONTROL_RECOVERING,
    "ros-unhealthy": _HEALTH_CONTROL_RECOVERING,
    "network_or_robot_unreachable": _HEALTH_UNREACHABLE,
    "network-recovered-ros-unhealthy": _HEALTH_CONTROL_RECOVERING,
    "network-recovered-escalated": _HEALTH_UNREACHABLE,
}
def _utc_from_epoch(epoch_seconds: float | None) -> str | None:
if epoch_seconds is None or epoch_seconds <= 0.0:
return None
return datetime.fromtimestamp(epoch_seconds, timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")
def _coerce_int(value: Any, default: int = 0) -> int:
try:
if value is None:
return default
return int(value)
except (TypeError, ValueError):
return default
def _coerce_float(value: Any, default: float = 0.0) -> float:
try:
if value is None:
return default
return float(value)
except (TypeError, ValueError):
return default
def _load_optional_json(path: Path) -> dict[str, Any] | None:
try:
if not path.exists():
return None
with path.open("r", encoding="utf-8") as file:
payload = json.load(file)
if isinstance(payload, dict):
return payload
except Exception:
return None
return None
class GpsDataService:
    """Serve the GPS fix carried in the latest video-frame trailer as a JSON-ready dict.

    Three payload shapes are produced, distinguished by ``source_mode``:
    ``video-frame-trailer-waiting`` (no frame metadata yet),
    ``video-frame-trailer-no-fix`` (frame present, trailer reports no fix) and
    ``video-frame-trailer`` (fix present). Satellites and altitude are never
    populated on this path.
    """

    # Placeholder for ``utc_time`` whenever no usable fix time exists.
    _NO_TIME = "--:--:--"

    def __init__(self, receiver: OmniSocketVideoReceiver) -> None:
        self._receiver = receiver

    def get_latest(self) -> dict[str, Any]:
        """Return the GPS payload for the receiver's most recent frame metadata."""
        metadata = self._receiver.get_latest_frame_metadata()
        if metadata is None:
            return self._build_waiting_payload()
        return self._build_payload_from_metadata(metadata)

    def _build_waiting_payload(self) -> dict[str, Any]:
        """Payload used before any frame metadata has been received."""
        return self._assemble_payload(
            has_fix=False,
            utc_time=self._NO_TIME,
            latitude=None,
            longitude=None,
            raw_hex=None,
            source_mode="video-frame-trailer-waiting",
            updated_at="",
        )

    def _build_payload_from_metadata(self, metadata: FrameTrailerMetadata) -> dict[str, Any]:
        """Payload for a received frame, with or without a GPS fix."""
        updated_at = _utc_from_epoch(metadata.received_at) or ""
        raw_hex = (metadata.raw_latitude_hex, metadata.raw_longitude_hex)
        if not metadata.has_gps_fix:
            return self._assemble_payload(
                has_fix=False,
                utc_time=self._NO_TIME,
                latitude=None,
                longitude=None,
                raw_hex=raw_hex,
                source_mode="video-frame-trailer-no-fix",
                updated_at=updated_at,
            )
        return self._assemble_payload(
            has_fix=True,
            utc_time=self._format_fix_time(metadata.timestamp_ns),
            latitude=round(metadata.latitude, 6) if metadata.latitude is not None else None,
            longitude=round(metadata.longitude, 6) if metadata.longitude is not None else None,
            raw_hex=raw_hex,
            source_mode="video-frame-trailer",
            updated_at=updated_at,
        )

    @classmethod
    def _format_fix_time(cls, timestamp_ns: Any) -> str:
        """Format a nanosecond Unix timestamp as ``HH:MM:SS`` in UTC.

        A corrupt, missing or out-of-range trailer timestamp yields the
        placeholder instead of failing the whole request.
        """
        try:
            moment = datetime.fromtimestamp(timestamp_ns / 1_000_000_000, timezone.utc)
        except (TypeError, ValueError, OverflowError, OSError):
            return cls._NO_TIME
        return moment.strftime("%H:%M:%S")

    @staticmethod
    def _assemble_payload(
        *,
        has_fix: bool,
        utc_time: str,
        latitude: float | None,
        longitude: float | None,
        raw_hex: tuple[Any, Any] | None,
        source_mode: str,
        updated_at: str,
    ) -> dict[str, Any]:
        """Build a GPS payload; the raw hex fields are included only when *raw_hex* is given."""
        payload: dict[str, Any] = {
            "has_fix": has_fix,
            "utc_time": utc_time,
            "latitude": latitude,
            "longitude": longitude,
        }
        if raw_hex is not None:
            raw_latitude_hex, raw_longitude_hex = raw_hex
            payload["raw_latitude_hex"] = f"0x{raw_latitude_hex}"
            payload["raw_longitude_hex"] = f"0x{raw_longitude_hex}"
        # Remaining keys are appended in the same order for every payload shape.
        payload.update(
            satellites=None,
            altitude_m=None,
            coordinate_system="WGS84",
            source_sentence="VIDEO_TRAILER",
            raw_coordinate_format=VIDEO_TRAILER_COORDINATE_FORMAT,
            source_mode=source_mode,
            updated_at=updated_at,
        )
        return payload
class KcpTrendTracker:
    """Rolling per-key history of KCP statistics with simple trend classification.

    Each key keeps at most ``TREND_HISTORY_SIZE`` samples in a bounded deque;
    the history map is guarded by an internal lock. ``describe`` compares a
    caller-supplied current reading against that history to report counter
    increases and rising/falling/stable labels for the send queue and buffer.
    """

    # Integer fields emitted before the window-related fields.
    _HEAD_INT_FIELDS = (
        "connected",
        "conv",
        "rto_ms",
        "srtt_ms",
        "min_srtt_ms",
        "srttvar_ms",
        "last_feedback_age_ms",
    )
    # Integer fields emitted after the window-related fields.
    _TAIL_INT_FIELDS = (
        "snd_queue",
        "rcv_queue",
        "snd_buffer",
        "out_segs_total",
        "retrans_total",
        "fast_retrans_total",
        "lost_total",
        "repeat_total",
        "xmit_total",
    )

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._samples: dict[str, deque[dict[str, Any]]] = {}

    def _normalize(self, stats: dict[str, Any] | None) -> dict[str, Any]:
        """Coerce a raw KCP stats mapping into a fixed-key dict of ints (plus one float)."""
        source = dict(stats or {})
        snd_wnd = _coerce_int(source.get("snd_wnd"))
        rmt_wnd = _coerce_int(source.get("rmt_wnd"))
        # Without an explicit window_limit, fall back to the smaller of the two
        # windows, but only when both are non-zero.
        limit_fallback = min(snd_wnd, rmt_wnd) if snd_wnd and rmt_wnd else 0

        normalized = {field: _coerce_int(source.get(field)) for field in self._HEAD_INT_FIELDS}
        normalized["snd_wnd"] = snd_wnd
        normalized["rmt_wnd"] = rmt_wnd
        normalized["inflight"] = _coerce_int(source.get("inflight"))
        normalized["window_limit"] = _coerce_int(source.get("window_limit"), limit_fallback)
        normalized["window_pressure_pct"] = round(_coerce_float(source.get("window_pressure_pct")), 3)
        for field in self._TAIL_INT_FIELDS:
            normalized[field] = _coerce_int(source.get(field))
        return normalized

    def add_sample(self, key: str, stats: dict[str, Any] | None) -> None:
        """Append a normalized sample for *key*, evicting the oldest beyond the history cap."""
        entry = {
            "ts_monotonic": time.monotonic(),
            "updated_at": utc_iso_now(),
            "stats": self._normalize(stats),
        }
        with self._lock:
            bucket = self._samples.get(key)
            if bucket is None:
                bucket = self._samples[key] = deque(maxlen=TREND_HISTORY_SIZE)
            bucket.append(entry)

    def latest_updated_at(self, key: str) -> str | None:
        """Return the ``updated_at`` of the newest sample for *key*, or None if there is none."""
        with self._lock:
            bucket = self._samples.get(key)
            newest = bucket[-1] if bucket else None
        if newest is None:
            return None
        return str(newest.get("updated_at") or "")

    def describe(self, key: str, current_stats: dict[str, Any] | None) -> dict[str, Any]:
        """Normalize *current_stats* and derive deltas/trends against the history for *key*.

        Deltas compare against the newest stored sample and are clamped at zero
        (so counter resets read as no change). Trends compare the oldest and
        newest of the last ``TREND_WINDOW_SIZE`` readings, with a deadband of
        max(2, 5% of window_limit) treated as "stable".
        """
        current = self._normalize(current_stats)
        with self._lock:
            recorded = [entry["stats"] for entry in self._samples.get(key, ())]
        series = recorded + [current]
        previous = series[-2] if len(series) >= 2 else None
        window = series[-TREND_WINDOW_SIZE:]
        deadband = max(2.0, 0.05 * float(max(current.get("window_limit", 0), 1)))

        def increase(field: str) -> int:
            if previous is None:
                return 0
            return max(0, current[field] - _coerce_int(previous.get(field)))

        def classify(field: str) -> str:
            if len(window) < 2:
                return "stable"
            change = float(_coerce_int(window[-1].get(field))) - float(_coerce_int(window[0].get(field)))
            if abs(change) < deadband:
                return "stable"
            return "rising" if change > 0 else "falling"

        retrans = increase("retrans_total")
        out_segs = increase("out_segs_total")
        repair_rate = round((retrans / out_segs) * 100.0, 3) if out_segs > 0 else 0.0
        trend = {
            "snd_queue_delta": increase("snd_queue"),
            "snd_buffer_delta": increase("snd_buffer"),
            "snd_queue_trend": classify("snd_queue"),
            "snd_buffer_trend": classify("snd_buffer"),
            "retrans_delta": retrans,
            "fast_retrans_delta": increase("fast_retrans_total"),
            "lost_delta": increase("lost_total"),
            "repeat_delta": increase("repeat_total"),
            "out_segs_delta": out_segs,
            "repair_rate_pct": repair_rate,
        }
        return {"kcp": current, "trend": trend}
class HubTelemetryReceiver:
    """Background subscriber for ``hub_kcp_snapshot`` messages from the OmniSocket hub.

    ``ensure_started`` lazily launches a daemon thread. The thread keeps a
    session open, stores the newest snapshot, and reconnects after failures.
    ``get_snapshot`` returns that snapshot plus connection and freshness
    metadata. If the optional ``omnisocket`` package cannot be imported, the
    receiver never starts and reports the import error via ``last_error``.
    """

    # Pause between a session ending/failing and the next connect attempt.
    _RECONNECT_DELAY_SECONDS = 2.0
    # Value of the ``type`` field that identifies hub KCP snapshot messages.
    _SNAPSHOT_TYPE = "hub_kcp_snapshot"

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._thread: threading.Thread | None = None
        self._started = False
        self._session = None
        self._session_cls = None
        self._msg_type_text = None
        self._msg_type_error = None
        self._telemetry_defaults: dict[str, Any] = {}
        self._latest_snapshot: dict[str, Any] | None = None
        self._last_error = ""
        self._last_received_wall = 0.0
        self._last_received_monotonic = 0.0
        self._reconnect_count = 0
        self._ever_connected = False
        self._closing = threading.Event()
        self._load_backend()

    def _load_backend(self) -> None:
        """Import the omnisocket backend, recording (not raising) any failure."""
        try:
            self._import_backend()
        except Exception as error:  # pragma: no cover - optional runtime dependency
            self._last_error = f"omnisocket import failed: {error}"

    def _import_backend(self) -> None:
        """Import omnisocket, falling back to the in-workspace ``OmniSocketGo/python`` copy."""
        try:
            from omnisocket import MSG_TYPE_ERROR, MSG_TYPE_TEXT, Session, TELEMETRY_DEFAULTS  # type: ignore
        except ImportError:
            python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python"
            if python_dir.exists():
                sys.path.insert(0, str(python_dir))
            from omnisocket import MSG_TYPE_ERROR, MSG_TYPE_TEXT, Session, TELEMETRY_DEFAULTS  # type: ignore
        self._msg_type_error = MSG_TYPE_ERROR
        self._msg_type_text = MSG_TYPE_TEXT
        self._session_cls = Session
        self._telemetry_defaults = dict(TELEMETRY_DEFAULTS)

    def _connect_session(self):
        """Create and connect a hub session from the omnisocket config.

        If ``connect`` fails, the freshly created session is closed before the
        error propagates, so repeated failed reconnects do not leak sessions.
        """
        assert self._session_cls is not None
        config = load_omnisocket_config()
        transport_cfg = config.get("transport", {})
        telemetry_cfg = config.get("telemetry_receiver", {})
        session = self._session_cls()
        try:
            session.connect(
                server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")),
                peer_id=str(telemetry_cfg.get("peer_id", "peer-a-telemetry")),
                relay_via=str(transport_cfg.get("relay_via", "")),
                bind_ip=str(transport_cfg.get("bind_ip", "")),
                bind_device=str(transport_cfg.get("bind_device", "")),
                **self._telemetry_defaults,
            )
        except BaseException:
            try:
                session.close()
            except Exception:
                pass
            raise
        return session

    def ensure_started(self) -> None:
        """Start the receiver thread once; no-op without a backend or after ``close``."""
        if self._session_cls is None:
            return
        with self._lock:
            if self._started or self._closing.is_set():
                return
            self._started = True
            self._thread = threading.Thread(
                target=self._run,
                name="hub-telemetry-receiver",
                daemon=True,
            )
            self._thread.start()

    @classmethod
    def _parse_snapshot(cls, payload: bytes) -> dict[str, Any] | None:
        """Decode a hub text message.

        Returns the decoded dict when it is a hub KCP snapshot, or None for
        other message types.

        Raises:
            ValueError: the payload is not UTF-8 JSON or not a JSON object
                (UnicodeDecodeError and JSONDecodeError are ValueError subclasses).
        """
        decoded = json.loads(payload.decode("utf-8"))
        if not isinstance(decoded, dict):
            raise ValueError(f"expected a JSON object, got {type(decoded).__name__}")
        if decoded.get("type") != cls._SNAPSHOT_TYPE:
            return None
        return decoded

    def _run(self) -> None:
        """Thread body: connect, pump messages, and reconnect until ``close`` is called."""
        while not self._closing.is_set():
            try:
                session = self._connect_session()
                with self._lock:
                    self._session = session
                    self._last_error = ""
                    if self._ever_connected:
                        self._reconnect_count += 1
                    else:
                        self._ever_connected = True
                self._pump(session)
            except Exception as error:  # pragma: no cover - runtime integration path
                if not self._closing.is_set():
                    self._record_failure(error)
            finally:
                self._release_session()
            # Event.wait rather than sleep so close() interrupts the back-off at once.
            self._closing.wait(self._RECONNECT_DELAY_SECONDS)

    def _pump(self, session: Any) -> None:
        """Receive messages on *session* until closing is requested; transport errors propagate."""
        while not self._closing.is_set():
            result = session.recv(timeout_ms=1000)
            if result is None:
                continue
            from_peer, msg_type, payload = result
            if msg_type == self._msg_type_error:
                with self._lock:
                    self._last_error = f"hub error from {from_peer}: {payload.decode('utf-8', errors='replace')}"
                continue
            if msg_type != self._msg_type_text:
                continue
            try:
                snapshot = self._parse_snapshot(payload)
            except ValueError as error:
                # One malformed message must not tear down an otherwise healthy session.
                with self._lock:
                    self._last_error = f"invalid hub telemetry payload: {error}"
                continue
            if snapshot is None:
                continue
            now_wall = time.time()
            now_mono = time.monotonic()
            with self._lock:
                self._latest_snapshot = snapshot
                self._last_received_wall = now_wall
                self._last_received_monotonic = now_mono
                self._last_error = ""

    def _record_failure(self, error: Exception) -> None:
        """Store the most specific error available: the server's last error, else *error*."""
        with self._lock:
            session = self._session
        session_error = ""
        if session is not None:
            try:
                session_error = str(dict(session.stats()).get("last_server_error", "") or "")
            except Exception:
                session_error = ""
        with self._lock:
            self._last_error = session_error or str(error)

    def _release_session(self) -> None:
        """Detach the current session (if any) and close it outside the lock."""
        with self._lock:
            session = self._session
            self._session = None
            if self._closing.is_set():
                self._started = False
        if session is not None:
            try:
                session.close()
            except Exception:
                pass

    def get_snapshot(self) -> dict[str, Any]:
        """Return the latest hub snapshot with connection, staleness and error metadata.

        ``stale`` is True until a snapshot arrives, and afterwards whenever the
        newest one is older than ``telemetry_receiver.stale_after_ms`` (default
        1500, floored at 500).
        """
        self.ensure_started()
        cfg = load_omnisocket_config().get("telemetry_receiver", {})
        stale_after_ms = max(500, _coerce_int(cfg.get("stale_after_ms"), 1500))
        with self._lock:
            received_monotonic = self._last_received_monotonic
            received_wall = self._last_received_wall
            snapshot = self._latest_snapshot
            connected = self._session is not None
            last_error = self._last_error
            reconnect_count = self._reconnect_count
            if self._session is not None:
                try:
                    session_stats = dict(self._session.stats())
                except Exception:
                    session_stats = {}
            else:
                session_stats = {}
        stale = True
        if received_monotonic > 0.0:
            stale = (time.monotonic() - received_monotonic) * 1000.0 > stale_after_ms
        return {
            "connected": connected,
            "updated_at": _utc_from_epoch(received_wall),
            "received_at_monotonic": received_monotonic,
            "stale": stale,
            "peer_id": str(cfg.get("peer_id", "peer-a-telemetry")),
            "snapshot": snapshot or {"sessions": []},
            "last_error": last_error,
            "registered": bool(session_stats.get("registered", 0)),
            "last_server_error": str(session_stats.get("last_server_error", "") or ""),
            "reconnect_count": reconnect_count,
        }

    def close(self) -> None:
        """Stop the receiver: signal the thread, close the live session, and join briefly."""
        self._closing.set()
        with self._lock:
            session = self._session
        if session is not None:
            try:
                session.close()
            except Exception:
                pass
        thread = self._thread
        if thread is not None and thread.is_alive():
            thread.join(timeout=0.5)
class NetworkTelemetryService:
    """Combine local OmniSocket session stats and hub-reported sessions into one telemetry payload.

    The ``a_to_d`` link is built from this process's own video receiver and
    control sender. The ``d_to_b`` link is built from the latest hub snapshot
    supplied by the ``HubTelemetryReceiver``. A background sampler records
    local KCP stats every ``LOCAL_SAMPLE_INTERVAL_MS`` so trends can be derived.
    """

    def __init__(
        self,
        video_receiver: OmniSocketVideoReceiver,
        control_sender: OmniSocketControlSender,
        control_ack_tracker: ControlAckTracker,
        control_ack_receiver: OmniSocketControlAckReceiver,
        control_arbiter: ControlArbiter,
        native_ingress: NativeUdpControlIngress,
        hub_receiver: HubTelemetryReceiver,
        video_display_probe_store: VideoDisplayProbeStore,
    ) -> None:
        self._video_receiver = video_receiver
        self._control_sender = control_sender
        self._control_ack_tracker = control_ack_tracker
        self._control_ack_receiver = control_ack_receiver
        self._control_arbiter = control_arbiter
        self._native_ingress = native_ingress
        self._hub_receiver = hub_receiver
        self._video_display_probe_store = video_display_probe_store
        self._trend_tracker = KcpTrendTracker()
        # Guards the previous (time, send_bytes, recv_bytes) rate sample and the
        # one-time start of the sampler thread.
        self._rate_lock = threading.Lock()
        self._last_rate_sample: tuple[float, int, int] | None = None
        self._sample_thread: threading.Thread | None = None
        self._sample_started = False
        # Monotonic receive time of the newest hub snapshot already fed to the
        # trend tracker; read and written under _remote_ingest_lock.
        self._last_remote_snapshot_at = 0.0
        self._remote_ingest_lock = threading.Lock()
        self._closing = threading.Event()

    def _ensure_started(self) -> None:
        """Start all collaborators and, once, the local KCP sampler thread."""
        self._video_receiver.ensure_started()
        self._control_arbiter.ensure_started()
        self._control_ack_receiver.ensure_started()
        self._native_ingress.ensure_started()
        self._hub_receiver.ensure_started()
        with self._rate_lock:
            if self._sample_started or self._closing.is_set():
                return
            self._sample_started = True
            self._sample_thread = threading.Thread(
                target=self._sample_loop,
                name="network-telemetry-sampler",
                daemon=True,
            )
            self._sample_thread.start()

    def _sample_loop(self) -> None:
        """Record local video/control KCP stats periodically until ``close`` is called."""
        interval_seconds = LOCAL_SAMPLE_INTERVAL_MS / 1000.0
        while not self._closing.is_set():
            try:
                self._trend_tracker.add_sample("a_to_d.video", self._video_receiver.session_kcp_stats())
                self._trend_tracker.add_sample("a_to_d.control", self._control_sender.session_kcp_stats())
            except Exception:
                # Best-effort sampling: a transient stats failure just skips this tick.
                pass
            # Event.wait rather than sleep so close() interrupts the pause at once.
            self._closing.wait(interval_seconds)

    def _compute_rates(self, send_bytes: int, recv_bytes: int) -> tuple[float, float]:
        """Return (tx_kbps, rx_kbps) since the previous call; (0.0, 0.0) on the first call.

        Negative deltas (counter resets) are clamped to zero.
        """
        now = time.monotonic()
        with self._rate_lock:
            previous = self._last_rate_sample
            self._last_rate_sample = (now, send_bytes, recv_bytes)
        if previous is None:
            return 0.0, 0.0
        prev_time, prev_send, prev_recv = previous
        elapsed = now - prev_time
        if elapsed <= 0.0:
            return 0.0, 0.0
        tx_kbps = max(0.0, ((send_bytes - prev_send) * 8.0) / elapsed / 1000.0)
        rx_kbps = max(0.0, ((recv_bytes - prev_recv) * 8.0) / elapsed / 1000.0)
        return tx_kbps, rx_kbps

    def _ingest_remote_snapshot(self, telemetry_state: dict[str, Any]) -> None:
        """Feed each hub session of a not-yet-seen snapshot into the trend tracker.

        The seen-check and update happen under a lock so concurrent ``get_latest``
        calls cannot ingest the same snapshot twice (which would zero the deltas).
        """
        received_at = float(telemetry_state.get("received_at_monotonic") or 0.0)
        if received_at <= 0.0:
            return
        with self._remote_ingest_lock:
            if received_at <= self._last_remote_snapshot_at:
                return
            snapshot = telemetry_state.get("snapshot") or {}
            sessions = snapshot.get("sessions") or []
            for session in sessions:
                peer_id = str(session.get("peer_id", "")).strip()
                if not peer_id:
                    continue
                self._trend_tracker.add_sample(f"hub::{peer_id}", session)
            self._last_remote_snapshot_at = received_at

    def _build_session_payload(
        self,
        trend_key: str,
        peer_id: str,
        app_stats: dict[str, Any] | None,
        current_kcp: dict[str, Any] | None,
        updated_at: str | None,
        stale: bool,
    ) -> dict[str, Any]:
        """Describe one session: normalized KCP stats, trends, and a connected flag.

        ``connected`` comes from app-level ``registered`` when available,
        otherwise from the KCP ``connected`` field.
        """
        described = self._trend_tracker.describe(trend_key, current_kcp)
        connected = bool(described["kcp"].get("connected"))
        if app_stats is not None and "registered" in app_stats:
            connected = bool(app_stats.get("registered"))
        return {
            "peer_id": peer_id,
            "connected": connected,
            "updated_at": updated_at,
            "stale": stale,
            "app": app_stats,
            "kcp": described["kcp"],
            "trend": described["trend"],
        }

    def _build_link(self, source: str, updated_at: str | None, stale: bool, sessions: dict[str, dict[str, Any]]) -> dict[str, Any]:
        """Aggregate the connected, non-stale sessions of one link into summary metrics."""
        session_items = list(sessions.values())
        active_sessions = [session for session in session_items if session.get("connected") and not session.get("stale")]
        retrans_sum = sum(_coerce_int(session.get("trend", {}).get("retrans_delta")) for session in active_sessions)
        out_segs_sum = sum(_coerce_int(session.get("trend", {}).get("out_segs_delta")) for session in active_sessions)
        repair_rate_pct = round((retrans_sum / out_segs_sum) * 100.0, 3) if out_segs_sum > 0 else 0.0
        return {
            "source": source,
            "updated_at": updated_at,
            "stale": stale,
            "aggregate": {
                "online_sessions": len(active_sessions),
                "max_window_pressure_pct": max(
                    (_coerce_float(session.get("kcp", {}).get("window_pressure_pct")) for session in active_sessions),
                    default=0.0,
                ),
                "sum_snd_queue": sum(_coerce_int(session.get("kcp", {}).get("snd_queue")) for session in active_sessions),
                "sum_snd_buffer": sum(_coerce_int(session.get("kcp", {}).get("snd_buffer")) for session in active_sessions),
                "sum_retrans_delta": retrans_sum,
                "sum_out_segs_delta": out_segs_sum,
                "repair_rate_pct": repair_rate_pct,
            },
            "sessions": sessions,
        }

    def _pick_primary_session(self, links: dict[str, dict[str, Any]]) -> dict[str, Any] | None:
        """Return the first connected, non-stale session in priority order, or None."""
        candidates = (
            links["a_to_d"]["sessions"]["control"],
            links["a_to_d"]["sessions"]["video"],
            links["d_to_b"]["sessions"]["control"],
            links["d_to_b"]["sessions"]["video"],
        )
        for session in candidates:
            if session.get("connected") and not session.get("stale"):
                return session
        return None

    @staticmethod
    def _fresh_kcp_metric(session: dict[str, Any], field: str) -> float | None:
        """Return ``session["kcp"][field]`` as a float, or None if the session is offline or stale.

        The KCP block is always normalized (missing fields become 0), so session
        freshness, not a None check, decides whether the value is meaningful.
        Freshness uses the same rule as ``_build_link``.
        """
        if not session.get("connected") or session.get("stale"):
            return None
        raw = session.get("kcp", {}).get(field)
        return _coerce_float(raw) if raw is not None else None

    @staticmethod
    def _half_sum(first: float | None, second: float | None) -> float | None:
        """Return ``(first + second) / 2`` rounded to 3 decimals, or None if either is missing."""
        if first is None or second is None:
            return None
        return round((first + second) / 2.0, 3)

    def _derive_robot_health(
        self,
        *,
        video_receiver_status: dict[str, Any],
        local_control_registered: bool,
        remote_control_fresh: bool,
        remote_video_fresh: bool,
        telemetry_state: dict[str, Any],
        watchdog_status: dict[str, Any] | None,
    ) -> dict[str, Any]:
        """Classify robot health, preferring a fresh watchdog report over local heuristics."""
        if watchdog_status is not None:
            explicit_health = self._derive_robot_health_from_watchdog(watchdog_status)
            if explicit_health is not None:
                return explicit_health
        has_recent_frame = bool(video_receiver_status.get("has_recent_frame"))
        telemetry_connected = bool(telemetry_state.get("connected"))
        telemetry_stale = bool(telemetry_state.get("stale", True))
        if has_recent_frame and remote_control_fresh and remote_video_fresh:
            fault_reason = "none"
            recovery_state = "ok"
        elif not remote_control_fresh and not remote_video_fresh and not has_recent_frame:
            fault_reason = "network_or_robot_unreachable"
            recovery_state = "recovering" if telemetry_connected and not telemetry_stale else "degraded"
        elif remote_control_fresh and not remote_video_fresh:
            fault_reason = "video_session_recovering"
            recovery_state = "recovering"
        elif not remote_control_fresh and local_control_registered:
            fault_reason = "control_session_recovering"
            recovery_state = "recovering"
        elif remote_control_fresh and not has_recent_frame:
            fault_reason = "video_pipeline_stalled"
            recovery_state = "degraded"
        else:
            fault_reason = "unknown"
            recovery_state = "degraded"
        return {
            "fault_reason": fault_reason,
            "recovery_state": recovery_state,
            "confidence": "derived",
            "updated_at": utc_iso_now(),
        }

    def _derive_robot_health_from_watchdog(self, watchdog_status: dict[str, Any]) -> dict[str, Any] | None:
        """Map a watchdog status file to the health vocabulary; None if missing a timestamp or stale."""
        updated_at_epoch_ms = _coerce_int(watchdog_status.get("updated_at_epoch_ms"))
        if updated_at_epoch_ms <= 0:
            return None
        now_epoch_ms = int(time.time() * 1000)
        if now_epoch_ms - updated_at_epoch_ms > WATCHDOG_STATUS_STALE_MS:
            return None
        raw_fault_reason = str(watchdog_status.get("fault_reason", "") or "")
        raw_recovery_state = str(watchdog_status.get("recovery_state", "") or "")
        normalized_fault_reason = "unknown"
        normalized_recovery_state = raw_recovery_state or "degraded"
        mapped_health = WATCHDOG_FAULT_REASON_MAP.get(raw_fault_reason)
        if mapped_health is not None:
            normalized_fault_reason, recovery_override = mapped_health
            if recovery_override is not None:
                normalized_recovery_state = recovery_override
        # An explicit back-off from the watchdog wins over any mapped override.
        if raw_recovery_state == "backoff":
            normalized_recovery_state = "backoff"
        return {
            "fault_reason": normalized_fault_reason,
            "recovery_state": normalized_recovery_state,
            "confidence": "derived",
            "updated_at": _utc_from_epoch(updated_at_epoch_ms / 1000.0) or utc_iso_now(),
        }

    def _derive_latency_estimate(
        self,
        *,
        links: dict[str, dict[str, Any]],
        video_receiver_status: dict[str, Any],
        display_probe_status: dict[str, Any],
    ) -> dict[str, Any]:
        """Estimate control and video latency from per-leg SRTT, ACK loop, and local timings.

        A leg contributes SRTT only while its session is connected and not stale;
        otherwise the dependent estimate is None rather than a misleading 0-based value.
        """
        a_to_d_control = links["a_to_d"]["sessions"]["control"]
        d_to_b_control = links["d_to_b"]["sessions"]["control"]
        a_to_d_video = links["a_to_d"]["sessions"]["video"]
        d_to_b_video = links["d_to_b"]["sessions"]["video"]
        control_oneway_srtt_est_ms = self._half_sum(
            self._fresh_kcp_metric(a_to_d_control, "srtt_ms"),
            self._fresh_kcp_metric(d_to_b_control, "srtt_ms"),
        )
        control_oneway_bestcase_est_ms = self._half_sum(
            self._fresh_kcp_metric(a_to_d_control, "min_srtt_ms"),
            self._fresh_kcp_metric(d_to_b_control, "min_srtt_ms"),
        )
        video_network_oneway_est_ms = self._half_sum(
            self._fresh_kcp_metric(a_to_d_video, "srtt_ms"),
            self._fresh_kcp_metric(d_to_b_video, "srtt_ms"),
        )
        ack_estimate = self._control_ack_tracker.get_latest_estimate()
        capture_to_send_raw = video_receiver_status.get("latest_capture_to_send_ms")
        request_to_paint_raw = display_probe_status.get("request_to_paint_ms")
        capture_to_send_ms = _coerce_float(capture_to_send_raw) if capture_to_send_raw is not None else None
        request_to_paint_ms = _coerce_float(request_to_paint_raw) if request_to_paint_raw is not None else None
        video_partial_est_ms = None
        if capture_to_send_ms is not None and video_network_oneway_est_ms is not None:
            video_partial_est_ms = round(capture_to_send_ms + video_network_oneway_est_ms, 3)
        video_e2e_est_ms = None
        if video_partial_est_ms is not None and request_to_paint_ms is not None:
            video_e2e_est_ms = round(video_partial_est_ms + request_to_paint_ms, 3)
        return {
            "control_loop_rtt_ms": ack_estimate.get("control_loop_rtt_ms"),
            "control_to_persist_est_ms": ack_estimate.get("control_to_persist_est_ms"),
            "control_oneway_srtt_est_ms": control_oneway_srtt_est_ms,
            "control_oneway_bestcase_est_ms": control_oneway_bestcase_est_ms,
            "video_network_oneway_est_ms": video_network_oneway_est_ms,
            "video_partial_est_ms": video_partial_est_ms,
            "video_e2e_est_ms": video_e2e_est_ms,
            "estimate_method": {
                "control": "ack_loop" if ack_estimate.get("ack_available") else "srtt_fallback",
                "video": "capture_to_send+srtt/2+request_to_paint" if video_e2e_est_ms is not None else "capture_to_send+srtt/2",
            },
            "clock_sync_required": False,
            "assumptions": [
                "control one-way estimate uses ACK loop when available",
                "video one-way estimate uses per-leg SRTT and local paint timing",
            ],
            "confidence": {
                "control": "derived_ack" if ack_estimate.get("ack_available") else "fallback_srtt",
                "video": "derived_local_probe" if video_e2e_est_ms is not None else "partial_without_probe",
            },
        }

    def get_latest(self) -> dict[str, Any]:
        """Build the full network telemetry payload (links, health, latency estimates, control state)."""
        self._ensure_started()
        config = load_omnisocket_config()
        video_receiver_cfg = config.get("video_receiver", {})
        control_sender_cfg = config.get("control_sender", {})
        video_sender_cfg = config.get("video_sender", {})
        video_app = self._video_receiver.session_stats()
        control_app = self._control_sender.session_stats()
        video_kcp = self._video_receiver.session_kcp_stats()
        control_kcp = self._control_sender.session_kcp_stats()
        video_receiver_status = self._video_receiver.get_status()
        arbiter_status = self._control_arbiter.get_status()
        ingress_status = self._native_ingress.get_status()
        sender_status = self._control_sender.get_status()
        ack_receiver_status = self._control_ack_receiver.get_status()
        ack_status = self._control_ack_tracker.get_latest_estimate()
        telemetry_state = self._hub_receiver.get_snapshot()
        display_probe_status = self._video_display_probe_store.get_status()
        # _coerce_int tolerates None/garbage counters instead of failing the whole payload.
        total_send_bytes = _coerce_int(video_app.get("send_bytes", 0)) + _coerce_int(control_app.get("send_bytes", 0))
        total_recv_bytes = _coerce_int(video_app.get("recv_bytes", 0)) + _coerce_int(control_app.get("recv_bytes", 0))
        tx_kbps, rx_kbps = self._compute_rates(total_send_bytes, total_recv_bytes)
        local_updated_at = utc_iso_now()
        local_sessions = {
            "video": self._build_session_payload(
                "a_to_d.video",
                str(video_receiver_cfg.get("peer_id", "peer-a-video")),
                video_app,
                video_kcp,
                local_updated_at,
                False,
            ),
            "control": self._build_session_payload(
                "a_to_d.control",
                str(control_sender_cfg.get("peer_id", "peer-a-ctrl")),
                control_app,
                control_kcp,
                local_updated_at,
                False,
            ),
        }
        remote_snapshot = telemetry_state.get("snapshot") or {}
        remote_sessions_by_peer = {
            str(session.get("peer_id", "")).strip(): session
            for session in remote_snapshot.get("sessions", []) or []
            if str(session.get("peer_id", "")).strip()
        }
        remote_updated_at = telemetry_state.get("updated_at")
        remote_stale = bool(telemetry_state.get("stale", True))
        remote_video_peer = str(video_sender_cfg.get("peer_id", "peer-b-video"))
        remote_control_peer = str(control_sender_cfg.get("target_peer", "peer-b-ctrl"))
        remote_sessions = {
            "video": self._build_session_payload(
                f"hub::{remote_video_peer}",
                remote_video_peer,
                None,
                remote_sessions_by_peer.get(remote_video_peer, {}),
                remote_updated_at,
                remote_stale,
            ),
            "control": self._build_session_payload(
                f"hub::{remote_control_peer}",
                remote_control_peer,
                None,
                remote_sessions_by_peer.get(remote_control_peer, {}),
                remote_updated_at,
                remote_stale,
            ),
        }
        # Only forward the remote video SRTT while that session is actually fresh.
        remote_video_srtt = self._fresh_kcp_metric(remote_sessions["video"], "srtt_ms")
        self._video_receiver.update_remote_video_srtt(
            _coerce_int(remote_video_srtt) if remote_video_srtt is not None else None
        )
        links = {
            "a_to_d": self._build_link("local-a-side", local_updated_at, False, local_sessions),
            "d_to_b": self._build_link("hub-telemetry", remote_updated_at, remote_stale, remote_sessions),
        }
        primary_session = self._pick_primary_session(links)
        primary_kcp = dict(primary_session.get("kcp", {})) if primary_session is not None else {}
        # Ingest after describing so this poll's deltas compare against the previous snapshot.
        self._ingest_remote_snapshot(telemetry_state)
        fresh_connected_sessions = (
            links["a_to_d"]["aggregate"]["online_sessions"] + links["d_to_b"]["aggregate"]["online_sessions"]
        )
        latency_ms = primary_kcp.get("srtt_ms") if primary_session is not None else None
        jitter_ms = primary_kcp.get("srttvar_ms") if primary_session is not None else None
        local_control_registered = bool(control_app.get("registered", 0))
        remote_control_fresh = bool(remote_sessions["control"].get("connected")) and not bool(remote_sessions["control"].get("stale"))
        remote_video_fresh = bool(remote_sessions["video"].get("connected")) and not bool(remote_sessions["video"].get("stale"))
        watchdog_status = _load_optional_json(WATCHDOG_STATUS_PATH)
        robot_health = self._derive_robot_health(
            video_receiver_status=video_receiver_status,
            local_control_registered=local_control_registered,
            remote_control_fresh=remote_control_fresh,
            remote_video_fresh=remote_video_fresh,
            telemetry_state=telemetry_state,
            watchdog_status=watchdog_status,
        )
        latency_estimate = self._derive_latency_estimate(
            links=links,
            video_receiver_status=video_receiver_status,
            display_probe_status=display_probe_status,
        )
        if local_control_registered and remote_control_fresh:
            peer_status = "online"
        elif local_control_registered or bool(local_sessions["video"].get("connected")):
            peer_status = "degraded"
        elif sender_status.get("backend_ready"):
            peer_status = "idle"
        else:
            peer_status = "backend-unavailable"
        return {
            "peer_status": peer_status,
            "latency_ms": latency_ms,
            "jitter_ms": jitter_ms,
            "packet_loss_pct": None,
            "tx_kbps": round(tx_kbps, 3),
            "rx_kbps": round(rx_kbps, 3),
            "transport": "OmniSocket / kcp",
            "source_mode": "omnisocket-live" if fresh_connected_sessions > 0 else "omnisocket-idle",
            "updated_at": utc_iso_now(),
            "active_control_source": arbiter_status["active_source"],
            "control_lease_remaining_ms": arbiter_status["control_lease_remaining_ms"],
            "combined": {
                "connected_sessions": fresh_connected_sessions,
                "send_bytes": total_send_bytes,
                "recv_bytes": total_recv_bytes,
                "tx_kbps": round(tx_kbps, 3),
                "rx_kbps": round(rx_kbps, 3),
            },
            "sessions": {
                "video": {
                    "app": video_app,
                    "kcp": local_sessions["video"]["kcp"],
                },
                "control": {
                    "app": control_app,
                    "kcp": local_sessions["control"]["kcp"],
                },
            },
            "links": links,
            "latency_estimate": latency_estimate,
            "video_freshness": video_receiver_status.get("freshness", {}),
            "control_ack_status": {
                **ack_status,
                "receiver": ack_receiver_status,
            },
            "telemetry_receiver": {
                "hub_connected": bool(telemetry_state.get("connected")),
                "hub_updated_at": telemetry_state.get("updated_at"),
                "hub_stale": remote_stale,
                "last_error": telemetry_state.get("last_error", ""),
                "peer_id": telemetry_state.get("peer_id", ""),
                "registered": bool(telemetry_state.get("registered", False)),
                "last_server_error": str(telemetry_state.get("last_server_error", "") or ""),
                "reconnect_count": int(telemetry_state.get("reconnect_count", 0)),
            },
            "robot_health": robot_health,
            "ingress": {
                "native_udp": ingress_status,
            },
            "control": {
                "arbiter": arbiter_status,
                "sender": sender_status,
                "ack_receiver": ack_receiver_status,
            },
        }

    def close(self) -> None:
        """Stop the sampler thread (waking it immediately) and join it briefly."""
        self._closing.set()
        thread = self._sample_thread
        if thread is not None and thread.is_alive():
            thread.join(timeout=0.5)