Files
OmniSocketGo/python/omnisocket_b_side/daemon.py

1353 lines
51 KiB
Python

"""B-side OmniDaemon that owns control receive and manages video send workers."""
from __future__ import annotations
import argparse
import copy
from dataclasses import dataclass
from datetime import datetime, timezone
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler
import json
import os
from pathlib import Path
import queue
import signal
import socket
import socketserver
import subprocess
import sys
import threading
import time
from typing import Any
import yaml
from omnisocket_a_side.control_codec import CONTROL_PACKET_STRUCT
from . import (
DEFAULT_CONFIG_PATH,
DEFAULT_CTRL_SOCKET_PATH,
DEFAULT_SOCKET_PATH,
REPO_ROOT,
VERSION,
)
def utc_iso_now() -> str:
    """Return the current UTC time as an ISO-8601 string ending in ``Z``.

    The timestamp is truncated to whole seconds, e.g. ``2024-01-31T12:00:00Z``.
    """
    stamp = datetime.now(timezone.utc).isoformat(timespec="seconds")
    # isoformat() spells the UTC offset as "+00:00"; use the shorter "Z" form.
    return stamp.replace("+00:00", "Z")
def load_omnisocket_api():
    """Import the ``omnisocket`` bindings lazily and return the names this module uses.

    Returns:
        A 4-tuple ``(CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session)``.

    The import happens inside the function, so ``omnisocket`` is only required
    once this function is actually called.
    """
    from omnisocket import (
        CONTROL_DEFAULTS as control_defaults,
        MSG_TYPE_BINARY as binary_type,
        MSG_TYPE_ERROR as error_type,
        Session as session_cls,
    )

    return control_defaults, binary_type, error_type, session_cls
def _merge_kcp_defaults(defaults: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``defaults`` with the recognised KCP options from ``override`` applied.

    Only the seven known tunables are taken from ``override`` (each coerced with
    ``int``); every other key in ``override`` is ignored. ``defaults`` itself is
    not modified.
    """
    tunables = ("nodelay", "interval_ms", "resend", "nc", "sndwnd", "rcvwnd", "mtu")
    result = dict(defaults)
    result.update({name: int(override[name]) for name in tunables if name in override})
    return result
def _default_stats() -> dict[str, int]:
    """Return a fresh session-stats dict with every counter set to zero.

    Used as the placeholder value when no live session stats are available.
    """
    counters = (
        "send_calls",
        "send_bytes",
        "send_errors",
        "recv_calls",
        "recv_bytes",
        "recv_timeouts",
        "recv_errors",
        "connected",
    )
    return dict.fromkeys(counters, 0)
def _default_kcp_metrics() -> dict[str, Any]:
    """Return a fresh KCP-metrics dict holding neutral placeholder values.

    Every numeric field is ``0`` and the two address fields are empty strings.
    A new dict is built on each call so callers may mutate the result freely.
    """
    fields = (
        "connected",
        "has_conv",
        "conv",
        "local_addr",
        "remote_addr",
        "rto_ms",
        "srtt_ms",
        "srttvar_ms",
        "bytes_sent",
        "bytes_received",
        "in_pkts",
        "out_pkts",
        "in_segs",
        "out_segs",
        "retrans_segs",
        "fast_retrans_segs",
        "early_retrans_segs",
        "lost_segs",
        "repeat_segs",
        "in_errs",
        "kcp_in_errs",
        "ring_buffer_snd_queue",
        "ring_buffer_rcv_queue",
        "ring_buffer_snd_buffer",
    )
    metrics: dict[str, Any] = dict.fromkeys(fields, 0)
    # The address fields are textual; overwriting in place keeps the key order.
    for text_field in ("local_addr", "remote_addr"):
        metrics[text_field] = ""
    return metrics
@dataclass(slots=True)
class VideoProfile:
fps: int
jpeg_quality_qscale: int
max_frame_bytes: int
def as_dict(self) -> dict[str, int]:
return {
"fps": int(self.fps),
"jpeg_quality_qscale": int(self.jpeg_quality_qscale),
"max_frame_bytes": int(self.max_frame_bytes),
}
def _load_config(config_path: str | None) -> dict[str, Any]:
    """Load the daemon configuration and fill in a default for every setting.

    The file location is taken from the ``OMNIBDAEMON_CONFIG`` environment
    variable when it is set, otherwise from ``config_path``, otherwise from
    ``DEFAULT_CONFIG_PATH``. A missing file is not an error: the result then
    consists purely of defaults.

    Args:
        config_path: Optional path to the YAML configuration file.

    Returns:
        A dict with the keys ``config_path``, ``transport``,
        ``control_receiver``, ``video_sender``, ``daemon`` and ``policy``.

    Raises:
        TypeError, ValueError: If a configured value cannot be converted to the
            expected type (for example a non-numeric ``fps``).
    """
    path = Path(os.getenv("OMNIBDAEMON_CONFIG", config_path or str(DEFAULT_CONFIG_PATH)))
    raw: dict[str, Any] = {}
    if path.exists():
        with path.open("r", encoding="utf-8") as file:
            raw = yaml.safe_load(file) or {}
    control_defaults, _msg_type_binary, _msg_type_error, _session_cls = load_omnisocket_api()

    def _section(source: dict[str, Any], name: str) -> dict[str, Any]:
        # A YAML key written without children ("transport:") parses to None;
        # treat that like an absent section instead of failing in dict(None).
        return dict(source.get(name) or {})

    transport = _section(raw, "transport")
    control = _section(raw, "control_receiver")
    video = _section(raw, "video_sender")
    daemon_cfg = _section(raw, "daemon")
    policy = _section(raw, "policy")

    def _profile(name: str, fps: int, qscale: int, max_frame_bytes: int) -> dict[str, int]:
        section = _section(policy, name)
        return {
            "fps": int(section.get("fps", fps)),
            "jpeg_quality_qscale": int(section.get("jpeg_quality_qscale", qscale)),
            "max_frame_bytes": int(section.get("max_frame_bytes", max_frame_bytes)),
        }

    # Relative worker paths are resolved against the repository root.
    binary_path = str(video.get("binary_path", "bin/b_side_video_sender"))
    if not os.path.isabs(binary_path):
        binary_path = str((REPO_ROOT / binary_path).resolve())

    return {
        "config_path": str(path),
        "transport": {
            "server_addr": str(transport.get("server_addr", "")),
            "relay_via": str(transport.get("relay_via", "")),
            "bind_ip": str(transport.get("bind_ip", "")),
            "bind_device": str(transport.get("bind_device", "")),
        },
        "control_receiver": {
            "peer_id": str(control.get("peer_id", "peer-b-ctrl")),
            "stats_interval_ms": int(control.get("stats_interval_ms", 100)),
            "queue_capacity": int(control.get("queue_capacity", 256)),
            # KCP tunables are read directly from the control_receiver section.
            "kcp": _merge_kcp_defaults(control_defaults, control),
        },
        "video_sender": {
            "enabled": bool(video.get("enabled", False)),
            "binary_path": binary_path,
            "peer_id": str(video.get("peer_id", "peer-b-video")),
            "target_peer": str(video.get("target_peer", "peer-a-video")),
            "stats_interval_ms": int(video.get("stats_interval_ms", 100)),
            "device": str(video.get("device", "/dev/video0")),
            "capture_width": int(video.get("capture_width", 1280)),
            "capture_height": int(video.get("capture_height", 720)),
            "output_width": int(video.get("output_width", 640)),
            "output_height": int(video.get("output_height", 360)),
            "initial_profile": {
                "fps": int(video.get("fps", 10)),
                "jpeg_quality_qscale": int(video.get("jpeg_quality_qscale", 8)),
                "max_frame_bytes": int(video.get("max_frame_bytes", 40960)),
            },
        },
        "daemon": {
            # Environment variables take precedence over the file for both sockets.
            "socket_path": os.getenv(
                "OMNIBDAEMON_SOCKET",
                str(daemon_cfg.get("socket_path", DEFAULT_SOCKET_PATH)),
            ),
            "ctrl_socket_path": os.getenv(
                "OMNIBDAEMON_CTRL_SOCKET",
                str(daemon_cfg.get("ctrl_socket_path", DEFAULT_CTRL_SOCKET_PATH)),
            ),
            "reconnect_delay_ms": int(daemon_cfg.get("reconnect_delay_ms", 2000)),
            "telemetry_interval_ms": int(daemon_cfg.get("telemetry_interval_ms", 100)),
            "worker_restart_delay_ms": int(daemon_cfg.get("worker_restart_delay_ms", 2000)),
        },
        "policy": {
            "mode": str(policy.get("mode", "auto")).lower(),
            "health_window_ms": int(policy.get("health_window_ms", 2000)),
            "green_srtt_ms": int(policy.get("green_srtt_ms", 30)),
            "yellow_srtt_ms": int(policy.get("yellow_srtt_ms", 55)),
            "retrans_red_threshold": int(policy.get("retrans_red_threshold", 8)),
            "profile_green": _profile("profile_green", 10, 8, 40960),
            "profile_yellow": _profile("profile_yellow", 7, 12, 28672),
            "profile_red": _profile("profile_red", 5, 16, 20480),
        },
    }
class ControlRecvManager:
    """Receive control packets from an omnisocket session on a background thread.

    The worker thread (re)connects the session, reads messages, and puts every
    binary payload whose length equals ``CONTROL_PACKET_STRUCT.size`` onto a
    bounded queue exposed through :attr:`packet_queue`. When the queue is full
    the oldest packet is discarded to make room for the newest one.
    """

    def __init__(self, config: dict[str, Any]) -> None:
        """Build the manager from the dict produced by ``_load_config``.

        Reads the ``transport``, ``control_receiver`` and ``daemon`` sections.
        No connection is attempted until :meth:`start` is called.
        """
        control_defaults, msg_type_binary, msg_type_error, session_cls = load_omnisocket_api()
        transport = config["transport"]
        control_cfg = config["control_receiver"]
        daemon_cfg = config["daemon"]
        self._msg_type_binary = msg_type_binary
        self._msg_type_error = msg_type_error
        self._session_cls = session_cls
        self._connect_kwargs = {
            "server_addr": transport["server_addr"],
            "relay_via": transport["relay_via"],
            "bind_ip": transport["bind_ip"],
            "bind_device": transport["bind_device"],
            "peer_id": control_cfg["peer_id"],
            "stats_interval_ms": control_cfg["stats_interval_ms"],
            **_merge_kcp_defaults(control_defaults, control_cfg["kcp"]),
        }
        # Never retry faster than every 250 ms, whatever the config says.
        self._reconnect_delay_s = max(0.25, daemon_cfg["reconnect_delay_ms"] / 1000.0)
        self._queue: queue.Queue[bytes] = queue.Queue(maxsize=control_cfg["queue_capacity"])
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, name="omni-b-side-control", daemon=True)
        self._session = None
        self._connected = False
        self._last_error = ""
        self._last_connect_attempt = 0.0
        self._packets_received = 0
        self._packets_enqueued = 0
        self._ignored_non_binary = 0
        self._ignored_bad_length = 0
        self._dropped_queue_full = 0

    @property
    def packet_queue(self) -> queue.Queue[bytes]:
        """The queue onto which accepted control packets are placed."""
        return self._queue

    def start(self) -> None:
        """Start the background receive thread."""
        self._thread.start()

    def stop(self) -> None:
        """Signal the thread to stop, wait up to 2 s for it, and close the session."""
        self._stop_event.set()
        if self._thread.is_alive():
            self._thread.join(timeout=2.0)
        self._disconnect("control receiver stopped")

    def snapshot(self) -> dict[str, Any]:
        """Return the current counters, connection flag and session metrics.

        Session ``stats()`` / ``kcp_metrics()`` are queried outside the lock; if
        they raise, the snapshot reports ``connected=False`` with the exception
        text as ``last_error`` and default-valued metrics.
        """
        with self._lock:
            session = self._session
            connected = self._connected and session is not None
            last_error = self._last_error
            packets_received = self._packets_received
            packets_enqueued = self._packets_enqueued
            ignored_non_binary = self._ignored_non_binary
            ignored_bad_length = self._ignored_bad_length
            dropped_queue_full = self._dropped_queue_full
        stats = _default_stats()
        metrics = _default_kcp_metrics()
        if session is not None:
            try:
                stats = dict(session.stats())
                metrics = dict(session.kcp_metrics())
                connected = bool(stats.get("connected") and metrics.get("connected"))
            except Exception as error:  # pragma: no cover - runtime integration
                connected = False
                last_error = str(error)
        return {
            "connected": connected,
            "peer_id": self._connect_kwargs["peer_id"],
            "server_addr": self._connect_kwargs["server_addr"],
            "relay_via": self._connect_kwargs["relay_via"],
            "queue_depth": self._queue.qsize(),
            "packets_received": packets_received,
            "packets_enqueued": packets_enqueued,
            "ignored_non_binary": ignored_non_binary,
            "ignored_bad_length": ignored_bad_length,
            "dropped_queue_full": dropped_queue_full,
            "last_error": last_error,
            "stats": stats,
            "kcp_metrics": metrics,
        }

    def _run(self) -> None:
        """Thread body: reconnect when needed, otherwise receive and enqueue."""
        while not self._stop_event.is_set():
            if not self._is_connected():
                now = time.monotonic()
                if now - self._last_connect_attempt >= self._reconnect_delay_s:
                    self._last_connect_attempt = now
                    self._connect()
                # Wait on the stop event rather than sleeping so stop() is
                # honoured immediately instead of after the back-off.
                self._stop_event.wait(0.2)
                continue
            with self._lock:
                session = self._session
            if session is None:
                continue
            try:
                item = session.recv(timeout_ms=200)
            except Exception as error:  # pragma: no cover - runtime integration
                self._disconnect(str(error))
                continue
            if item is None:
                continue
            _from_peer, msg_type, payload = item
            with self._lock:
                self._packets_received += 1
            if msg_type != self._msg_type_binary:
                # Any non-binary message drops the session; the next loop
                # iteration reconnects after the configured delay.
                with self._lock:
                    self._ignored_non_binary += 1
                self._disconnect(self._describe_unexpected_message(msg_type, payload))
                continue
            if len(payload) != CONTROL_PACKET_STRUCT.size:
                with self._lock:
                    self._ignored_bad_length += 1
                continue
            self._enqueue_packet(bytes(payload))

    def _connect(self) -> None:
        """Create and connect a new session, recording any failure as ``last_error``."""
        session = self._session_cls()
        try:
            session.connect(**self._connect_kwargs)
        except Exception as error:  # pragma: no cover - runtime integration
            with self._lock:
                self._connected = False
                self._last_error = str(error)
            self._close_quietly(session)
            return
        with self._lock:
            # stop() waits only 2 s for this thread. If connect() took longer,
            # stop() has already run its cleanup, so publishing the session now
            # would leave it open forever.
            abandoned = self._stop_event.is_set()
            if not abandoned:
                self._session = session
                self._connected = True
                self._last_error = ""
        if abandoned:
            self._close_quietly(session)

    def _disconnect(self, error_message: str) -> None:
        """Detach and close the current session; keep ``error_message`` if non-empty."""
        with self._lock:
            session = self._session
            self._session = None
            self._connected = False
            if error_message:
                self._last_error = error_message
        if session is not None:
            self._close_quietly(session)

    @staticmethod
    def _close_quietly(session: Any) -> None:
        """Close ``session`` on a best-effort basis, ignoring any error it raises."""
        try:
            session.close()
        except Exception:
            pass

    def _describe_unexpected_message(self, msg_type: int, payload: bytes) -> str:
        """Build the ``last_error`` text for a message that is not a binary packet."""
        detail = payload.decode("utf-8", errors="replace").strip()
        if msg_type == self._msg_type_error:
            return f"control session rejected by server: {detail or 'unknown error'}"
        if detail:
            return f"received unexpected control message type {msg_type}: {detail}"
        return f"received unexpected control message type {msg_type}"

    def _enqueue_packet(self, payload: bytes) -> None:
        """Queue ``payload``, evicting the oldest queued packet when full."""
        try:
            self._queue.put_nowait(payload)
        except queue.Full:
            try:
                self._queue.get_nowait()
            except queue.Empty:
                pass
            with self._lock:
                self._dropped_queue_full += 1
            try:
                self._queue.put_nowait(payload)
            except queue.Full:
                # A concurrent producer refilled the slot; count the new packet
                # as dropped too and give up.
                with self._lock:
                    self._dropped_queue_full += 1
                return
        with self._lock:
            self._packets_enqueued += 1

    def _is_connected(self) -> bool:
        """Return True while a session is attached and marked connected."""
        with self._lock:
            return self._connected and self._session is not None
class ControlFanout:
    """Forward queued control packets to one local consumer over a Unix socket.

    A ``SOCK_SEQPACKET`` listening socket is created at ``socket_path``. One
    thread accepts consumers (a newly accepted consumer replaces the previous
    one) and another drains ``packet_queue`` and sends each packet to the
    current consumer. Packets taken from the queue while no consumer is
    attached are dropped and counted.
    """

    def __init__(self, socket_path: str, packet_queue: queue.Queue[bytes]) -> None:
        """Store the socket path and source queue; nothing is opened until start()."""
        self._socket_path = socket_path
        self._packet_queue = packet_queue
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._accept_thread = threading.Thread(target=self._accept_loop, name="omni-b-side-ctrl-accept", daemon=True)
        self._send_thread = threading.Thread(target=self._send_loop, name="omni-b-side-ctrl-send", daemon=True)
        self._server_socket: socket.socket | None = None
        self._consumer_socket: socket.socket | None = None
        self._sent_packets = 0
        self._accepted_connections = 0
        self._dropped_no_consumer = 0
        self._dropped_send_errors = 0

    def start(self) -> None:
        """Create the listening socket and start the accept and send threads.

        Any stale socket file at the path is removed first.

        Raises:
            OSError: If the socket cannot be created, bound or put into
                listening mode. No thread is started in that case.
        """
        socket_dir = os.path.dirname(self._socket_path)
        if socket_dir:
            os.makedirs(socket_dir, exist_ok=True)
        try:
            os.unlink(self._socket_path)
        except FileNotFoundError:
            pass
        server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
        try:
            server_socket.bind(self._socket_path)
            server_socket.listen(1)
            # The timeout lets the accept loop notice the stop event.
            server_socket.settimeout(0.5)
        except OSError:
            # Do not leak the descriptor when setup fails part-way.
            server_socket.close()
            raise
        self._server_socket = server_socket
        self._accept_thread.start()
        self._send_thread.start()

    def stop(self) -> None:
        """Stop both threads, close all sockets and remove the socket file."""
        self._stop_event.set()
        with self._lock:
            consumer = self._consumer_socket
            self._consumer_socket = None
        if consumer is not None:
            try:
                consumer.close()
            except OSError:
                pass
        if self._server_socket is not None:
            try:
                self._server_socket.close()
            except OSError:
                pass
            self._server_socket = None
        if self._accept_thread.is_alive():
            self._accept_thread.join(timeout=2.0)
        if self._send_thread.is_alive():
            self._send_thread.join(timeout=2.0)
        try:
            os.unlink(self._socket_path)
        except FileNotFoundError:
            pass

    def snapshot(self) -> dict[str, Any]:
        """Return the fan-out counters, queue depth and consumer-attached flag."""
        with self._lock:
            consumer_connected = self._consumer_socket is not None
            sent_packets = self._sent_packets
            accepted_connections = self._accepted_connections
            dropped_no_consumer = self._dropped_no_consumer
            dropped_send_errors = self._dropped_send_errors
        return {
            "consumer_connected": consumer_connected,
            "queue_depth": self._packet_queue.qsize(),
            "sent_packets": sent_packets,
            "accepted_connections": accepted_connections,
            "dropped_no_consumer": dropped_no_consumer,
            "dropped_send_errors": dropped_send_errors,
            "socket_path": self._socket_path,
        }

    def _accept_loop(self) -> None:
        """Accept consumers until stopped; the newest connection wins."""
        while not self._stop_event.is_set():
            server_socket = self._server_socket
            if server_socket is None:
                return
            try:
                consumer, _addr = server_socket.accept()
            except socket.timeout:
                continue
            except OSError:
                if self._stop_event.is_set():
                    return
                # Back off briefly, but wake immediately if stop() is called.
                self._stop_event.wait(0.2)
                continue
            with self._lock:
                old_consumer = self._consumer_socket
                self._consumer_socket = consumer
                self._accepted_connections += 1
            if old_consumer is not None:
                try:
                    old_consumer.close()
                except OSError:
                    pass

    def _send_loop(self) -> None:
        """Drain the packet queue and send each packet to the current consumer."""
        while not self._stop_event.is_set():
            try:
                payload = self._packet_queue.get(timeout=0.2)
            except queue.Empty:
                continue
            with self._lock:
                consumer = self._consumer_socket
            if consumer is None:
                with self._lock:
                    self._dropped_no_consumer += 1
                continue
            try:
                consumer.sendall(payload)
            except OSError:
                with self._lock:
                    self._dropped_send_errors += 1
                    # Only detach if the accept loop has not already swapped in
                    # a newer consumer.
                    if self._consumer_socket is consumer:
                        self._consumer_socket = None
                try:
                    consumer.close()
                except OSError:
                    pass
                continue
            with self._lock:
                self._sent_packets += 1
class VideoWorkerManager:
    """Supervise the external video-sender process and relay profiles to it.

    A supervisor thread spawns the worker binary, restarts it after it exits,
    and a reader thread parses the newline-delimited JSON telemetry the worker
    writes. Commands (``set_profile``, ``shutdown``) are sent to the worker as
    JSON lines over a dedicated pipe.
    """

    def __init__(self, config: dict[str, Any]) -> None:
        """Build the manager from the dict produced by ``_load_config``.

        Nothing is spawned until :meth:`start` is called.
        """
        self._config = config
        self._transport_cfg = config["transport"]
        self._video_cfg = config["video_sender"]
        self._daemon_cfg = config["daemon"]
        self._policy_cfg = config["policy"]
        self._enabled = bool(self._video_cfg["enabled"])
        # Never restart faster than every 500 ms, whatever the config says.
        self._restart_delay_s = max(0.5, self._daemon_cfg["worker_restart_delay_ms"] / 1000.0)
        initial = self._video_cfg["initial_profile"]
        initial_profile = VideoProfile(
            fps=int(initial["fps"]),
            jpeg_quality_qscale=int(initial["jpeg_quality_qscale"]),
            max_frame_bytes=int(initial["max_frame_bytes"]),
        )
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, name="omni-b-side-video-worker", daemon=True)
        self._reader_thread: threading.Thread | None = None
        self._process: subprocess.Popen[str] | None = None
        self._control_stream = None
        self._telemetry_stream = None
        # Any unrecognised policy mode falls back to "auto".
        self._mode = self._policy_cfg["mode"] if self._policy_cfg["mode"] in {"auto", "manual"} else "auto"
        self._desired_profile = initial_profile
        self._active_profile = initial_profile
        self._restart_count = 0
        self._last_exit_code: int | None = None
        self._last_error = ""
        self._frames_total = 0
        self._frames_sent = 0
        self._frames_dropped = 0
        self._oversized_drops = 0
        self._last_frame_bytes = 0
        self._last_encode_us = 0
        self._last_drop_reason = ""
        self._last_frame_ts = 0
        self._kcp_metrics = _default_kcp_metrics()

    def start(self) -> None:
        """Start the supervisor thread; a no-op when video sending is disabled.

        A missing worker binary is reported on stderr but does not prevent the
        thread from starting, so the worker is picked up once the binary exists.
        """
        if not self._enabled:
            return
        if not os.path.exists(self._video_cfg["binary_path"]):
            print(
                (
                    "B-side video worker binary missing: "
                    f"{self._video_cfg['binary_path']} "
                    "(run `make b_side_video_sender` in OmniSocketGo)"
                ),
                file=sys.stderr,
                flush=True,
            )
        self._thread.start()

    def stop(self) -> None:
        """Ask the worker to shut down, stop the supervisor and reap the process."""
        self._stop_event.set()
        self._send_worker_command({"type": "shutdown"})
        if self._thread.is_alive():
            self._thread.join(timeout=3.0)
        self._terminate_worker()

    def snapshot(self) -> dict[str, Any]:
        """Return the worker status, profiles, frame counters and KCP metrics."""
        with self._lock:
            process = self._process
        running = process is not None and process.poll() is None
        return {
            "enabled": self._enabled,
            "running": running,
            "mode": self._mode,
            "desired_profile": self._desired_profile.as_dict(),
            "active_profile": self._active_profile.as_dict(),
            "restart_count": self._restart_count,
            "last_exit_code": self._last_exit_code,
            "last_error": self._last_error,
            "frames_total": self._frames_total,
            "frames_sent": self._frames_sent,
            "frames_dropped": self._frames_dropped,
            "oversized_drops": self._oversized_drops,
            "last_frame_bytes": self._last_frame_bytes,
            "last_encode_us": self._last_encode_us,
            "last_drop_reason": self._last_drop_reason,
            "last_frame_ts": self._last_frame_ts,
            "kcp_metrics": dict(self._kcp_metrics),
            "binary_path": self._video_cfg["binary_path"],
        }

    def set_profile(
        self,
        *,
        mode: str,
        fps: int | None = None,
        jpeg_quality_qscale: int | None = None,
        max_frame_bytes: int | None = None,
    ) -> dict[str, Any]:
        """Switch between auto and manual mode and push the profile to the worker.

        In ``auto`` mode the currently desired profile is re-sent unchanged. In
        ``manual`` mode all three values are required; they are clamped to at
        least 1 / 1 / 1024 respectively.

        Returns:
            The result of :meth:`snapshot` after the change.

        Raises:
            ValueError: If ``mode`` is not ``auto``/``manual`` or a manual value
                is missing.
        """
        mode = mode.lower()
        if mode not in {"auto", "manual"}:
            raise ValueError("mode must be auto or manual")
        if mode == "auto":
            with self._lock:
                self._mode = "auto"
                profile = self._desired_profile
            self._send_worker_command({"type": "set_profile", **profile.as_dict()})
            return self.snapshot()
        if fps is None or jpeg_quality_qscale is None or max_frame_bytes is None:
            raise ValueError("manual mode requires fps, jpeg_quality_qscale, and max_frame_bytes")
        profile = VideoProfile(
            fps=max(1, int(fps)),
            jpeg_quality_qscale=max(1, int(jpeg_quality_qscale)),
            max_frame_bytes=max(1024, int(max_frame_bytes)),
        )
        with self._lock:
            self._mode = "manual"
            self._desired_profile = profile
        self._send_worker_command({"type": "set_profile", **profile.as_dict()})
        return self.snapshot()

    def apply_auto_profile(self, profile: VideoProfile) -> None:
        """Adopt ``profile`` if in auto mode and it differs from the desired one."""
        with self._lock:
            if self._mode != "auto":
                return
            if self._desired_profile.as_dict() == profile.as_dict():
                return
            self._desired_profile = profile
        self._send_worker_command({"type": "set_profile", **profile.as_dict()})

    def _run(self) -> None:
        """Supervisor loop: spawn the worker, watch for exit, restart after a delay."""
        while not self._stop_event.is_set():
            process = self._process
            if process is None:
                self._spawn_worker()
                # Wait on the stop event rather than sleeping so that stop()
                # is noticed without delay.
                self._stop_event.wait(0.2)
                continue
            exit_code = process.poll()
            if exit_code is None:
                self._stop_event.wait(0.5)
                continue
            with self._lock:
                self._last_exit_code = int(exit_code)
                self._last_error = f"video worker exited with code {exit_code}"
                self._restart_count += 1
            self._close_worker_handles()
            with self._lock:
                self._process = None
            if self._stop_event.wait(self._restart_delay_s):
                return

    def _spawn_worker(self) -> None:
        """Launch the worker binary with its configuration passed via environment.

        Two pipes are created: one for commands to the worker and one for
        telemetry from it. The worker learns its ends of the pipes through
        ``OMNI_WORKER_CONTROL_FD`` and ``OMNI_WORKER_TELEMETRY_FD``. On failure
        the error is recorded and the method waits the restart delay.
        """
        binary_path = self._video_cfg["binary_path"]
        if not os.path.exists(binary_path):
            with self._lock:
                self._last_error = f"video worker binary not found: {binary_path}"
            self._stop_event.wait(self._restart_delay_s)
            return
        command_read_fd, command_write_fd = os.pipe()
        telemetry_read_fd, telemetry_write_fd = os.pipe()
        env = dict(os.environ)
        env.update(
            {
                "OMNI_VIDEO_SERVER_ADDR": self._transport_cfg["server_addr"],
                "OMNI_VIDEO_RELAY_VIA": self._transport_cfg["relay_via"],
                "OMNI_VIDEO_BIND_IP": self._transport_cfg["bind_ip"],
                "OMNI_VIDEO_BIND_DEVICE": self._transport_cfg["bind_device"],
                "OMNI_VIDEO_PEER_ID": self._video_cfg["peer_id"],
                "OMNI_VIDEO_TARGET_PEER": self._video_cfg["target_peer"],
                "OMNI_VIDEO_DEVICE": self._video_cfg["device"],
                "OMNI_VIDEO_CAPTURE_WIDTH": str(self._video_cfg["capture_width"]),
                "OMNI_VIDEO_CAPTURE_HEIGHT": str(self._video_cfg["capture_height"]),
                "OMNI_VIDEO_OUTPUT_WIDTH": str(self._video_cfg["output_width"]),
                "OMNI_VIDEO_OUTPUT_HEIGHT": str(self._video_cfg["output_height"]),
                "OMNI_VIDEO_STATS_INTERVAL_MS": str(self._video_cfg["stats_interval_ms"]),
                "OMNI_WORKER_CONTROL_FD": str(command_read_fd),
                "OMNI_WORKER_TELEMETRY_FD": str(telemetry_write_fd),
            }
        )
        with self._lock:
            profile = self._desired_profile
        env["OMNI_VIDEO_FPS"] = str(profile.fps)
        env["OMNI_VIDEO_JPEG_QSCALE"] = str(profile.jpeg_quality_qscale)
        env["OMNI_VIDEO_MAX_FRAME_BYTES"] = str(profile.max_frame_bytes)
        try:
            process = subprocess.Popen(
                [binary_path],
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=None,
                env=env,
                close_fds=True,
                pass_fds=(command_read_fd, telemetry_write_fd),
            )
        except Exception as error:  # pragma: no cover - runtime integration
            for fd in (command_read_fd, command_write_fd, telemetry_read_fd, telemetry_write_fd):
                try:
                    os.close(fd)
                except OSError:
                    pass
            with self._lock:
                self._last_error = str(error)
            self._stop_event.wait(self._restart_delay_s)
            return
        # The child owns these ends now; keeping them open here would stop the
        # telemetry reader from ever seeing EOF when the worker exits.
        os.close(command_read_fd)
        os.close(telemetry_write_fd)
        control_stream = os.fdopen(command_write_fd, "w", buffering=1, encoding="utf-8")
        telemetry_stream = os.fdopen(
            telemetry_read_fd,
            "r",
            buffering=1,
            encoding="utf-8",
            errors="replace",
        )
        with self._lock:
            self._process = process
            self._control_stream = control_stream
            self._telemetry_stream = telemetry_stream
            self._active_profile = profile
            self._last_error = ""
        reader_thread = threading.Thread(
            target=self._telemetry_loop,
            args=(telemetry_stream, process),
            name="omni-b-side-video-telemetry",
            daemon=True,
        )
        self._reader_thread = reader_thread
        reader_thread.start()
        self._send_worker_command({"type": "set_profile", **profile.as_dict()})

    def _telemetry_loop(self, telemetry_stream, process: subprocess.Popen[Any]) -> None:
        """Read JSON telemetry lines from the worker until EOF, stop or exit.

        Lines that are not valid JSON objects are skipped. A record whose
        numeric fields cannot be converted is skipped and noted in
        ``last_error`` instead of terminating the reader thread.
        """
        while not self._stop_event.is_set():
            try:
                line = telemetry_stream.readline()
            except (OSError, ValueError):
                # The stream was closed underneath us during worker teardown.
                return
            if not line:
                return
            try:
                payload = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(payload, dict):
                continue
            payload_type = payload.get("type")
            if payload_type == "frame_stat":
                self._record_frame_stat(payload)
            elif payload_type == "kcp_metrics":
                # Keep only the known metric keys; unknown ones are ignored.
                merged = _default_kcp_metrics()
                for key in merged:
                    if key in payload:
                        merged[key] = payload[key]
                with self._lock:
                    self._kcp_metrics = merged
            else:
                with self._lock:
                    self._last_error = f"unknown worker telemetry type: {payload_type}"
            if process.poll() is not None:
                return

    def _record_frame_stat(self, payload: dict[str, Any]) -> None:
        """Fold one ``frame_stat`` telemetry record into the frame counters."""
        try:
            frame_bytes = int(payload.get("frame_bytes", 0) or 0)
            encode_us = int(payload.get("encode_us", 0) or 0)
            frame_ts = int(payload.get("ts_unix_ms", 0) or 0)
        except (TypeError, ValueError) as error:
            # Validate before touching any counter so a bad record leaves the
            # statistics untouched.
            with self._lock:
                self._last_error = f"malformed frame_stat telemetry: {error}"
            return
        drop_reason = str(payload.get("drop_reason", ""))
        was_sent = bool(payload.get("sent", False))
        with self._lock:
            self._frames_total += 1
            self._last_frame_bytes = frame_bytes
            self._last_encode_us = encode_us
            self._last_drop_reason = drop_reason
            self._last_frame_ts = frame_ts
            if was_sent:
                self._frames_sent += 1
            else:
                self._frames_dropped += 1
                if drop_reason == "frame_too_large":
                    self._oversized_drops += 1

    def _send_worker_command(self, payload: dict[str, Any]) -> None:
        """Write one JSON command line to the worker; a no-op without a worker.

        Write failures are recorded in ``last_error`` rather than raised, since
        this is called from other components' threads.
        """
        with self._lock:
            control_stream = self._control_stream
        if control_stream is None:
            return
        try:
            control_stream.write(json.dumps(payload) + "\n")
            control_stream.flush()
        except (OSError, ValueError) as error:
            # ValueError is what a file object raises once it has been closed,
            # which can happen if the supervisor tears the worker down between
            # the lookup above and the write.
            with self._lock:
                self._last_error = str(error)

    def _close_worker_handles(self) -> None:
        """Detach and close both worker pipes, ignoring close errors."""
        with self._lock:
            control_stream = self._control_stream
            telemetry_stream = self._telemetry_stream
            self._control_stream = None
            self._telemetry_stream = None
        if control_stream is not None:
            try:
                control_stream.close()
            except OSError:
                pass
        if telemetry_stream is not None:
            try:
                telemetry_stream.close()
            except OSError:
                pass

    def _terminate_worker(self) -> None:
        """Terminate (then kill) the worker process and release its pipes."""
        with self._lock:
            process = self._process
            self._process = None
        if process is not None and process.poll() is None:
            process.terminate()
            try:
                process.wait(timeout=2.0)
            except subprocess.TimeoutExpired:
                process.kill()
                try:
                    process.wait(timeout=2.0)
                except subprocess.TimeoutExpired:
                    pass
        # Close the pipes only after the worker is gone: the reader thread may
        # be blocked in readline() on the telemetry pipe, and it gets EOF once
        # the worker's end is closed. NOTE(review): closing a buffered stream
        # while another thread is reading it appears able to block on the
        # stream's internal lock - confirm against the io documentation.
        self._close_worker_handles()
class PolicyEngine:
    """Translate link-health measurements into a health band and video profile."""

    def __init__(self, policy_cfg: dict[str, Any]) -> None:
        """Keep a reference to the ``policy`` section of the daemon configuration."""
        self._policy_cfg = policy_cfg

    def classify(
        self,
        *,
        control_connected: bool,
        consumer_connected: bool,
        avg_srtt_ms: float,
        retrans_delta: int,
        video_oversized_recent: bool,
        video_backlogged: bool,
    ) -> tuple[str, VideoProfile]:
        """Pick ``green``/``yellow``/``red`` and return it with that band's profile.

        The link-based band is ``red`` when either endpoint is disconnected, or
        when the average SRTT reaches ``yellow_srtt_ms`` or the retransmission
        delta reaches ``retrans_red_threshold``. It is ``yellow`` when the SRTT
        reaches ``green_srtt_ms`` or there was any retransmission, and ``green``
        otherwise. Video trouble (a recent oversized frame or a send backlog)
        then demotes the band by one step.
        """
        cfg = self._policy_cfg
        if not (control_connected and consumer_connected):
            band = "red"
        elif avg_srtt_ms >= cfg["yellow_srtt_ms"] or retrans_delta >= cfg["retrans_red_threshold"]:
            band = "red"
        elif avg_srtt_ms >= cfg["green_srtt_ms"] or retrans_delta > 0:
            band = "yellow"
        else:
            band = "green"

        if video_oversized_recent or video_backlogged:
            # One step worse; "red" has nowhere further to go.
            band = {"green": "yellow", "yellow": "red"}.get(band, band)

        settings = cfg[f"profile_{band}"]
        profile = VideoProfile(
            fps=int(settings["fps"]),
            jpeg_quality_qscale=int(settings["jpeg_quality_qscale"]),
            max_frame_bytes=int(settings["max_frame_bytes"]),
        )
        return band, profile
class TelemetrySampler:
    """Periodically combine component snapshots into one published state dict.

    Each sample reads the control receiver, fan-out and video worker snapshots,
    derives link statistics over a sliding window, asks the policy engine for a
    health band and recommended profile, applies that profile to the video
    worker, and stores the result for :meth:`snapshot`.
    """

    def __init__(
        self,
        config: dict[str, Any],
        control_manager: ControlRecvManager,
        fanout: ControlFanout,
        video_worker: VideoWorkerManager,
    ) -> None:
        """Wire the sampler to the three components it observes."""
        self._config = config
        self._control_manager = control_manager
        self._fanout = fanout
        self._video_worker = video_worker
        self._policy_engine = PolicyEngine(config["policy"])
        # Sample at most ten times a second; the window is at least one interval.
        self._interval_s = max(0.1, config["daemon"]["telemetry_interval_ms"] / 1000.0)
        self._window_s = max(self._interval_s, config["policy"]["health_window_ms"] / 1000.0)
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, name="omni-b-side-telemetry", daemon=True)
        self._started_at = utc_iso_now()
        self._state = self._build_initial_state(config)
        # Report the same start time before and after the first sample.
        self._state["daemon"]["started_at"] = self._started_at
        self._history: list[dict[str, Any]] = []
        self._last_totals: dict[str, float] | None = None

    def start(self) -> None:
        """Start the background sampling thread."""
        self._thread.start()

    def stop(self) -> None:
        """Signal the sampling thread to stop and wait up to 2 s for it."""
        self._stop_event.set()
        if self._thread.is_alive():
            self._thread.join(timeout=2.0)

    def snapshot(self) -> dict[str, Any]:
        """Return a deep copy of the most recently published state."""
        with self._lock:
            return copy.deepcopy(self._state)

    def _run(self) -> None:
        """Thread body: take one sample per interval until stopped.

        A failing sample is reported on stderr and the loop carries on, so one
        bad metric value cannot silently freeze the published state. Repeats of
        the same failure are reported only once.
        """
        last_reported = ""
        while not self._stop_event.is_set():
            try:
                self._sample_once()
            except Exception as error:  # thread boundary: report and keep sampling
                message = f"{type(error).__name__}: {error}"
                if message != last_reported:
                    print(f"B-side telemetry sample failed: {message}", file=sys.stderr, flush=True)
                    last_reported = message
            else:
                last_reported = ""
            self._stop_event.wait(self._interval_s)

    def _sample_once(self) -> None:
        """Collect one sample, update the video profile and publish the state."""
        now = time.time()
        control = self._control_manager.snapshot()
        fanout = self._fanout.snapshot()
        video = self._video_worker.snapshot()
        control_metrics = control["kcp_metrics"]
        video_metrics = video["kcp_metrics"]

        # Sliding window of control-link samples used for the averages below.
        self._history.append(
            {
                "ts": now,
                "connected": bool(control["connected"]),
                "srtt_ms": float(control_metrics.get("srtt_ms", 0.0) or 0.0),
                "retrans_segs": int(control_metrics.get("retrans_segs", 0) or 0),
            }
        )
        self._history = [sample for sample in self._history if now - sample["ts"] <= self._window_s]
        srtt_values = [sample["srtt_ms"] for sample in self._history if sample["connected"]]
        avg_srtt_ms = sum(srtt_values) / len(srtt_values) if srtt_values else 0.0
        jitter_ms = 0.0
        if len(srtt_values) >= 2:
            # Mean absolute difference between consecutive SRTT samples.
            deltas = [abs(later - earlier) for earlier, later in zip(srtt_values, srtt_values[1:])]
            jitter_ms = sum(deltas) / len(deltas)
        retrans_delta = 0
        if len(self._history) >= 2:
            # Clamped at zero so a counter that goes backwards is not reported
            # as a negative delta.
            retrans_delta = max(0, int(self._history[-1]["retrans_segs"] - self._history[0]["retrans_segs"]))

        last_frame_ts_ms = int(video["last_frame_ts"] or 0)
        video_oversized_recent = bool(
            video["last_drop_reason"] == "frame_too_large"
            and last_frame_ts_ms > 0
            and now - (last_frame_ts_ms / 1000.0) <= self._window_s
        )
        video_backlogged = bool(
            int(video_metrics.get("ring_buffer_snd_queue", 0) or 0) > 128
            or int(video_metrics.get("ring_buffer_snd_buffer", 0) or 0) > 128
        )
        health_band, recommended_profile = self._policy_engine.classify(
            control_connected=bool(control["connected"]),
            consumer_connected=bool(fanout["consumer_connected"]),
            avg_srtt_ms=avg_srtt_ms,
            retrans_delta=retrans_delta,
            video_oversized_recent=video_oversized_recent,
            video_backlogged=video_backlogged,
        )
        self._video_worker.apply_auto_profile(recommended_profile)

        # Throughput and retransmission rate across both sessions, computed
        # from the change in cumulative counters since the previous sample.
        total_bytes_sent = int(control_metrics.get("bytes_sent", 0) or 0) + int(video_metrics.get("bytes_sent", 0) or 0)
        total_bytes_received = int(control_metrics.get("bytes_received", 0) or 0) + int(video_metrics.get("bytes_received", 0) or 0)
        total_out_segs = int(control_metrics.get("out_segs", 0) or 0) + int(video_metrics.get("out_segs", 0) or 0)
        total_retrans = int(control_metrics.get("retrans_segs", 0) or 0) + int(video_metrics.get("retrans_segs", 0) or 0)
        tx_kbps = 0
        rx_kbps = 0
        retrans_pct = 0.0
        if self._last_totals is not None:
            dt = max(0.001, now - self._last_totals["ts"])
            sent_delta = max(0, total_bytes_sent - int(self._last_totals["bytes_sent"]))
            recv_delta = max(0, total_bytes_received - int(self._last_totals["bytes_received"]))
            out_seg_delta = max(0, total_out_segs - int(self._last_totals["out_segs"]))
            retrans_total_delta = max(0, total_retrans - int(self._last_totals["retrans"]))
            tx_kbps = int((sent_delta * 8.0) / dt / 1000.0)
            rx_kbps = int((recv_delta * 8.0) / dt / 1000.0)
            if out_seg_delta > 0:
                retrans_pct = round((retrans_total_delta / out_seg_delta) * 100.0, 2)
        self._last_totals = {
            "ts": now,
            "bytes_sent": float(total_bytes_sent),
            "bytes_received": float(total_bytes_received),
            "out_segs": float(total_out_segs),
            "retrans": float(total_retrans),
        }

        state = {
            "network": {
                "peer_status": "online" if control["connected"] else "offline",
                "latency_ms": round(avg_srtt_ms or float(control_metrics.get("srtt_ms", 0) or 0.0), 1),
                "jitter_ms": round(jitter_ms, 1),
                "retrans_pct": round(retrans_pct, 2),
                # The retransmission rate is reported as the packet-loss figure.
                "packet_loss_pct": round(retrans_pct, 2),
                "tx_kbps": tx_kbps,
                "rx_kbps": rx_kbps,
                "signal_dbm": None,
                "transport": "OmniSocket / daemon",
                "source_mode": "daemon-live",
                "updated_at": utc_iso_now(),
            },
            "control": {
                "connected": bool(control["connected"]),
                "consumer_connected": bool(fanout["consumer_connected"]),
                "queue_depth": int(control["queue_depth"]),
                "fanout_queue_depth": int(fanout["queue_depth"]),
                "packets_received": int(control["packets_received"]),
                "packets_enqueued": int(control["packets_enqueued"]),
                "sent_to_consumer": int(fanout["sent_packets"]),
                "dropped_no_consumer": int(fanout["dropped_no_consumer"]),
                "dropped_queue_full": int(control["dropped_queue_full"]),
                "ignored_non_binary": int(control["ignored_non_binary"]),
                "ignored_bad_length": int(control["ignored_bad_length"]),
                "last_error": control["last_error"],
                "peer_id": control["peer_id"],
                "server_addr": control["server_addr"],
                "relay_via": control["relay_via"],
                "stats": control["stats"],
                "kcp_metrics": control_metrics,
            },
            "video": {
                "enabled": bool(video["enabled"]),
                "running": bool(video["running"]),
                "mode": str(video["mode"]),
                "desired_profile": video["desired_profile"],
                "active_profile": video["active_profile"],
                "restart_count": int(video["restart_count"]),
                "last_exit_code": video["last_exit_code"],
                "last_error": video["last_error"],
                "frames_total": int(video["frames_total"]),
                "frames_sent": int(video["frames_sent"]),
                "frames_dropped": int(video["frames_dropped"]),
                "oversized_drops": int(video["oversized_drops"]),
                "last_frame_bytes": int(video["last_frame_bytes"]),
                "last_encode_us": int(video["last_encode_us"]),
                "last_drop_reason": str(video["last_drop_reason"]),
                "last_frame_ts": int(video["last_frame_ts"]),
                "binary_path": video["binary_path"],
                "kcp_metrics": video_metrics,
            },
            "policy": {
                "mode": video["mode"],
                "health_band": health_band,
                "recommended_video_profile": recommended_profile.as_dict(),
            },
            "daemon": {
                "started_at": self._started_at,
                "version": VERSION,
                "config_path": self._config["config_path"],
                "socket_path": self._config["daemon"]["socket_path"],
                "ctrl_socket_path": self._config["daemon"]["ctrl_socket_path"],
            },
        }
        with self._lock:
            self._state = state

    @staticmethod
    def _build_initial_state(config: dict[str, Any]) -> dict[str, Any]:
        """Return the placeholder state published before the first sample.

        It has the same shape as a live state, with everything offline, zeroed
        and in the ``red`` band.
        """
        return {
            "network": {
                "peer_status": "offline",
                "latency_ms": 0.0,
                "jitter_ms": 0.0,
                "retrans_pct": 0.0,
                "packet_loss_pct": 0.0,
                "tx_kbps": 0,
                "rx_kbps": 0,
                "signal_dbm": None,
                "transport": "OmniSocket / daemon",
                "source_mode": "daemon-starting",
                "updated_at": utc_iso_now(),
            },
            "control": {
                "connected": False,
                "consumer_connected": False,
                "queue_depth": 0,
                "fanout_queue_depth": 0,
                "packets_received": 0,
                "packets_enqueued": 0,
                "sent_to_consumer": 0,
                "dropped_no_consumer": 0,
                "dropped_queue_full": 0,
                "ignored_non_binary": 0,
                "ignored_bad_length": 0,
                "last_error": "",
                "peer_id": "",
                "server_addr": "",
                "relay_via": "",
                "stats": _default_stats(),
                "kcp_metrics": _default_kcp_metrics(),
            },
            "video": {
                "enabled": bool(config["video_sender"]["enabled"]),
                "running": False,
                "mode": config["policy"]["mode"],
                "desired_profile": dict(config["video_sender"]["initial_profile"]),
                "active_profile": dict(config["video_sender"]["initial_profile"]),
                "restart_count": 0,
                "last_exit_code": None,
                "last_error": "",
                "frames_total": 0,
                "frames_sent": 0,
                "frames_dropped": 0,
                "oversized_drops": 0,
                "last_frame_bytes": 0,
                "last_encode_us": 0,
                "last_drop_reason": "",
                "last_frame_ts": 0,
                "binary_path": config["video_sender"]["binary_path"],
                "kcp_metrics": _default_kcp_metrics(),
            },
            "policy": {
                "mode": config["policy"]["mode"],
                "health_band": "red",
                "recommended_video_profile": dict(config["policy"]["profile_red"]),
            },
            "daemon": {
                "started_at": utc_iso_now(),
                "version": VERSION,
                "config_path": config["config_path"],
                "socket_path": config["daemon"]["socket_path"],
                "ctrl_socket_path": config["daemon"]["ctrl_socket_path"],
            },
        }
class ThreadingUnixHTTPServer(socketserver.ThreadingMixIn, socketserver.UnixStreamServer):
    """Unix-domain stream server that handles each connection in its own thread."""

    # Mark per-connection handler threads as daemon threads.
    daemon_threads = True
class OmniDaemonHTTPHandler(BaseHTTPRequestHandler):
    """HTTP/1.1 request handler for the daemon's local JSON API.

    Routes implemented here:

    * ``GET  /v1/health``        - condensed health summary
    * ``GET  /v1/state``         - full state snapshot from the app
    * ``POST /v1/video/profile`` - switch the video profile (auto or manual)

    The owning server must expose the daemon object as ``server.app``.
    """

    server_version = "BsideOmniDaemonHTTP/1.0"
    protocol_version = "HTTP/1.1"

    def do_GET(self) -> None:  # pragma: no cover - exercised by runtime integration
        """Serve ``/v1/health`` and ``/v1/state``; any other path is a 404."""
        app: BSideOmniDaemon = self.server.app  # type: ignore[attr-defined]
        if self.path == "/v1/health":
            state = app.get_state()
            band = state["policy"]["health_band"]
            status = "ok" if band == "green" else "degraded"
            # A missing control link overrides whatever the band says.
            if not state["control"]["connected"]:
                status = "unavailable"
            # The HTTP status is always 200; clients inspect "status".
            self._send_json(
                HTTPStatus.OK,
                {
                    "status": status,
                    "health_band": band,
                    "control_connected": state["control"]["connected"],
                    "consumer_connected": state["control"]["consumer_connected"],
                    "video_running": state["video"]["running"],
                    "updated_at": state["network"]["updated_at"],
                },
            )
            return
        if self.path == "/v1/state":
            self._send_json(HTTPStatus.OK, app.get_state())
            return
        self._send_json(HTTPStatus.NOT_FOUND, {"error": f"unknown path: {self.path}"})

    def do_POST(self) -> None:  # pragma: no cover - exercised by runtime integration
        """Handle ``POST /v1/video/profile``.

        The JSON body selects ``mode`` (default ``"auto"``). Any mode other
        than ``"auto"`` is forwarded as a manual profile built from ``fps``,
        ``jpeg_quality_qscale`` and ``max_frame_bytes`` (each defaulting to 0).
        Malformed bodies and non-integer fields yield a 400 response.
        """
        app: BSideOmniDaemon = self.server.app  # type: ignore[attr-defined]
        if self.path != "/v1/video/profile":
            # The request body is never read on this path. Close the
            # connection so leftover body bytes cannot be parsed as the next
            # keep-alive request.
            self.close_connection = True
            self._send_json(HTTPStatus.NOT_FOUND, {"error": f"unknown path: {self.path}"})
            return
        try:
            payload = self._read_json()
        except ValueError as error:
            # The body may be unread or only partly read (e.g. missing or
            # invalid Content-Length), so the stream position is unknown.
            self.close_connection = True
            self._send_json(HTTPStatus.BAD_REQUEST, {"error": str(error)})
            return
        try:
            mode = str(payload.get("mode", "auto")).lower()
            if mode == "auto":
                state = app.set_video_profile(mode="auto")
            else:
                state = app.set_video_profile(
                    mode="manual",
                    fps=int(payload.get("fps", 0)),
                    jpeg_quality_qscale=int(payload.get("jpeg_quality_qscale", 0)),
                    max_frame_bytes=int(payload.get("max_frame_bytes", 0)),
                )
        except (TypeError, ValueError) as error:
            self._send_json(HTTPStatus.BAD_REQUEST, {"error": str(error)})
            return
        self._send_json(HTTPStatus.OK, state)

    def log_message(self, format: str, *args: Any) -> None:  # noqa: A003
        """Suppress the base class's per-request logging."""
        return

    def _read_json(self) -> dict[str, Any]:
        """Read the request body and decode it as a JSON object.

        Returns:
            The decoded JSON object.

        Raises:
            ValueError: If ``Content-Length`` is missing, not an integer or
                negative, if the body is not valid UTF-8 JSON, or if the
                decoded value is not a JSON object.
        """
        raw_length = self.headers.get("Content-Length")
        if raw_length is None:
            raise ValueError("missing Content-Length")
        try:
            length = int(raw_length)
        except ValueError as error:
            raise ValueError("invalid Content-Length") from error
        if length < 0:
            # read(-1) would read until EOF and can block this handler
            # thread for as long as the client keeps the socket open.
            raise ValueError("invalid Content-Length")
        raw = self.rfile.read(length)
        try:
            payload = json.loads(raw.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as error:
            raise ValueError(f"invalid JSON body: {error}") from error
        if not isinstance(payload, dict):
            raise ValueError("request body must be a JSON object")
        return payload

    def _send_json(self, status: HTTPStatus, payload: dict[str, Any]) -> None:
        """Serialize ``payload`` as JSON and send it with the given status.

        A client that disconnects while the response is being written is
        ignored.
        """
        body = json.dumps(payload).encode("utf-8")
        self.send_response(status.value)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Cache-Control", "no-store")
        # send_header() updates close_connection from this header's value, so
        # an unconditional "keep-alive" would override both a client's
        # "Connection: close" and a close requested by this handler.
        self.send_header("Connection", "close" if self.close_connection else "keep-alive")
        try:
            self.end_headers()
            self.wfile.write(body)
        except (BrokenPipeError, ConnectionResetError):
            return
class BSideOmniDaemon:
    """Top-level daemon object tying the B-side components together.

    It owns a control receive manager, a control fan-out, a video worker
    manager and a telemetry sampler, and exposes them through an HTTP API
    served on a Unix domain socket.
    """

    def __init__(self, config_path: str | None = None) -> None:
        """Load the configuration and construct (but do not start) all parts.

        Args:
            config_path: Path passed through to ``_load_config``.
        """
        self._config = _load_config(config_path)
        self._control_manager = ControlRecvManager(self._config)
        self._fanout = ControlFanout(
            self._config["daemon"]["ctrl_socket_path"],
            self._control_manager.packet_queue,
        )
        self._video_worker = VideoWorkerManager(self._config)
        self._telemetry = TelemetrySampler(
            self._config,
            self._control_manager,
            self._fanout,
            self._video_worker,
        )
        self._server: ThreadingUnixHTTPServer | None = None
        self._server_thread: threading.Thread | None = None
        # True once stop() has run to completion; start() re-arms it.
        self._stopped = False

    @property
    def socket_path(self) -> str:
        """Filesystem path of the HTTP API's Unix socket."""
        return self._config["daemon"]["socket_path"]

    def start(self) -> None:
        """Start every component and bind the HTTP server socket.

        The server is only created here; ``serve_forever`` runs it.
        """
        # Re-arm stop() up front so a failure part-way through startup can
        # still be cleaned up by the caller.
        self._stopped = False
        self._fanout.start()
        self._control_manager.start()
        self._video_worker.start()
        self._telemetry.start()
        self._server = self._build_server()
        # Re-arm again: a stop() that raced with startup (for example from a
        # signal handler) must not turn the next stop() into a no-op.
        self._stopped = False

    def stop(self) -> None:
        """Shut down the HTTP server and stop every component.

        Safe to call repeatedly: once a call has completed, further calls
        are no-ops until ``start`` runs again. If a call raises part-way
        through, the next call retries the full sequence.
        """
        if self._stopped:
            return
        server_thread = None
        if self._server is not None:
            # shutdown() is only requested while the serving thread is alive.
            if self._server_thread is not None and self._server_thread.is_alive():
                self._server.shutdown()
            self._server.server_close()
            self._server = None
        if self._server_thread is not None and self._server_thread.is_alive():
            server_thread = self._server_thread
        self._server_thread = None
        try:
            os.unlink(self.socket_path)
        except FileNotFoundError:
            pass
        # Never join the current thread (that would raise RuntimeError).
        if server_thread is not None and server_thread is not threading.current_thread():
            server_thread.join(timeout=2.0)
        self._telemetry.stop()
        self._video_worker.stop()
        self._control_manager.stop()
        self._fanout.stop()
        self._stopped = True

    def serve_forever(self) -> None:
        """Start the daemon and block until the HTTP server thread exits."""
        self.start()
        assert self._server is not None
        self._server_thread = threading.Thread(
            target=self._server.serve_forever,
            name="omni-b-side-http",
            daemon=True,
        )
        self._server_thread.start()
        print(
            (
                "B-side OmniDaemon ready "
                f"(state: curl --unix-socket {self.socket_path} http://localhost/v1/state)"
            ),
            file=sys.stderr,
            flush=True,
        )
        self._server_thread.join()

    def get_state(self) -> dict[str, Any]:
        """Return the telemetry sampler's current snapshot."""
        return self._telemetry.snapshot()

    def set_video_profile(
        self,
        *,
        mode: str,
        fps: int | None = None,
        jpeg_quality_qscale: int | None = None,
        max_frame_bytes: int | None = None,
    ) -> dict[str, Any]:
        """Forward a profile change to the video worker and return its result."""
        return self._video_worker.set_profile(
            mode=mode,
            fps=fps,
            jpeg_quality_qscale=jpeg_quality_qscale,
            max_frame_bytes=max_frame_bytes,
        )

    def _build_server(self) -> ThreadingUnixHTTPServer:
        """Create the HTTP server bound to ``socket_path``.

        The socket's parent directory is created if needed and any existing
        file at the socket path is removed before binding.
        """
        socket_path = self.socket_path
        socket_dir = os.path.dirname(socket_path)
        if socket_dir:
            os.makedirs(socket_dir, exist_ok=True)
        try:
            os.unlink(socket_path)
        except FileNotFoundError:
            pass
        server = ThreadingUnixHTTPServer(socket_path, OmniDaemonHTTPHandler)
        # Handlers reach the daemon through ``self.server.app``.
        server.app = self  # type: ignore[attr-defined]
        return server
def main(argv: list[str] | None = None) -> None:
    """Command-line entry point: run the daemon until SIGINT or SIGTERM.

    Args:
        argv: Arguments to parse; ``None`` lets argparse read ``sys.argv``.
    """
    parser = argparse.ArgumentParser(description="Run the B-side OmniDaemon")
    parser.add_argument("--config", dest="config_path", help="Path to daemon YAML config")
    args = parser.parse_args(argv)
    app = BSideOmniDaemon(config_path=args.config_path)
    print(
        (
            "B-side OmniDaemon starting "
            f"(config={app._config['config_path']}, "
            f"socket={app._config['daemon']['socket_path']}, "
            f"ctrl_socket={app._config['daemon']['ctrl_socket_path']}, "
            f"video_enabled={app._config['video_sender']['enabled']})"
        ),
        file=sys.stderr,
        flush=True,
    )

    shutdown_requested = False

    def _handle_signal(_signum: int, _frame: Any) -> None:
        # Keep the handler minimal: it runs in the main thread between
        # bytecodes, so doing the teardown here would re-enter whatever the
        # main thread was doing, and the ``finally`` below would then tear
        # everything down a second time. Interrupt the main thread instead.
        nonlocal shutdown_requested
        if shutdown_requested:
            # Teardown is already under way; do not interrupt it.
            return
        shutdown_requested = True
        raise KeyboardInterrupt

    signal.signal(signal.SIGINT, _handle_signal)
    signal.signal(signal.SIGTERM, _handle_signal)
    try:
        app.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Late signals must not abort the teardown half-way through.
        shutdown_requested = True
        app.stop()
# Run the daemon only when this module is executed as a script.
if __name__ == "__main__":
    main()