Files
robot-command-center/backend/monitoring/telemetry.py

617 lines
24 KiB
Python

from __future__ import annotations
from collections import deque
import json
import math
import sys
import threading
import time
from datetime import datetime, timezone
from typing import Any
from .common import (
GEOSTREAM_JSON_PATH,
GEOSTREAM_STALE_SECONDS,
WORKSPACE_ROOT,
load_omnisocket_config,
utc_iso_now,
)
from .control import ControlArbiter, NativeUdpControlIngress, OmniSocketControlSender
from .video import OmniSocketVideoReceiver
# Pause, in milliseconds, between two passes of the local KCP sampling loop.
LOCAL_SAMPLE_INTERVAL_MS = 500
# Maximum number of samples kept per trend key; older samples are dropped.
TREND_HISTORY_SIZE = 10
# Number of most recent timeline entries compared when classifying a trend.
TREND_WINDOW_SIZE = 5
def _utc_from_epoch(epoch_seconds: float | None) -> str | None:
if epoch_seconds is None or epoch_seconds <= 0.0:
return None
return datetime.fromtimestamp(epoch_seconds, timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")
def _coerce_int(value: Any, default: int = 0) -> int:
try:
if value is None:
return default
return int(value)
except (TypeError, ValueError):
return default
def _coerce_float(value: Any, default: float = 0.0) -> float:
try:
if value is None:
return default
return float(value)
except (TypeError, ValueError):
return default
class GpsDataService:
    """Provide the most recent GPS payload for the monitoring backend.

    A fresh geostream JSON file is preferred; when it is missing, stale,
    unreadable or not a JSON object, a simulated payload is returned instead.
    """

    def get_latest(self) -> dict[str, Any]:
        """Return the geostream payload if usable, otherwise a simulated one.

        A geostream payload is tagged with ``source_mode`` ``"geostream-json"``
        and its ``updated_at`` is overwritten with the current UTC time.
        """
        payload = self._read_geostream_payload()
        if payload is None:
            return self._build_simulated_payload()
        payload["source_mode"] = "geostream-json"
        payload["updated_at"] = utc_iso_now()
        return payload

    def _read_geostream_payload(self) -> dict[str, Any] | None:
        """Load the geostream JSON file, or return ``None`` if it cannot be used.

        ``None`` is returned when the file is missing, was last modified more
        than ``GEOSTREAM_STALE_SECONDS`` ago, cannot be read or decoded, or
        does not contain a JSON object.
        """
        try:
            # stat() directly rather than exists() followed by stat(): the file
            # may disappear between the two calls, and a missing file surfaces
            # here as an OSError that is handled below.
            modified_at = GEOSTREAM_JSON_PATH.stat().st_mtime
            if time.time() - modified_at > GEOSTREAM_STALE_SECONDS:
                return None
            with GEOSTREAM_JSON_PATH.open("r", encoding="utf-8") as file:
                payload = json.load(file)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return None
        # get_latest() assigns keys on the result, so only a JSON object works.
        return payload if isinstance(payload, dict) else None

    def _build_simulated_payload(self) -> dict[str, Any]:
        """Build a synthetic fix whose values vary smoothly with the wall clock."""
        tick = time.time() / 12.0
        latitude = 31.2304 + math.sin(tick) * 0.0014
        longitude = 121.4737 + math.cos(tick) * 0.0018
        return {
            "has_fix": True,
            "utc_time": datetime.now(timezone.utc).strftime("%H:%M:%S"),
            "latitude": round(latitude, 6),
            "longitude": round(longitude, 6),
            "satellites": 14 + int((math.sin(tick * 0.7) + 1.0) * 2),
            "altitude_m": round(6.5 + math.cos(tick * 0.5) * 1.2, 2),
            "coordinate_system": "WGS84",
            "source_sentence": "SIMULATED",
            "raw_coordinate_format": "decimal degrees",
            "source_mode": "simulated",
            "updated_at": utc_iso_now(),
        }
class KcpTrendTracker:
    """Keep a short per-key history of KCP statistics and derive deltas and trends.

    Samples are stored in bounded deques guarded by a lock, so samples may be
    added and described from different threads.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._samples: dict[str, deque[dict[str, Any]]] = {}

    def _normalize(self, stats: dict[str, Any] | None) -> dict[str, Any]:
        """Coerce a raw statistics mapping into a fixed set of numeric fields.

        Missing or unusable values become ``0``. When ``window_limit`` is
        absent it defaults to the smaller of the two windows, provided both
        are non-zero.
        """
        source = dict(stats or {})

        def as_int(name: str) -> int:
            return _coerce_int(source.get(name))

        send_window = as_int("snd_wnd")
        remote_window = as_int("rmt_wnd")
        in_flight = as_int("inflight")
        if send_window and remote_window:
            fallback_limit = min(send_window, remote_window)
        else:
            fallback_limit = 0
        limit = _coerce_int(source.get("window_limit"), fallback_limit)

        # Keys are inserted in a fixed order so serialized output stays stable.
        normalized: dict[str, Any] = {
            name: as_int(name)
            for name in ("connected", "conv", "rto_ms", "srtt_ms", "srttvar_ms")
        }
        normalized["snd_wnd"] = send_window
        normalized["rmt_wnd"] = remote_window
        normalized["inflight"] = in_flight
        normalized["window_limit"] = limit
        normalized["window_pressure_pct"] = round(
            _coerce_float(source.get("window_pressure_pct")), 3
        )
        for name in (
            "snd_queue",
            "rcv_queue",
            "snd_buffer",
            "out_segs_total",
            "retrans_total",
            "fast_retrans_total",
            "lost_total",
            "repeat_total",
            "xmit_total",
        ):
            normalized[name] = as_int(name)
        return normalized

    def add_sample(self, key: str, stats: dict[str, Any] | None) -> None:
        """Append a normalized, timestamped sample to the history of ``key``."""
        entry = {
            "ts_monotonic": time.monotonic(),
            "updated_at": utc_iso_now(),
            "stats": self._normalize(stats),
        }
        with self._lock:
            bucket = self._samples.get(key)
            if bucket is None:
                bucket = deque(maxlen=TREND_HISTORY_SIZE)
                self._samples[key] = bucket
            bucket.append(entry)

    def latest_updated_at(self, key: str) -> str | None:
        """Return the ``updated_at`` of the newest sample, or ``None`` if there is none."""
        with self._lock:
            bucket = self._samples.get(key)
            if bucket:
                return str(bucket[-1].get("updated_at") or "")
            return None

    def describe(self, key: str, current_stats: dict[str, Any] | None) -> dict[str, Any]:
        """Compare ``current_stats`` with the stored history of ``key``.

        Returns a dict with the normalized current stats under ``"kcp"`` and,
        under ``"trend"``, non-negative deltas against the newest stored
        sample plus a ``"rising"``/``"falling"``/``"stable"`` classification
        of the send queue and send buffer over the recent window.
        """
        current = self._normalize(current_stats)
        with self._lock:
            stored = list(self._samples.get(key, ()))

        timeline = [*stored, {"stats": current, "updated_at": utc_iso_now()}]
        previous = timeline[-2]["stats"] if len(timeline) >= 2 else None
        window = [item["stats"] for item in timeline[-TREND_WINDOW_SIZE:]]
        # Changes smaller than 5% of the window limit (at least 2) count as noise.
        deadband = max(2.0, 0.05 * float(max(current.get("window_limit", 0), 1)))

        def increase(field: str) -> int:
            # Clamped at zero, so a counter that went backwards reports no growth.
            if previous is None:
                return 0
            return max(0, current[field] - _coerce_int(previous.get(field)))

        def direction(field: str) -> str:
            if len(window) < 2:
                return "stable"
            first = float(_coerce_int(window[0].get(field)))
            last = float(_coerce_int(window[-1].get(field)))
            change = last - first
            if abs(change) < deadband:
                return "stable"
            if change > 0:
                return "rising"
            return "falling"

        retrans_growth = increase("retrans_total")
        out_segs_growth = increase("out_segs_total")
        if out_segs_growth > 0:
            repair_rate = round((retrans_growth / out_segs_growth) * 100.0, 3)
        else:
            repair_rate = 0.0

        trend = {
            "snd_queue_delta": increase("snd_queue"),
            "snd_buffer_delta": increase("snd_buffer"),
            "snd_queue_trend": direction("snd_queue"),
            "snd_buffer_trend": direction("snd_buffer"),
            "retrans_delta": retrans_growth,
            "fast_retrans_delta": increase("fast_retrans_total"),
            "lost_delta": increase("lost_total"),
            "repeat_delta": increase("repeat_total"),
            "out_segs_delta": out_segs_growth,
            "repair_rate_pct": repair_rate,
        }
        return {"kcp": current, "trend": trend}
class HubTelemetryReceiver:
    """Receive ``hub_kcp_snapshot`` messages on a background thread.

    The ``omnisocket`` backend is imported lazily at construction time; when
    the import fails the receiver stays inert and records the failure in its
    ``last_error`` field. Shared state is guarded by an internal lock.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._thread: threading.Thread | None = None
        self._started = False
        self._session = None
        self._session_cls = None
        self._msg_type_text = None
        self._msg_type_error = None
        self._telemetry_defaults: dict[str, Any] = {}
        self._latest_snapshot: dict[str, Any] | None = None
        self._last_error = ""
        self._last_received_wall = 0.0
        self._last_received_monotonic = 0.0
        self._load_backend()

    def _load_backend(self) -> None:
        """Import the backend, recording any failure instead of raising."""
        try:
            self._import_backend()
        except Exception as error:  # pragma: no cover - optional runtime dependency
            self._last_error = f"omnisocket import failed: {error}"

    def _import_backend(self) -> None:
        """Import ``omnisocket``, retrying once from the workspace checkout."""
        try:
            from omnisocket import MSG_TYPE_ERROR, MSG_TYPE_TEXT, Session, TELEMETRY_DEFAULTS  # type: ignore
        except ImportError:
            python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python"
            if python_dir.exists():
                sys.path.insert(0, str(python_dir))
            from omnisocket import MSG_TYPE_ERROR, MSG_TYPE_TEXT, Session, TELEMETRY_DEFAULTS  # type: ignore
        self._msg_type_error = MSG_TYPE_ERROR
        self._msg_type_text = MSG_TYPE_TEXT
        self._session_cls = Session
        self._telemetry_defaults = dict(TELEMETRY_DEFAULTS)

    def _connect_session(self):
        """Create and connect a new session from the current configuration.

        Raises:
            RuntimeError: If the backend was never loaded.
            Exception: Whatever ``connect`` raises; the half-built session is
                closed before the error propagates.
        """
        if self._session_cls is None:
            # An explicit error (rather than ``assert``) survives ``python -O``
            # and gives the caller a non-empty message to report.
            raise RuntimeError("omnisocket backend is not loaded")
        config = load_omnisocket_config()
        transport_cfg = config.get("transport", {})
        telemetry_cfg = config.get("telemetry_receiver", {})
        session = self._session_cls()
        try:
            session.connect(
                server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")),
                peer_id=str(telemetry_cfg.get("peer_id", "peer-a-telemetry")),
                relay_via=str(transport_cfg.get("relay_via", "")),
                bind_ip=str(transport_cfg.get("bind_ip", "")),
                bind_device=str(transport_cfg.get("bind_device", "")),
                **self._telemetry_defaults,
            )
        except Exception:
            # The session is not yet tracked in self._session, so nobody else
            # would close it if connect() fails.
            try:
                session.close()
            except Exception:
                pass
            raise
        return session

    def ensure_started(self) -> None:
        """Start the receive thread once; a no-op when the backend is missing."""
        if self._session_cls is None:
            return
        with self._lock:
            if self._started:
                return
            self._started = True
            self._thread = threading.Thread(
                target=self._run,
                name="hub-telemetry-receiver",
                daemon=True,
            )
            self._thread.start()

    def _handle_message(self, from_peer: Any, msg_type: Any, payload: bytes) -> None:
        """Process one received message without ever raising on bad content.

        Error messages update ``last_error``. Text messages are parsed as JSON
        and stored when they are ``hub_kcp_snapshot`` objects. Undecodable
        text is recorded in ``last_error`` and otherwise ignored, so a single
        bad packet does not tear down the session.
        """
        if msg_type == self._msg_type_error:
            text = payload.decode("utf-8", errors="replace")
            with self._lock:
                self._last_error = f"hub error from {from_peer}: {text}"
            return
        if msg_type != self._msg_type_text:
            return
        try:
            snapshot = json.loads(payload.decode("utf-8"))
        except ValueError as error:
            # ValueError covers both UnicodeDecodeError and JSONDecodeError.
            with self._lock:
                self._last_error = f"invalid telemetry payload from {from_peer}: {error}"
            return
        if not isinstance(snapshot, dict):
            return
        if snapshot.get("type") != "hub_kcp_snapshot":
            return
        now_wall = time.time()
        now_mono = time.monotonic()
        with self._lock:
            self._latest_snapshot = snapshot
            self._last_received_wall = now_wall
            self._last_received_monotonic = now_mono
            self._last_error = ""

    def _run(self) -> None:
        """Connect, receive until an error occurs, then retry after two seconds."""
        while True:
            try:
                session = self._connect_session()
                with self._lock:
                    self._session = session
                    self._last_error = ""
                while True:
                    result = session.recv(timeout_ms=1000)
                    if result is None:
                        continue
                    from_peer, msg_type, payload = result
                    self._handle_message(from_peer, msg_type, payload)
            except Exception as error:  # pragma: no cover - runtime integration path
                with self._lock:
                    self._last_error = str(error)
            finally:
                with self._lock:
                    session = self._session
                    self._session = None
                if session is not None:
                    try:
                        session.close()
                    except Exception:
                        pass
            time.sleep(2)

    def get_snapshot(self) -> dict[str, Any]:
        """Return the latest snapshot together with connection and staleness info.

        The snapshot counts as stale when none was ever received or when the
        last one is older than ``stale_after_ms`` (from the
        ``telemetry_receiver`` config, default 1500, never below 500).
        """
        self.ensure_started()
        cfg = load_omnisocket_config().get("telemetry_receiver", {})
        try:
            configured_ms = int(cfg.get("stale_after_ms", 1500))
        except (TypeError, ValueError, OverflowError):
            # A malformed config value must not break status reporting.
            configured_ms = 1500
        stale_after_ms = max(500, configured_ms)
        with self._lock:
            received_monotonic = self._last_received_monotonic
            received_wall = self._last_received_wall
            snapshot = self._latest_snapshot
            connected = self._session is not None
            last_error = self._last_error
        stale = True
        if received_monotonic > 0.0:
            stale = (time.monotonic() - received_monotonic) * 1000.0 > stale_after_ms
        return {
            "connected": connected,
            "updated_at": _utc_from_epoch(received_wall),
            "received_at_monotonic": received_monotonic,
            "stale": stale,
            "peer_id": str(cfg.get("peer_id", "peer-a-telemetry")),
            "snapshot": snapshot or {"sessions": []},
            "last_error": last_error,
        }
class NetworkTelemetryService:
    """Combine local session statistics and hub telemetry into one status payload.

    The ``a_to_d`` link is described from the local video receiver and control
    sender; the ``d_to_b`` link is described from snapshots delivered by the
    hub telemetry receiver. A background thread samples local KCP statistics
    so that trends can be computed.
    """

    def __init__(
        self,
        video_receiver: OmniSocketVideoReceiver,
        control_sender: OmniSocketControlSender,
        control_arbiter: ControlArbiter,
        native_ingress: NativeUdpControlIngress,
        hub_receiver: HubTelemetryReceiver,
    ) -> None:
        self._video_receiver = video_receiver
        self._control_sender = control_sender
        self._control_arbiter = control_arbiter
        self._native_ingress = native_ingress
        self._hub_receiver = hub_receiver
        self._trend_tracker = KcpTrendTracker()
        self._rate_lock = threading.Lock()
        self._last_rate_sample: tuple[float, int, int] | None = None
        self._sample_thread: threading.Thread | None = None
        self._sample_started = False
        self._last_remote_snapshot_at = 0.0

    def _ensure_started(self) -> None:
        """Start the collaborating components and, once, the sampler thread."""
        self._video_receiver.ensure_started()
        self._control_arbiter.ensure_started()
        self._native_ingress.ensure_started()
        self._hub_receiver.ensure_started()
        with self._rate_lock:
            if self._sample_started:
                return
            self._sample_started = True
            self._sample_thread = threading.Thread(
                target=self._sample_loop,
                name="network-telemetry-sampler",
                daemon=True,
            )
            self._sample_thread.start()

    def _sample_loop(self) -> None:
        """Record local KCP statistics for both sessions at a fixed interval."""
        interval_seconds = LOCAL_SAMPLE_INTERVAL_MS / 1000.0
        sources = (
            ("a_to_d.video", self._video_receiver),
            ("a_to_d.control", self._control_sender),
        )
        while True:
            for trend_key, endpoint in sources:
                try:
                    self._trend_tracker.add_sample(trend_key, endpoint.session_kcp_stats())
                except Exception:
                    # Deliberate best-effort: a failing source is skipped for
                    # this pass without starving the other source or killing
                    # the sampler thread.
                    continue
            time.sleep(interval_seconds)

    def _compute_rates(self, send_bytes: int, recv_bytes: int) -> tuple[float, float]:
        """Return ``(tx_kbps, rx_kbps)`` since the previous call.

        The first call, or a call with no measurable elapsed time, yields
        ``(0.0, 0.0)``. Counters that moved backwards yield ``0.0``.
        """
        now = time.monotonic()
        with self._rate_lock:
            previous = self._last_rate_sample
            self._last_rate_sample = (now, send_bytes, recv_bytes)
        if previous is None:
            return 0.0, 0.0
        prev_time, prev_send, prev_recv = previous
        elapsed = now - prev_time
        if elapsed <= 0.0:
            return 0.0, 0.0
        tx_kbps = max(0.0, ((send_bytes - prev_send) * 8.0) / elapsed / 1000.0)
        rx_kbps = max(0.0, ((recv_bytes - prev_recv) * 8.0) / elapsed / 1000.0)
        return tx_kbps, rx_kbps

    @staticmethod
    def _remote_session_entries(telemetry_state: dict[str, Any]) -> list[dict[str, Any]]:
        """Return only the well-formed (dict) session entries of the hub snapshot.

        The snapshot originates from the network, so anything that is not a
        list of objects is dropped rather than trusted.
        """
        snapshot = telemetry_state.get("snapshot")
        if not isinstance(snapshot, dict):
            return []
        sessions = snapshot.get("sessions")
        if not isinstance(sessions, (list, tuple)):
            return []
        return [entry for entry in sessions if isinstance(entry, dict)]

    def _ingest_remote_snapshot(self, telemetry_state: dict[str, Any]) -> None:
        """Store the hub snapshot's sessions as trend samples, once per snapshot.

        A snapshot is skipped when it has no receive timestamp or when that
        timestamp is not newer than the last ingested one.
        """
        received_at = float(telemetry_state.get("received_at_monotonic") or 0.0)
        if received_at <= 0.0 or received_at <= self._last_remote_snapshot_at:
            return
        for session in self._remote_session_entries(telemetry_state):
            peer_id = str(session.get("peer_id", "")).strip()
            if not peer_id:
                continue
            self._trend_tracker.add_sample(f"hub::{peer_id}", session)
        self._last_remote_snapshot_at = received_at

    def _build_session_payload(
        self,
        trend_key: str,
        peer_id: str,
        app_stats: dict[str, Any] | None,
        current_kcp: dict[str, Any] | None,
        updated_at: str | None,
        stale: bool,
    ) -> dict[str, Any]:
        """Describe one session: identity, freshness, app stats, KCP stats and trend."""
        described = self._trend_tracker.describe(trend_key, current_kcp)
        return {
            "peer_id": peer_id,
            "connected": bool(described["kcp"].get("connected")),
            "updated_at": updated_at,
            "stale": stale,
            "app": app_stats,
            "kcp": described["kcp"],
            "trend": described["trend"],
        }

    def _build_link(
        self,
        source: str,
        updated_at: str | None,
        stale: bool,
        sessions: dict[str, dict[str, Any]],
    ) -> dict[str, Any]:
        """Summarize a link; aggregates cover connected, non-stale sessions only."""
        active_sessions = [
            session
            for session in sessions.values()
            if session.get("connected") and not session.get("stale")
        ]
        retrans_sum = sum(
            _coerce_int(session.get("trend", {}).get("retrans_delta")) for session in active_sessions
        )
        out_segs_sum = sum(
            _coerce_int(session.get("trend", {}).get("out_segs_delta")) for session in active_sessions
        )
        repair_rate_pct = round((retrans_sum / out_segs_sum) * 100.0, 3) if out_segs_sum > 0 else 0.0
        return {
            "source": source,
            "updated_at": updated_at,
            "stale": stale,
            "aggregate": {
                "online_sessions": len(active_sessions),
                "max_window_pressure_pct": max(
                    (
                        _coerce_float(session.get("kcp", {}).get("window_pressure_pct"))
                        for session in active_sessions
                    ),
                    default=0.0,
                ),
                "sum_snd_queue": sum(
                    _coerce_int(session.get("kcp", {}).get("snd_queue")) for session in active_sessions
                ),
                "sum_snd_buffer": sum(
                    _coerce_int(session.get("kcp", {}).get("snd_buffer")) for session in active_sessions
                ),
                "sum_retrans_delta": retrans_sum,
                "sum_out_segs_delta": out_segs_sum,
                "repair_rate_pct": repair_rate_pct,
            },
            "sessions": sessions,
        }

    def _pick_primary_session(self, links: dict[str, dict[str, Any]]) -> dict[str, Any] | None:
        """Return the first connected, non-stale session, or ``None``.

        Preference order: local control, local video, remote control, remote
        video.
        """
        candidates = (
            links["a_to_d"]["sessions"]["control"],
            links["a_to_d"]["sessions"]["video"],
            links["d_to_b"]["sessions"]["control"],
            links["d_to_b"]["sessions"]["video"],
        )
        for session in candidates:
            if session.get("connected") and not session.get("stale"):
                return session
        return None

    def get_latest(self) -> dict[str, Any]:
        """Collect all sources and return the full network telemetry payload."""
        self._ensure_started()
        config = load_omnisocket_config()
        video_receiver_cfg = config.get("video_receiver", {})
        control_sender_cfg = config.get("control_sender", {})
        video_sender_cfg = config.get("video_sender", {})

        video_app = self._video_receiver.session_stats()
        control_app = self._control_sender.session_stats()
        video_kcp = self._video_receiver.session_kcp_stats()
        control_kcp = self._control_sender.session_kcp_stats()
        arbiter_status = self._control_arbiter.get_status()
        ingress_status = self._native_ingress.get_status()
        sender_status = self._control_sender.get_status()
        telemetry_state = self._hub_receiver.get_snapshot()

        # _coerce_int keeps a missing or malformed counter from failing the
        # whole payload; such a counter simply counts as zero.
        total_send_bytes = _coerce_int(video_app.get("send_bytes")) + _coerce_int(
            control_app.get("send_bytes")
        )
        total_recv_bytes = _coerce_int(video_app.get("recv_bytes")) + _coerce_int(
            control_app.get("recv_bytes")
        )
        tx_kbps, rx_kbps = self._compute_rates(total_send_bytes, total_recv_bytes)

        local_updated_at = utc_iso_now()
        local_sessions = {
            "video": self._build_session_payload(
                "a_to_d.video",
                str(video_receiver_cfg.get("peer_id", "peer-a-video")),
                video_app,
                video_kcp,
                local_updated_at,
                False,
            ),
            "control": self._build_session_payload(
                "a_to_d.control",
                str(control_sender_cfg.get("peer_id", "peer-a-ctrl")),
                control_app,
                control_kcp,
                local_updated_at,
                False,
            ),
        }

        remote_sessions_by_peer: dict[str, dict[str, Any]] = {}
        for session in self._remote_session_entries(telemetry_state):
            session_peer = str(session.get("peer_id", "")).strip()
            if session_peer:
                remote_sessions_by_peer[session_peer] = session
        remote_updated_at = telemetry_state.get("updated_at")
        remote_stale = bool(telemetry_state.get("stale", True))
        remote_video_peer = str(video_sender_cfg.get("peer_id", "peer-b-video"))
        remote_control_peer = str(control_sender_cfg.get("target_peer", "peer-b-ctrl"))
        remote_sessions = {
            "video": self._build_session_payload(
                f"hub::{remote_video_peer}",
                remote_video_peer,
                None,
                remote_sessions_by_peer.get(remote_video_peer, {}),
                remote_updated_at,
                remote_stale,
            ),
            "control": self._build_session_payload(
                f"hub::{remote_control_peer}",
                remote_control_peer,
                None,
                remote_sessions_by_peer.get(remote_control_peer, {}),
                remote_updated_at,
                remote_stale,
            ),
        }

        links = {
            "a_to_d": self._build_link("local-a-side", local_updated_at, False, local_sessions),
            "d_to_b": self._build_link("hub-telemetry", remote_updated_at, remote_stale, remote_sessions),
        }
        primary_session = self._pick_primary_session(links)
        primary_kcp = dict(primary_session.get("kcp", {})) if primary_session is not None else {}
        # Ingest only after the remote sessions were described, so their
        # deltas are measured against the previously stored snapshot.
        self._ingest_remote_snapshot(telemetry_state)

        fresh_connected_sessions = (
            links["a_to_d"]["aggregate"]["online_sessions"]
            + links["d_to_b"]["aggregate"]["online_sessions"]
        )
        latency_ms = primary_kcp.get("srtt_ms") if primary_session is not None else None
        jitter_ms = primary_kcp.get("srttvar_ms") if primary_session is not None else None
        if fresh_connected_sessions > 0:
            peer_status = "online"
        elif sender_status.get("backend_ready"):
            peer_status = "idle"
        else:
            peer_status = "backend-unavailable"

        return {
            "peer_status": peer_status,
            "latency_ms": latency_ms,
            "jitter_ms": jitter_ms,
            "packet_loss_pct": None,
            "tx_kbps": round(tx_kbps, 3),
            "rx_kbps": round(rx_kbps, 3),
            "transport": "OmniSocket / kcp",
            "source_mode": "omnisocket-live" if fresh_connected_sessions > 0 else "omnisocket-idle",
            "updated_at": utc_iso_now(),
            "active_control_source": arbiter_status["active_source"],
            "control_lease_remaining_ms": arbiter_status["control_lease_remaining_ms"],
            "combined": {
                "connected_sessions": fresh_connected_sessions,
                "send_bytes": total_send_bytes,
                "recv_bytes": total_recv_bytes,
                "tx_kbps": round(tx_kbps, 3),
                "rx_kbps": round(rx_kbps, 3),
            },
            "sessions": {
                "video": {
                    "app": video_app,
                    "kcp": local_sessions["video"]["kcp"],
                },
                "control": {
                    "app": control_app,
                    "kcp": local_sessions["control"]["kcp"],
                },
            },
            "links": links,
            "telemetry_receiver": {
                "hub_connected": bool(telemetry_state.get("connected")),
                "hub_updated_at": telemetry_state.get("updated_at"),
                "hub_stale": remote_stale,
                "last_error": telemetry_state.get("last_error", ""),
                "peer_id": telemetry_state.get("peer_id", ""),
            },
            "ingress": {
                "native_udp": ingress_status,
            },
            "control": {
                "arbiter": arbiter_status,
                "sender": sender_status,
            },
        }