diff --git a/backend/monitoring/control.py b/backend/monitoring/control.py index 146a81d..0b8e505 100644 --- a/backend/monitoring/control.py +++ b/backend/monitoring/control.py @@ -33,6 +33,9 @@ class OmniSocketControlSender: self._send_errors = 0 self._drain_errors = 0 self._last_error = "" + self._reconnect_count = 0 + self._ever_connected = False + self._registered = False self._load_backend() def _load_backend(self) -> None: @@ -78,6 +81,8 @@ class OmniSocketControlSender: return with self._lock: + if self._closing.is_set(): + return if self._started and self._session is not None: return session, target_peer = self._connect_session() @@ -86,6 +91,11 @@ class OmniSocketControlSender: self._closing.clear() self._started = True self._last_error = "" + self._registered = bool(dict(session.stats()).get("registered", 0)) + if self._ever_connected: + self._reconnect_count += 1 + else: + self._ever_connected = True self._drain_thread = threading.Thread( target=self._drain_loop, name="omnisocket-control-drain", @@ -100,6 +110,7 @@ class OmniSocketControlSender: current = self._session self._session = None self._started = False + self._registered = False if current is not None: try: current.close() @@ -146,31 +157,50 @@ class OmniSocketControlSender: try: result = session.recv(timeout_ms=100) except Exception as error: + last_server_error = "" + try: + last_server_error = str(dict(session.stats()).get("last_server_error", "") or "") + except Exception: + last_server_error = "" with self._lock: self._drain_errors += 1 - self._last_error = str(error) + self._registered = False + self._last_error = last_server_error or str(error) if not self._closing.is_set(): self._reset_session(session) return if result is None: + try: + stats = dict(session.stats()) + except Exception: + stats = {} + with self._lock: + self._registered = bool(stats.get("registered", 0)) + if stats.get("last_server_error"): + self._last_error = str(stats.get("last_server_error")) continue from_peer, msg_type, payload = result if msg_type == self._msg_type_error: text = payload.decode("utf-8", errors="replace") + try: + stats = dict(session.stats()) + except Exception: + stats = {} with self._lock: self._last_error = f"server error from {from_peer}: {text}" + self._registered = bool(stats.get("registered", 0)) def session_stats(self) -> dict[str, Any]: with self._lock: session = self._session if session is None: - return {"connected": 0} + return {"connected": 0, "registered": 0, "last_server_error": self._last_error} try: return dict(session.stats()) except Exception: - return {"connected": 0} + return {"connected": 0, "registered": 0, "last_server_error": self._last_error} def session_kcp_stats(self) -> dict[str, Any]: with self._lock: @@ -180,16 +210,20 @@ class OmniSocketControlSender: def get_status(self) -> dict[str, Any]: config = load_omnisocket_config() control_cfg = config.get("control_sender", {}) + session_stats = self.session_stats() with self._lock: return { "backend_ready": self._session_cls is not None, "started": self._started, "connected": self._session is not None, + "registered": bool(session_stats.get("registered", 0)), "peer_id": str(control_cfg.get("peer_id", "")), "target_peer": str(control_cfg.get("target_peer", "")), "send_count": self._send_count, "send_errors": self._send_errors, "drain_errors": self._drain_errors, + "reconnect_count": self._reconnect_count, + "last_server_error": str(session_stats.get("last_server_error", "") or ""), "last_error": self._last_error, } @@ -228,6 +262,8 @@ class ControlArbiter: def ensure_started(self) -> None: self._load_config() with self._lock: + if self._closing.is_set(): + return if self._started: return self._started = True diff --git a/backend/monitoring/services.py b/backend/monitoring/services.py index 3940c8e..677029a 100644 --- a/backend/monitoring/services.py +++ b/backend/monitoring/services.py @@ -1,5 +1,7 @@ from __future__ import annotations +import atexit + from .control import ControlArbiter, NativeUdpControlIngress, OmniSocketControlSender from .telemetry import GpsDataService, HubTelemetryReceiver, NetworkTelemetryService from .video import OmniSocketVideoReceiver, VideoFrameService @@ -22,3 +24,21 @@ network_service = NetworkTelemetryService( _hub_telemetry_receiver, ) + +def shutdown_monitoring_services() -> None: + for closer in ( + network_service.close, + native_control_ingress.close, + control_arbiter.close, + _hub_telemetry_receiver.close, + _video_receiver.close, + _control_sender.close, + ): + try: + closer() + except Exception: + pass + + +atexit.register(shutdown_monitoring_services) + diff --git a/backend/monitoring/telemetry.py b/backend/monitoring/telemetry.py index 48bd784..64a7a48 100644 --- a/backend/monitoring/telemetry.py +++ b/backend/monitoring/telemetry.py @@ -214,6 +214,9 @@ class HubTelemetryReceiver: self._last_error = "" self._last_received_wall = 0.0 self._last_received_monotonic = 0.0 + self._reconnect_count = 0 + self._ever_connected = False + self._closing = threading.Event() self._load_backend() def _load_backend(self) -> None: @@ -259,7 +262,7 @@ class HubTelemetryReceiver: return with self._lock: - if self._started: + if self._started or self._closing.is_set(): return self._started = True self._thread = threading.Thread( @@ -270,14 +273,18 @@ class HubTelemetryReceiver: self._thread.start() def _run(self) -> None: - while True: + while not self._closing.is_set(): try: session = self._connect_session() with self._lock: self._session = session self._last_error = "" + if self._ever_connected: + self._reconnect_count += 1 + else: + self._ever_connected = True - while True: + while not self._closing.is_set(): result = session.recv(timeout_ms=1000) if result is None: continue @@ -302,18 +309,28 @@ class HubTelemetryReceiver: self._last_received_monotonic = now_mono self._last_error = "" except Exception as error: # pragma: no cover - runtime integration path - with self._lock: - self._last_error = str(error) + if not self._closing.is_set(): + session_error = "" + if self._session is not None: + try: + session_error = str(dict(self._session.stats()).get("last_server_error", "") or "") + except Exception: + session_error = "" + with self._lock: + self._last_error = session_error or str(error) finally: with self._lock: session = self._session self._session = None + if self._closing.is_set(): + self._started = False if session is not None: try: session.close() except Exception: pass - time.sleep(2) + if not self._closing.is_set(): + time.sleep(2) def get_snapshot(self) -> dict[str, Any]: self.ensure_started() @@ -326,6 +343,14 @@ class HubTelemetryReceiver: snapshot = self._latest_snapshot connected = self._session is not None last_error = self._last_error + reconnect_count = self._reconnect_count + if self._session is not None: + try: + session_stats = dict(self._session.stats()) + except Exception: + session_stats = {} + else: + session_stats = {} stale = True if received_monotonic > 0.0: @@ -339,8 +364,24 @@ class HubTelemetryReceiver: "peer_id": str(cfg.get("peer_id", "peer-a-telemetry")), "snapshot": snapshot or {"sessions": []}, "last_error": last_error, + "registered": bool(session_stats.get("registered", 0)), + "last_server_error": str(session_stats.get("last_server_error", "") or ""), + "reconnect_count": reconnect_count, } + def close(self) -> None: + self._closing.set() + with self._lock: + session = self._session + if session is not None: + try: + session.close() + except Exception: + pass + thread = self._thread + if thread is not None and thread.is_alive(): + thread.join(timeout=0.5) + class NetworkTelemetryService: def __init__( @@ -362,6 +403,7 @@ class NetworkTelemetryService: self._sample_thread: threading.Thread | None = None self._sample_started = False self._last_remote_snapshot_at = 0.0 + self._closing = threading.Event() def _ensure_started(self) -> None: self._video_receiver.ensure_started() @@ -369,7 +411,7 @@ class NetworkTelemetryService: self._native_ingress.ensure_started() self._hub_receiver.ensure_started() with self._rate_lock: - if self._sample_started: + if self._sample_started or self._closing.is_set(): return self._sample_started = True self._sample_thread = threading.Thread( @@ -381,7 +423,7 @@ class NetworkTelemetryService: def _sample_loop(self) -> None: interval_seconds = LOCAL_SAMPLE_INTERVAL_MS / 1000.0 - while True: + while not self._closing.is_set(): try: self._trend_tracker.add_sample("a_to_d.video", self._video_receiver.session_kcp_stats()) self._trend_tracker.add_sample("a_to_d.control", self._control_sender.session_kcp_stats()) @@ -431,9 +473,12 @@ class NetworkTelemetryService: stale: bool, ) -> dict[str, Any]: described = self._trend_tracker.describe(trend_key, current_kcp) + connected = bool(described["kcp"].get("connected")) + if app_stats is not None and "registered" in app_stats: + connected = bool(app_stats.get("registered")) return { "peer_id": peer_id, - "connected": bool(described["kcp"].get("connected")), + "connected": connected, "updated_at": updated_at, "stale": stale, "app": app_stats, @@ -561,9 +606,13 @@ class NetworkTelemetryService: ) latency_ms = primary_kcp.get("srtt_ms") if primary_session is not None else None jitter_ms = primary_kcp.get("srttvar_ms") if primary_session is not None else None + local_control_registered = bool(control_app.get("registered", 0)) + remote_control_fresh = bool(remote_sessions["control"].get("connected")) and not bool(remote_sessions["control"].get("stale")) - if fresh_connected_sessions > 0: + if local_control_registered and remote_control_fresh: peer_status = "online" + elif local_control_registered or bool(local_sessions["video"].get("connected")): + peer_status = "degraded" elif sender_status.get("backend_ready"): peer_status = "idle" else: @@ -605,6 +654,9 @@ class NetworkTelemetryService: "hub_stale": remote_stale, "last_error": telemetry_state.get("last_error", ""), "peer_id": telemetry_state.get("peer_id", ""), + "registered": bool(telemetry_state.get("registered", False)), + "last_server_error": str(telemetry_state.get("last_server_error", "") or ""), + "reconnect_count": int(telemetry_state.get("reconnect_count", 0)), }, "ingress": { "native_udp": ingress_status, @@ -614,3 +666,9 @@ class NetworkTelemetryService: "sender": sender_status, }, } + + def close(self) -> None: + self._closing.set() + thread = self._sample_thread + if thread is not None and thread.is_alive(): + thread.join(timeout=0.5) diff --git a/backend/monitoring/video.py b/backend/monitoring/video.py index 38abeba..e183fc7 100644 --- a/backend/monitoring/video.py +++ b/backend/monitoring/video.py @@ -50,6 +50,9 @@ class OmniSocketVideoReceiver: self._latency_samples_ms: deque[float] = deque(maxlen=VIDEO_TIMESTAMP_SAMPLE_SIZE) self._frames_received = 0 self._last_error = "" + self._reconnect_count = 0 + self._ever_connected = False + self._closing = threading.Event() self._load_backend() def _load_backend(self) -> None: @@ -76,7 +79,7 @@ class OmniSocketVideoReceiver: return with self._lock: - if self._started: + if self._started or self._closing.is_set(): return self._started = True self._thread = threading.Thread( @@ -167,14 +170,19 @@ class OmniSocketVideoReceiver: return timestamp_ns, VIDEO_TIMESTAMP_UNIT, VIDEO_TIMESTAMP_ENDIANNESS def _run(self) -> None: - while True: + while not self._closing.is_set(): try: session, buffer_bytes = self._connect_session() - self._session = session - self._last_error = "" + with self._lock: + self._session = session + self._last_error = "" + if self._ever_connected: + self._reconnect_count += 1 + else: + self._ever_connected = True buffer = bytearray(buffer_bytes) - while True: + while not self._closing.is_set(): meta = session.recv_into(buffer, timeout_ms=1000) if meta is None: continue @@ -207,15 +215,25 @@ class OmniSocketVideoReceiver: self._latency_samples_ms.append(latency_ms) self._frames_received += 1 except Exception as error: # pragma: no cover - runtime integration path - self._last_error = str(error) - time.sleep(2) + if not self._closing.is_set(): + session_error = "" + if self._session is not None: + try: + session_error = str(dict(self._session.stats()).get("last_server_error", "") or "") + except Exception: + session_error = "" + self._last_error = session_error or str(error) + time.sleep(2) finally: if self._session is not None: try: self._session.close() except Exception: pass + with self._lock: self._session = None + if self._closing.is_set(): + self._started = False def get_latest_frame(self) -> bytes | None: self.ensure_started() @@ -231,11 +249,11 @@ class OmniSocketVideoReceiver: with self._lock: session = self._session if session is None: - return {"connected": 0} + return {"connected": 0, "registered": 0, "last_server_error": self._last_error} try: return dict(session.stats()) except Exception: - return {"connected": 0} + return {"connected": 0, "registered": 0, "last_server_error": self._last_error} def session_kcp_stats(self) -> dict[str, Any]: self.ensure_started() @@ -248,6 +266,7 @@ class OmniSocketVideoReceiver: config = load_omnisocket_config() transport_cfg = config.get("transport", {}) video_cfg = config.get("video_receiver", {}) + session_stats = self.session_stats() with self._lock: has_recent_frame = self._latest_frame is not None and ( time.time() - self._latest_received_at <= OMNISOCKET_FRAME_FRESH_SECONDS @@ -276,9 +295,12 @@ class OmniSocketVideoReceiver: "backend_ready": self._session_cls is not None, "mode": VIDEO_SOURCE_MODE, "connected": self._session is not None, + "registered": bool(session_stats.get("registered", 0)), "has_recent_frame": has_recent_frame, "frames_received": self._frames_received, "latest_sequence": self._latest_sequence, + "reconnect_count": self._reconnect_count, + "last_server_error": str(session_stats.get("last_server_error", "") or ""), "last_error": self._last_error, "config_path": str(OMNISOCKET_CONFIG_PATH), "server_addr": str(transport_cfg.get("server_addr", "")), @@ -288,6 +310,19 @@ class OmniSocketVideoReceiver: "timing": timing_status, } + def close(self) -> None: + self._closing.set() + with self._lock: + session = self._session + if session is not None: + try: + session.close() + except Exception: + pass + thread = self._thread + if thread is not None and thread.is_alive(): + thread.join(timeout=0.5) + class VideoFrameService: def __init__(self, receiver: OmniSocketVideoReceiver) -> None: diff --git a/frontend/src/components/NetworkPanel.vue b/frontend/src/components/NetworkPanel.vue index d4b584a..cdbe187 100644 --- a/frontend/src/components/NetworkPanel.vue +++ b/frontend/src/components/NetworkPanel.vue @@ -86,9 +86,12 @@ function legSessions(link: LinkTelemetry | null): Array<{ name: string; data: Li

Transport: {{ network?.transport ?? 'n/a' }} / {{ network?.source_mode ?? 'n/a' }}

Telemetry Peer: {{ network?.telemetry_receiver?.peer_id ?? 'n/a' }}

+

Telemetry Registered: {{ network?.telemetry_receiver?.registered ? 'yes' : 'no' }}

Hub Freshness: {{ formatTime(network?.telemetry_receiver?.hub_updated_at) }}

Hub State: {{ network?.telemetry_receiver?.hub_stale ? 'stale' : 'fresh' }}

+

Telemetry Reconnects: {{ network?.telemetry_receiver?.reconnect_count ?? 0 }}

Hub Error: {{ network?.telemetry_receiver?.last_error }}

+

Telemetry Session Error: {{ network?.telemetry_receiver?.last_server_error }}

@@ -165,6 +168,8 @@ function legSessions(link: LinkTelemetry | null): Array<{ name: string; data: Li

Repeat: {{ formatScalar(session.data?.trend?.repeat_delta) }}

Repair Rate: {{ formatScalar(session.data?.trend?.repair_rate_pct, '%') }}

App Bytes: tx={{ session.data.app.send_bytes ?? 0 }} / rx={{ session.data.app.recv_bytes ?? 0 }}

+

Registered: {{ session.data.app.registered ? 'yes' : 'no' }}

+

Server Error: {{ session.data.app.last_server_error }}

@@ -174,7 +179,9 @@ function legSessions(link: LinkTelemetry | null): Array<{ name: string; data: Li

Combined: sessions={{ network?.combined?.connected_sessions ?? 0 }} send={{ network?.combined?.send_bytes ?? 0 }}B recv={{ network?.combined?.recv_bytes ?? 0 }}B

Native UDP: {{ network?.ingress?.native_udp?.bind_addr ?? 'n/a' }} packets={{ network?.ingress?.native_udp?.packets_received ?? 0 }} invalid={{ network?.ingress?.native_udp?.invalid_packets ?? 0 }}

-

Control Sender: {{ network?.control?.sender?.peer_id ?? 'n/a' }} -> {{ network?.control?.sender?.target_peer ?? 'n/a' }} sends={{ network?.control?.sender?.send_count ?? 0 }}

+

Control Sender: {{ network?.control?.sender?.peer_id ?? 'n/a' }} -> {{ network?.control?.sender?.target_peer ?? 'n/a' }} sends={{ network?.control?.sender?.send_count ?? 0 }} registered={{ network?.control?.sender?.registered ? 'yes' : 'no' }}

+

Control Reconnects: {{ network?.control?.sender?.reconnect_count ?? 0 }}

+

Control Session Error: {{ network?.control?.sender?.last_server_error }}

diff --git a/frontend/src/types.ts b/frontend/src/types.ts index df03d3d..0249435 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -14,6 +14,7 @@ export interface GpsTelemetry { export interface SessionAppStats { connected: number + registered?: number send_calls?: number send_bytes?: number send_errors?: number @@ -21,6 +22,7 @@ export interface SessionAppStats { recv_bytes?: number recv_timeouts?: number recv_errors?: number + last_server_error?: string } export interface SessionKcpStats { @@ -118,11 +120,14 @@ export interface ControlSenderStatus { backend_ready: boolean started: boolean connected: boolean + registered: boolean peer_id: string target_peer: string send_count: number send_errors: number drain_errors: number + reconnect_count: number + last_server_error: string last_error: string } @@ -132,6 +137,9 @@ export interface TelemetryReceiverStatus { hub_stale: boolean last_error: string peer_id: string + registered: boolean + last_server_error: string + reconnect_count: number } export interface NetworkTelemetry { @@ -191,9 +199,12 @@ export interface VideoStatus { backend_ready: boolean mode: string connected: boolean + registered: boolean has_recent_frame: boolean frames_received: number latest_sequence: number | null + reconnect_count: number + last_server_error: string last_error: string config_path: string server_addr?: string