"""B-side OmniDaemon that owns control receive and manages video send workers.""" from __future__ import annotations import argparse import copy from dataclasses import dataclass from datetime import datetime, timezone from http import HTTPStatus from http.server import BaseHTTPRequestHandler import json import os from pathlib import Path import queue import signal import socket import socketserver import subprocess import sys import threading import time from typing import Any import yaml from omnisocket_a_side.control_codec import CONTROL_PACKET_STRUCT from . import ( DEFAULT_CONFIG_PATH, DEFAULT_CTRL_SOCKET_PATH, DEFAULT_SOCKET_PATH, REPO_ROOT, VERSION, ) def utc_iso_now() -> str: return datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z") def load_omnisocket_api(): from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session return CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session def _merge_kcp_defaults(defaults: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]: merged = dict(defaults) for key in ("nodelay", "interval_ms", "resend", "nc", "sndwnd", "rcvwnd", "mtu"): if key in override: merged[key] = int(override[key]) return merged def _default_stats() -> dict[str, int]: return { "send_calls": 0, "send_bytes": 0, "send_errors": 0, "recv_calls": 0, "recv_bytes": 0, "recv_timeouts": 0, "recv_errors": 0, "connected": 0, } def _default_kcp_metrics() -> dict[str, Any]: return { "connected": 0, "has_conv": 0, "conv": 0, "local_addr": "", "remote_addr": "", "rto_ms": 0, "srtt_ms": 0, "srttvar_ms": 0, "bytes_sent": 0, "bytes_received": 0, "in_pkts": 0, "out_pkts": 0, "in_segs": 0, "out_segs": 0, "retrans_segs": 0, "fast_retrans_segs": 0, "early_retrans_segs": 0, "lost_segs": 0, "repeat_segs": 0, "in_errs": 0, "kcp_in_errs": 0, "ring_buffer_snd_queue": 0, "ring_buffer_rcv_queue": 0, "ring_buffer_snd_buffer": 0, } @dataclass(slots=True) class VideoProfile: fps: int jpeg_quality_qscale: int max_frame_bytes: int def as_dict(self) -> dict[str, int]: return { "fps": int(self.fps), "jpeg_quality_qscale": int(self.jpeg_quality_qscale), "max_frame_bytes": int(self.max_frame_bytes), } def _load_config(config_path: str | None) -> dict[str, Any]: path = Path(os.getenv("OMNIBDAEMON_CONFIG", config_path or str(DEFAULT_CONFIG_PATH))) raw: dict[str, Any] = {} if path.exists(): with path.open("r", encoding="utf-8") as file: raw = yaml.safe_load(file) or {} control_defaults, _msg_type_binary, _msg_type_error, _session_cls = load_omnisocket_api() transport = dict(raw.get("transport", {})) control = dict(raw.get("control_receiver", {})) video = dict(raw.get("video_sender", {})) daemon_cfg = dict(raw.get("daemon", {})) policy = dict(raw.get("policy", {})) def _profile(name: str, fps: int, qscale: int, max_frame_bytes: int) -> dict[str, int]: section = dict(policy.get(name, {})) return { "fps": int(section.get("fps", fps)), "jpeg_quality_qscale": int(section.get("jpeg_quality_qscale", qscale)), "max_frame_bytes": int(section.get("max_frame_bytes", max_frame_bytes)), } binary_path = str(video.get("binary_path", "bin/b_side_video_sender")) if not os.path.isabs(binary_path): binary_path = str((REPO_ROOT / binary_path).resolve()) return { "config_path": str(path), "transport": { "server_addr": str(transport.get("server_addr", "")), "relay_via": str(transport.get("relay_via", "")), "bind_ip": str(transport.get("bind_ip", "")), "bind_device": str(transport.get("bind_device", "")), }, "control_receiver": { "peer_id": str(control.get("peer_id", "peer-b-ctrl")), "stats_interval_ms": int(control.get("stats_interval_ms", 100)), "queue_capacity": int(control.get("queue_capacity", 256)), "kcp": _merge_kcp_defaults(control_defaults, control), }, "video_sender": { "enabled": bool(video.get("enabled", False)), "binary_path": binary_path, "peer_id": str(video.get("peer_id", "peer-b-video")), "target_peer": str(video.get("target_peer", "peer-a-video")), "stats_interval_ms": int(video.get("stats_interval_ms", 100)), "device": str(video.get("device", "/dev/video0")), "capture_width": int(video.get("capture_width", 1280)), "capture_height": int(video.get("capture_height", 720)), "output_width": int(video.get("output_width", 640)), "output_height": int(video.get("output_height", 360)), "initial_profile": { "fps": int(video.get("fps", 10)), "jpeg_quality_qscale": int(video.get("jpeg_quality_qscale", 8)), "max_frame_bytes": int(video.get("max_frame_bytes", 40960)), }, }, "daemon": { "socket_path": os.getenv( "OMNIBDAEMON_SOCKET", str(daemon_cfg.get("socket_path", DEFAULT_SOCKET_PATH)), ), "ctrl_socket_path": os.getenv( "OMNIBDAEMON_CTRL_SOCKET", str(daemon_cfg.get("ctrl_socket_path", DEFAULT_CTRL_SOCKET_PATH)), ), "reconnect_delay_ms": int(daemon_cfg.get("reconnect_delay_ms", 2000)), "telemetry_interval_ms": int(daemon_cfg.get("telemetry_interval_ms", 100)), "worker_restart_delay_ms": int(daemon_cfg.get("worker_restart_delay_ms", 2000)), }, "policy": { "mode": str(policy.get("mode", "auto")).lower(), "health_window_ms": int(policy.get("health_window_ms", 2000)), "green_srtt_ms": int(policy.get("green_srtt_ms", 30)), "yellow_srtt_ms": int(policy.get("yellow_srtt_ms", 55)), "retrans_red_threshold": int(policy.get("retrans_red_threshold", 8)), "profile_green": _profile("profile_green", 10, 8, 40960), "profile_yellow": _profile("profile_yellow", 7, 12, 28672), "profile_red": _profile("profile_red", 5, 16, 20480), }, } class ControlRecvManager: def __init__(self, config: dict[str, Any]) -> None: control_defaults, msg_type_binary, msg_type_error, session_cls = load_omnisocket_api() transport = config["transport"] control_cfg = config["control_receiver"] daemon_cfg = config["daemon"] self._msg_type_binary = msg_type_binary self._msg_type_error = msg_type_error self._session_cls = session_cls self._connect_kwargs = { "server_addr": transport["server_addr"], "relay_via": transport["relay_via"], "bind_ip": transport["bind_ip"], "bind_device": transport["bind_device"], "peer_id": control_cfg["peer_id"], "stats_interval_ms": control_cfg["stats_interval_ms"], **_merge_kcp_defaults(control_defaults, control_cfg["kcp"]), } self._reconnect_delay_s = max(0.25, daemon_cfg["reconnect_delay_ms"] / 1000.0) self._queue: queue.Queue[bytes] = queue.Queue(maxsize=control_cfg["queue_capacity"]) self._lock = threading.Lock() self._stop_event = threading.Event() self._thread = threading.Thread(target=self._run, name="omni-b-side-control", daemon=True) self._session = None self._connected = False self._last_error = "" self._last_connect_attempt = 0.0 self._packets_received = 0 self._packets_enqueued = 0 self._ignored_non_binary = 0 self._ignored_bad_length = 0 self._dropped_queue_full = 0 @property def packet_queue(self) -> queue.Queue[bytes]: return self._queue def start(self) -> None: self._thread.start() def stop(self) -> None: self._stop_event.set() if self._thread.is_alive(): self._thread.join(timeout=2.0) self._disconnect("control receiver stopped") def snapshot(self) -> dict[str, Any]: with self._lock: session = self._session connected = self._connected and session is not None last_error = self._last_error packets_received = self._packets_received packets_enqueued = self._packets_enqueued ignored_non_binary = self._ignored_non_binary ignored_bad_length = self._ignored_bad_length dropped_queue_full = self._dropped_queue_full stats = _default_stats() metrics = _default_kcp_metrics() if session is not None: try: stats = dict(session.stats()) metrics = dict(session.kcp_metrics()) connected = bool(stats.get("connected") and metrics.get("connected")) except Exception as error: # pragma: no cover - runtime integration connected = False last_error = str(error) return { "connected": connected, "peer_id": self._connect_kwargs["peer_id"], "server_addr": self._connect_kwargs["server_addr"], "relay_via": self._connect_kwargs["relay_via"], "queue_depth": self._queue.qsize(), "packets_received": packets_received, "packets_enqueued": packets_enqueued, "ignored_non_binary": ignored_non_binary, "ignored_bad_length": ignored_bad_length, "dropped_queue_full": dropped_queue_full, "last_error": last_error, "stats": stats, "kcp_metrics": metrics, } def _run(self) -> None: while not self._stop_event.is_set(): if not self._is_connected(): now = time.monotonic() if now - self._last_connect_attempt >= self._reconnect_delay_s: self._last_connect_attempt = now self._connect() time.sleep(0.2) continue with self._lock: session = self._session if session is None: continue try: item = session.recv(timeout_ms=200) except Exception as error: # pragma: no cover - runtime integration self._disconnect(str(error)) continue if item is None: continue _from_peer, msg_type, payload = item with self._lock: self._packets_received += 1 if msg_type != self._msg_type_binary: with self._lock: self._ignored_non_binary += 1 self._disconnect(self._describe_unexpected_message(msg_type, payload)) continue if len(payload) != CONTROL_PACKET_STRUCT.size: with self._lock: self._ignored_bad_length += 1 continue self._enqueue_packet(bytes(payload)) def _connect(self) -> None: session = self._session_cls() try: session.connect(**self._connect_kwargs) except Exception as error: # pragma: no cover - runtime integration with self._lock: self._connected = False self._last_error = str(error) try: session.close() except Exception: pass return with self._lock: self._session = session self._connected = True self._last_error = "" def _disconnect(self, error_message: str) -> None: with self._lock: session = self._session self._session = None self._connected = False if error_message: self._last_error = error_message if session is not None: try: session.close() except Exception: pass def _describe_unexpected_message(self, msg_type: int, payload: bytes) -> str: detail = payload.decode("utf-8", errors="replace").strip() if msg_type == self._msg_type_error: return f"control session rejected by server: {detail or 'unknown error'}" if detail: return f"received unexpected control message type {msg_type}: {detail}" return f"received unexpected control message type {msg_type}" def _enqueue_packet(self, payload: bytes) -> None: try: self._queue.put_nowait(payload) except queue.Full: try: self._queue.get_nowait() except queue.Empty: pass with self._lock: self._dropped_queue_full += 1 try: self._queue.put_nowait(payload) except queue.Full: with self._lock: self._dropped_queue_full += 1 return with self._lock: self._packets_enqueued += 1 def _is_connected(self) -> bool: with self._lock: return self._connected and self._session is not None class ControlFanout: def __init__(self, socket_path: str, packet_queue: queue.Queue[bytes]) -> None: self._socket_path = socket_path self._packet_queue = packet_queue self._lock = threading.Lock() self._stop_event = threading.Event() self._accept_thread = threading.Thread(target=self._accept_loop, name="omni-b-side-ctrl-accept", daemon=True) self._send_thread = threading.Thread(target=self._send_loop, name="omni-b-side-ctrl-send", daemon=True) self._server_socket: socket.socket | None = None self._consumer_socket: socket.socket | None = None self._sent_packets = 0 self._accepted_connections = 0 self._dropped_no_consumer = 0 self._dropped_send_errors = 0 def start(self) -> None: socket_dir = os.path.dirname(self._socket_path) if socket_dir: os.makedirs(socket_dir, exist_ok=True) try: os.unlink(self._socket_path) except FileNotFoundError: pass server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) server_socket.bind(self._socket_path) server_socket.listen(1) server_socket.settimeout(0.5) self._server_socket = server_socket self._accept_thread.start() self._send_thread.start() def stop(self) -> None: self._stop_event.set() with self._lock: consumer = self._consumer_socket self._consumer_socket = None if consumer is not None: try: consumer.close() except OSError: pass if self._server_socket is not None: try: self._server_socket.close() except OSError: pass self._server_socket = None if self._accept_thread.is_alive(): self._accept_thread.join(timeout=2.0) if self._send_thread.is_alive(): self._send_thread.join(timeout=2.0) try: os.unlink(self._socket_path) except FileNotFoundError: pass def snapshot(self) -> dict[str, Any]: with self._lock: consumer_connected = self._consumer_socket is not None sent_packets = self._sent_packets accepted_connections = self._accepted_connections dropped_no_consumer = self._dropped_no_consumer dropped_send_errors = self._dropped_send_errors return { "consumer_connected": consumer_connected, "queue_depth": self._packet_queue.qsize(), "sent_packets": sent_packets, "accepted_connections": accepted_connections, "dropped_no_consumer": dropped_no_consumer, "dropped_send_errors": dropped_send_errors, "socket_path": self._socket_path, } def _accept_loop(self) -> None: while not self._stop_event.is_set(): server_socket = self._server_socket if server_socket is None: return try: consumer, _addr = server_socket.accept() except socket.timeout: continue except OSError: if self._stop_event.is_set(): return time.sleep(0.2) continue with self._lock: old_consumer = self._consumer_socket self._consumer_socket = consumer self._accepted_connections += 1 if old_consumer is not None: try: old_consumer.close() except OSError: pass def _send_loop(self) -> None: while not self._stop_event.is_set(): try: payload = self._packet_queue.get(timeout=0.2) except queue.Empty: continue with self._lock: consumer = self._consumer_socket if consumer is None: with self._lock: self._dropped_no_consumer += 1 continue try: consumer.sendall(payload) except OSError: with self._lock: self._dropped_send_errors += 1 if self._consumer_socket is consumer: self._consumer_socket = None try: consumer.close() except OSError: pass continue with self._lock: self._sent_packets += 1 class VideoWorkerManager: def __init__(self, config: dict[str, Any]) -> None: self._config = config self._transport_cfg = config["transport"] self._video_cfg = config["video_sender"] self._daemon_cfg = config["daemon"] self._policy_cfg = config["policy"] self._enabled = bool(self._video_cfg["enabled"]) self._restart_delay_s = max(0.5, self._daemon_cfg["worker_restart_delay_ms"] / 1000.0) initial = self._video_cfg["initial_profile"] initial_profile = VideoProfile( fps=int(initial["fps"]), jpeg_quality_qscale=int(initial["jpeg_quality_qscale"]), max_frame_bytes=int(initial["max_frame_bytes"]), ) self._lock = threading.Lock() self._stop_event = threading.Event() self._thread = threading.Thread(target=self._run, name="omni-b-side-video-worker", daemon=True) self._reader_thread: threading.Thread | None = None self._process: subprocess.Popen[str] | None = None self._control_stream = None self._telemetry_stream = None self._mode = self._policy_cfg["mode"] if self._policy_cfg["mode"] in {"auto", "manual"} else "auto" self._desired_profile = initial_profile self._active_profile = initial_profile self._restart_count = 0 self._last_exit_code: int | None = None self._last_error = "" self._frames_total = 0 self._frames_sent = 0 self._frames_dropped = 0 self._oversized_drops = 0 self._last_frame_bytes = 0 self._last_encode_us = 0 self._last_drop_reason = "" self._last_frame_ts = 0 self._kcp_metrics = _default_kcp_metrics() def start(self) -> None: if not self._enabled: return if not os.path.exists(self._video_cfg["binary_path"]): print( ( "B-side video worker binary missing: " f"{self._video_cfg['binary_path']} " "(run `make b_side_video_sender` in OmniSocketGo)" ), file=sys.stderr, flush=True, ) self._thread.start() def stop(self) -> None: self._stop_event.set() self._send_worker_command({"type": "shutdown"}) if self._thread.is_alive(): self._thread.join(timeout=3.0) self._terminate_worker() def snapshot(self) -> dict[str, Any]: with self._lock: process = self._process running = process is not None and process.poll() is None return { "enabled": self._enabled, "running": running, "mode": self._mode, "desired_profile": self._desired_profile.as_dict(), "active_profile": self._active_profile.as_dict(), "restart_count": self._restart_count, "last_exit_code": self._last_exit_code, "last_error": self._last_error, "frames_total": self._frames_total, "frames_sent": self._frames_sent, "frames_dropped": self._frames_dropped, "oversized_drops": self._oversized_drops, "last_frame_bytes": self._last_frame_bytes, "last_encode_us": self._last_encode_us, "last_drop_reason": self._last_drop_reason, "last_frame_ts": self._last_frame_ts, "kcp_metrics": dict(self._kcp_metrics), "binary_path": self._video_cfg["binary_path"], } def set_profile( self, *, mode: str, fps: int | None = None, jpeg_quality_qscale: int | None = None, max_frame_bytes: int | None = None, ) -> dict[str, Any]: mode = mode.lower() if mode not in {"auto", "manual"}: raise ValueError("mode must be auto or manual") if mode == "auto": with self._lock: self._mode = "auto" profile = self._desired_profile self._send_worker_command({"type": "set_profile", **profile.as_dict()}) return self.snapshot() if fps is None or jpeg_quality_qscale is None or max_frame_bytes is None: raise ValueError("manual mode requires fps, jpeg_quality_qscale, and max_frame_bytes") profile = VideoProfile( fps=max(1, int(fps)), jpeg_quality_qscale=max(1, int(jpeg_quality_qscale)), max_frame_bytes=max(1024, int(max_frame_bytes)), ) with self._lock: self._mode = "manual" self._desired_profile = profile self._send_worker_command({"type": "set_profile", **profile.as_dict()}) return self.snapshot() def apply_auto_profile(self, profile: VideoProfile) -> None: with self._lock: if self._mode != "auto": return if self._desired_profile.as_dict() == profile.as_dict(): return self._desired_profile = profile self._send_worker_command({"type": "set_profile", **profile.as_dict()}) def _run(self) -> None: while not self._stop_event.is_set(): process = self._process if process is None: self._spawn_worker() time.sleep(0.2) continue exit_code = process.poll() if exit_code is None: time.sleep(0.5) continue with self._lock: self._last_exit_code = int(exit_code) self._last_error = f"video worker exited with code {exit_code}" self._restart_count += 1 self._close_worker_handles() with self._lock: self._process = None if self._stop_event.wait(self._restart_delay_s): return def _spawn_worker(self) -> None: binary_path = self._video_cfg["binary_path"] if not os.path.exists(binary_path): with self._lock: self._last_error = f"video worker binary not found: {binary_path}" self._stop_event.wait(self._restart_delay_s) return command_read_fd, command_write_fd = os.pipe() telemetry_read_fd, telemetry_write_fd = os.pipe() env = dict(os.environ) env.update( { "OMNI_VIDEO_SERVER_ADDR": self._transport_cfg["server_addr"], "OMNI_VIDEO_RELAY_VIA": self._transport_cfg["relay_via"], "OMNI_VIDEO_BIND_IP": self._transport_cfg["bind_ip"], "OMNI_VIDEO_BIND_DEVICE": self._transport_cfg["bind_device"], "OMNI_VIDEO_PEER_ID": self._video_cfg["peer_id"], "OMNI_VIDEO_TARGET_PEER": self._video_cfg["target_peer"], "OMNI_VIDEO_DEVICE": self._video_cfg["device"], "OMNI_VIDEO_CAPTURE_WIDTH": str(self._video_cfg["capture_width"]), "OMNI_VIDEO_CAPTURE_HEIGHT": str(self._video_cfg["capture_height"]), "OMNI_VIDEO_OUTPUT_WIDTH": str(self._video_cfg["output_width"]), "OMNI_VIDEO_OUTPUT_HEIGHT": str(self._video_cfg["output_height"]), "OMNI_VIDEO_STATS_INTERVAL_MS": str(self._video_cfg["stats_interval_ms"]), "OMNI_WORKER_CONTROL_FD": str(command_read_fd), "OMNI_WORKER_TELEMETRY_FD": str(telemetry_write_fd), } ) with self._lock: profile = self._desired_profile env["OMNI_VIDEO_FPS"] = str(profile.fps) env["OMNI_VIDEO_JPEG_QSCALE"] = str(profile.jpeg_quality_qscale) env["OMNI_VIDEO_MAX_FRAME_BYTES"] = str(profile.max_frame_bytes) try: process = subprocess.Popen( [binary_path], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=None, env=env, close_fds=True, pass_fds=(command_read_fd, telemetry_write_fd), ) except Exception as error: # pragma: no cover - runtime integration for fd in (command_read_fd, command_write_fd, telemetry_read_fd, telemetry_write_fd): try: os.close(fd) except OSError: pass with self._lock: self._last_error = str(error) self._stop_event.wait(self._restart_delay_s) return os.close(command_read_fd) os.close(telemetry_write_fd) control_stream = os.fdopen(command_write_fd, "w", buffering=1, encoding="utf-8") telemetry_stream = os.fdopen( telemetry_read_fd, "r", buffering=1, encoding="utf-8", errors="replace", ) with self._lock: self._process = process self._control_stream = control_stream self._telemetry_stream = telemetry_stream self._active_profile = profile self._last_error = "" reader_thread = threading.Thread( target=self._telemetry_loop, args=(telemetry_stream, process), name="omni-b-side-video-telemetry", daemon=True, ) self._reader_thread = reader_thread reader_thread.start() self._send_worker_command({"type": "set_profile", **profile.as_dict()}) def _telemetry_loop(self, telemetry_stream, process: subprocess.Popen[Any]) -> None: while not self._stop_event.is_set(): line = telemetry_stream.readline() if not line: return try: payload = json.loads(line) except json.JSONDecodeError: continue if not isinstance(payload, dict): continue payload_type = payload.get("type") with self._lock: if payload_type == "frame_stat": self._frames_total += 1 self._last_frame_bytes = int(payload.get("frame_bytes", 0) or 0) self._last_encode_us = int(payload.get("encode_us", 0) or 0) self._last_drop_reason = str(payload.get("drop_reason", "")) self._last_frame_ts = int(payload.get("ts_unix_ms", 0) or 0) if bool(payload.get("sent", False)): self._frames_sent += 1 else: self._frames_dropped += 1 if self._last_drop_reason == "frame_too_large": self._oversized_drops += 1 elif payload_type == "kcp_metrics": merged = _default_kcp_metrics() for key in merged: if key in payload: merged[key] = payload[key] self._kcp_metrics = merged else: self._last_error = f"unknown worker telemetry type: {payload_type}" if process.poll() is not None: return def _send_worker_command(self, payload: dict[str, Any]) -> None: with self._lock: control_stream = self._control_stream if control_stream is None: return try: control_stream.write(json.dumps(payload) + "\n") control_stream.flush() except OSError as error: with self._lock: self._last_error = str(error) def _close_worker_handles(self) -> None: with self._lock: control_stream = self._control_stream telemetry_stream = self._telemetry_stream self._control_stream = None self._telemetry_stream = None if control_stream is not None: try: control_stream.close() except OSError: pass if telemetry_stream is not None: try: telemetry_stream.close() except OSError: pass def _terminate_worker(self) -> None: self._close_worker_handles() with self._lock: process = self._process self._process = None if process is None: return if process.poll() is None: process.terminate() try: process.wait(timeout=2.0) except subprocess.TimeoutExpired: process.kill() try: process.wait(timeout=2.0) except subprocess.TimeoutExpired: pass class PolicyEngine: def __init__(self, policy_cfg: dict[str, Any]) -> None: self._policy_cfg = policy_cfg def classify( self, *, control_connected: bool, consumer_connected: bool, avg_srtt_ms: float, retrans_delta: int, video_oversized_recent: bool, video_backlogged: bool, ) -> tuple[str, VideoProfile]: if not control_connected or not consumer_connected: band = "red" elif avg_srtt_ms >= self._policy_cfg["yellow_srtt_ms"] or retrans_delta >= self._policy_cfg["retrans_red_threshold"]: band = "red" elif avg_srtt_ms >= self._policy_cfg["green_srtt_ms"] or retrans_delta > 0: band = "yellow" else: band = "green" if band == "green" and (video_oversized_recent or video_backlogged): band = "yellow" elif band == "yellow" and (video_oversized_recent or video_backlogged): band = "red" profile_raw = self._policy_cfg[f"profile_{band}"] return band, VideoProfile( fps=int(profile_raw["fps"]), jpeg_quality_qscale=int(profile_raw["jpeg_quality_qscale"]), max_frame_bytes=int(profile_raw["max_frame_bytes"]), ) class TelemetrySampler: def __init__( self, config: dict[str, Any], control_manager: ControlRecvManager, fanout: ControlFanout, video_worker: VideoWorkerManager, ) -> None: self._config = config self._control_manager = control_manager self._fanout = fanout self._video_worker = video_worker self._policy_engine = PolicyEngine(config["policy"]) self._interval_s = max(0.1, config["daemon"]["telemetry_interval_ms"] / 1000.0) self._window_s = max(self._interval_s, config["policy"]["health_window_ms"] / 1000.0) self._lock = threading.Lock() self._stop_event = threading.Event() self._thread = threading.Thread(target=self._run, name="omni-b-side-telemetry", daemon=True) self._started_at = utc_iso_now() self._state = self._build_initial_state(config) self._history: list[dict[str, Any]] = [] self._last_totals: dict[str, float] | None = None def start(self) -> None: self._thread.start() def stop(self) -> None: self._stop_event.set() if self._thread.is_alive(): self._thread.join(timeout=2.0) def snapshot(self) -> dict[str, Any]: with self._lock: return copy.deepcopy(self._state) def _run(self) -> None: while not self._stop_event.is_set(): self._sample_once() self._stop_event.wait(self._interval_s) def _sample_once(self) -> None: now = time.time() control = self._control_manager.snapshot() fanout = self._fanout.snapshot() video = self._video_worker.snapshot() control_metrics = control["kcp_metrics"] video_metrics = video["kcp_metrics"] self._history.append( { "ts": now, "connected": bool(control["connected"]), "srtt_ms": float(control_metrics.get("srtt_ms", 0.0) or 0.0), "retrans_segs": int(control_metrics.get("retrans_segs", 0) or 0), } ) self._history = [sample for sample in self._history if now - sample["ts"] <= self._window_s] srtt_values = [sample["srtt_ms"] for sample in self._history if sample["connected"]] avg_srtt_ms = sum(srtt_values) / len(srtt_values) if srtt_values else 0.0 jitter_ms = 0.0 if len(srtt_values) >= 2: deltas = [abs(srtt_values[index] - srtt_values[index - 1]) for index in range(1, len(srtt_values))] jitter_ms = sum(deltas) / len(deltas) retrans_delta = 0 if len(self._history) >= 2: retrans_delta = max(0, int(self._history[-1]["retrans_segs"] - self._history[0]["retrans_segs"])) last_frame_ts_ms = int(video["last_frame_ts"] or 0) video_oversized_recent = bool( video["last_drop_reason"] == "frame_too_large" and last_frame_ts_ms > 0 and now - (last_frame_ts_ms / 1000.0) <= self._window_s ) video_backlogged = bool( int(video_metrics.get("ring_buffer_snd_queue", 0) or 0) > 128 or int(video_metrics.get("ring_buffer_snd_buffer", 0) or 0) > 128 ) health_band, recommended_profile = self._policy_engine.classify( control_connected=bool(control["connected"]), consumer_connected=bool(fanout["consumer_connected"]), avg_srtt_ms=avg_srtt_ms, retrans_delta=retrans_delta, video_oversized_recent=video_oversized_recent, video_backlogged=video_backlogged, ) self._video_worker.apply_auto_profile(recommended_profile) total_bytes_sent = int(control_metrics.get("bytes_sent", 0) or 0) + int(video_metrics.get("bytes_sent", 0) or 0) total_bytes_received = int(control_metrics.get("bytes_received", 0) or 0) + int(video_metrics.get("bytes_received", 0) or 0) total_out_segs = int(control_metrics.get("out_segs", 0) or 0) + int(video_metrics.get("out_segs", 0) or 0) total_retrans = int(control_metrics.get("retrans_segs", 0) or 0) + int(video_metrics.get("retrans_segs", 0) or 0) tx_kbps = 0 rx_kbps = 0 retrans_pct = 0.0 if self._last_totals is not None: dt = max(0.001, now - self._last_totals["ts"]) sent_delta = max(0, total_bytes_sent - int(self._last_totals["bytes_sent"])) recv_delta = max(0, total_bytes_received - int(self._last_totals["bytes_received"])) out_seg_delta = max(0, total_out_segs - int(self._last_totals["out_segs"])) retrans_total_delta = max(0, total_retrans - int(self._last_totals["retrans"])) tx_kbps = int((sent_delta * 8.0) / dt / 1000.0) rx_kbps = int((recv_delta * 8.0) / dt / 1000.0) if out_seg_delta > 0: retrans_pct = round((retrans_total_delta / out_seg_delta) * 100.0, 2) self._last_totals = { "ts": now, "bytes_sent": float(total_bytes_sent), "bytes_received": float(total_bytes_received), "out_segs": float(total_out_segs), "retrans": float(total_retrans), } state = { "network": { "peer_status": "online" if control["connected"] else "offline", "latency_ms": round(avg_srtt_ms or float(control_metrics.get("srtt_ms", 0) or 0.0), 1), "jitter_ms": round(jitter_ms, 1), "retrans_pct": round(retrans_pct, 2), "packet_loss_pct": round(retrans_pct, 2), "tx_kbps": tx_kbps, "rx_kbps": rx_kbps, "signal_dbm": None, "transport": "OmniSocket / daemon", "source_mode": "daemon-live", "updated_at": utc_iso_now(), }, "control": { "connected": bool(control["connected"]), "consumer_connected": bool(fanout["consumer_connected"]), "queue_depth": int(control["queue_depth"]), "fanout_queue_depth": int(fanout["queue_depth"]), "packets_received": int(control["packets_received"]), "packets_enqueued": int(control["packets_enqueued"]), "sent_to_consumer": int(fanout["sent_packets"]), "dropped_no_consumer": int(fanout["dropped_no_consumer"]), "dropped_queue_full": int(control["dropped_queue_full"]), "ignored_non_binary": int(control["ignored_non_binary"]), "ignored_bad_length": int(control["ignored_bad_length"]), "last_error": control["last_error"], "peer_id": control["peer_id"], "server_addr": control["server_addr"], "relay_via": control["relay_via"], "stats": control["stats"], "kcp_metrics": control_metrics, }, "video": { "enabled": bool(video["enabled"]), "running": bool(video["running"]), "mode": str(video["mode"]), "desired_profile": video["desired_profile"], "active_profile": video["active_profile"], "restart_count": int(video["restart_count"]), "last_exit_code": video["last_exit_code"], "last_error": video["last_error"], "frames_total": int(video["frames_total"]), "frames_sent": int(video["frames_sent"]), "frames_dropped": int(video["frames_dropped"]), "oversized_drops": int(video["oversized_drops"]), "last_frame_bytes": int(video["last_frame_bytes"]), "last_encode_us": int(video["last_encode_us"]), "last_drop_reason": str(video["last_drop_reason"]), "last_frame_ts": int(video["last_frame_ts"]), "binary_path": video["binary_path"], "kcp_metrics": video_metrics, }, "policy": { "mode": video["mode"], "health_band": health_band, "recommended_video_profile": recommended_profile.as_dict(), }, "daemon": { "started_at": self._started_at, "version": VERSION, "config_path": self._config["config_path"], "socket_path": self._config["daemon"]["socket_path"], "ctrl_socket_path": self._config["daemon"]["ctrl_socket_path"], }, } with self._lock: self._state = state @staticmethod def _build_initial_state(config: dict[str, Any]) -> dict[str, Any]: return { "network": { "peer_status": "offline", "latency_ms": 0.0, "jitter_ms": 0.0, "retrans_pct": 0.0, "packet_loss_pct": 0.0, "tx_kbps": 0, "rx_kbps": 0, "signal_dbm": None, "transport": "OmniSocket / daemon", "source_mode": "daemon-starting", "updated_at": utc_iso_now(), }, "control": { "connected": False, "consumer_connected": False, "queue_depth": 0, "fanout_queue_depth": 0, "packets_received": 0, "packets_enqueued": 0, "sent_to_consumer": 0, "dropped_no_consumer": 0, "dropped_queue_full": 0, "ignored_non_binary": 0, "ignored_bad_length": 0, "last_error": "", "peer_id": "", "server_addr": "", "relay_via": "", "stats": _default_stats(), "kcp_metrics": _default_kcp_metrics(), }, "video": { "enabled": bool(config["video_sender"]["enabled"]), "running": False, "mode": config["policy"]["mode"], "desired_profile": dict(config["video_sender"]["initial_profile"]), "active_profile": dict(config["video_sender"]["initial_profile"]), "restart_count": 0, "last_exit_code": None, "last_error": "", "frames_total": 0, "frames_sent": 0, "frames_dropped": 0, "oversized_drops": 0, "last_frame_bytes": 0, "last_encode_us": 0, "last_drop_reason": "", "last_frame_ts": 0, "binary_path": config["video_sender"]["binary_path"], "kcp_metrics": _default_kcp_metrics(), }, "policy": { "mode": config["policy"]["mode"], "health_band": "red", "recommended_video_profile": dict(config["policy"]["profile_red"]), }, "daemon": { "started_at": utc_iso_now(), "version": VERSION, "config_path": config["config_path"], "socket_path": config["daemon"]["socket_path"], "ctrl_socket_path": config["daemon"]["ctrl_socket_path"], }, } class ThreadingUnixHTTPServer(socketserver.ThreadingMixIn, socketserver.UnixStreamServer): daemon_threads = True class OmniDaemonHTTPHandler(BaseHTTPRequestHandler): server_version = "BsideOmniDaemonHTTP/1.0" protocol_version = "HTTP/1.1" def do_GET(self) -> None: # pragma: no cover - exercised by runtime integration app: BSideOmniDaemon = self.server.app # type: ignore[attr-defined] if self.path == "/v1/health": state = app.get_state() band = state["policy"]["health_band"] status = "ok" if band == "green" else "degraded" if not state["control"]["connected"]: status = "unavailable" self._send_json( HTTPStatus.OK, { "status": status, "health_band": band, "control_connected": state["control"]["connected"], "consumer_connected": state["control"]["consumer_connected"], "video_running": state["video"]["running"], "updated_at": state["network"]["updated_at"], }, ) return if self.path == "/v1/state": self._send_json(HTTPStatus.OK, app.get_state()) return self._send_json(HTTPStatus.NOT_FOUND, {"error": f"unknown path: {self.path}"}) def do_POST(self) -> None: # pragma: no cover - exercised by runtime integration app: BSideOmniDaemon = self.server.app # type: ignore[attr-defined] if self.path != "/v1/video/profile": self._send_json(HTTPStatus.NOT_FOUND, {"error": f"unknown path: {self.path}"}) return try: payload = self._read_json() except ValueError as error: self._send_json(HTTPStatus.BAD_REQUEST, {"error": str(error)}) return try: mode = str(payload.get("mode", "auto")).lower() if mode == "auto": state = app.set_video_profile(mode="auto") else: state = app.set_video_profile( mode="manual", fps=int(payload.get("fps", 0)), jpeg_quality_qscale=int(payload.get("jpeg_quality_qscale", 0)), max_frame_bytes=int(payload.get("max_frame_bytes", 0)), ) except (TypeError, ValueError) as error: self._send_json(HTTPStatus.BAD_REQUEST, {"error": str(error)}) return self._send_json(HTTPStatus.OK, state) def log_message(self, format: str, *args: Any) -> None: # noqa: A003 return def _read_json(self) -> dict[str, Any]: raw_length = self.headers.get("Content-Length") if raw_length is None: raise ValueError("missing Content-Length") try: length = int(raw_length) except ValueError as error: raise ValueError("invalid Content-Length") from error raw = self.rfile.read(length) try: payload = json.loads(raw.decode("utf-8")) except json.JSONDecodeError as error: raise ValueError(f"invalid JSON body: {error}") from error if not isinstance(payload, dict): raise ValueError("request body must be a JSON object") return payload def _send_json(self, status: HTTPStatus, payload: dict[str, Any]) -> None: body = json.dumps(payload).encode("utf-8") self.send_response(status.value) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.send_header("Cache-Control", "no-store") self.send_header("Connection", "keep-alive") self.end_headers() self.wfile.write(body) class BSideOmniDaemon: def __init__(self, config_path: str | None = None) -> None: self._config = _load_config(config_path) self._control_manager = ControlRecvManager(self._config) self._fanout = ControlFanout( self._config["daemon"]["ctrl_socket_path"], self._control_manager.packet_queue, ) self._video_worker = VideoWorkerManager(self._config) self._telemetry = TelemetrySampler( self._config, self._control_manager, self._fanout, self._video_worker, ) self._server: ThreadingUnixHTTPServer | None = None self._server_thread: threading.Thread | None = None @property def socket_path(self) -> str: return self._config["daemon"]["socket_path"] def start(self) -> None: self._fanout.start() self._control_manager.start() self._video_worker.start() self._telemetry.start() self._server = self._build_server() def stop(self) -> None: server_thread = None if self._server is not None: if self._server_thread is not None and self._server_thread.is_alive(): self._server.shutdown() self._server.server_close() self._server = None if self._server_thread is not None and self._server_thread.is_alive(): server_thread = self._server_thread self._server_thread = None try: os.unlink(self.socket_path) except FileNotFoundError: pass if server_thread is not None and server_thread is not threading.current_thread(): server_thread.join(timeout=2.0) self._telemetry.stop() self._video_worker.stop() self._control_manager.stop() self._fanout.stop() def serve_forever(self) -> None: self.start() assert self._server is not None self._server_thread = threading.Thread( target=self._server.serve_forever, name="omni-b-side-http", daemon=True, ) self._server_thread.start() print( ( "B-side OmniDaemon ready " f"(state: curl --unix-socket {self.socket_path} http://localhost/v1/state)" ), file=sys.stderr, flush=True, ) self._server_thread.join() def get_state(self) -> dict[str, Any]: return self._telemetry.snapshot() def set_video_profile( self, *, mode: str, fps: int | None = None, jpeg_quality_qscale: int | None = None, max_frame_bytes: int | None = None, ) -> dict[str, Any]: return self._video_worker.set_profile( mode=mode, fps=fps, jpeg_quality_qscale=jpeg_quality_qscale, max_frame_bytes=max_frame_bytes, ) def _build_server(self) -> ThreadingUnixHTTPServer: socket_path = self.socket_path socket_dir = os.path.dirname(socket_path) if socket_dir: os.makedirs(socket_dir, exist_ok=True) try: os.unlink(socket_path) except FileNotFoundError: pass server = ThreadingUnixHTTPServer(socket_path, OmniDaemonHTTPHandler) server.app = self # type: ignore[attr-defined] return server def main(argv: list[str] | None = None) -> None: parser = argparse.ArgumentParser(description="Run the B-side OmniDaemon") parser.add_argument("--config", dest="config_path", help="Path to daemon YAML config") args = parser.parse_args(argv) app = BSideOmniDaemon(config_path=args.config_path) print( ( "B-side OmniDaemon starting " f"(config={app._config['config_path']}, " f"socket={app._config['daemon']['socket_path']}, " f"ctrl_socket={app._config['daemon']['ctrl_socket_path']}, " f"video_enabled={app._config['video_sender']['enabled']})" ), file=sys.stderr, flush=True, ) def _handle_signal(_signum: int, _frame: Any) -> None: app.stop() signal.signal(signal.SIGINT, _handle_signal) signal.signal(signal.SIGTERM, _handle_signal) try: app.serve_forever() except KeyboardInterrupt: pass finally: app.stop() if __name__ == "__main__": main()