diff --git a/README.md b/README.md index c77e734..ccb9c58 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,30 @@ make python-ext make python-install ``` +## A-Side OmniDaemon + +The A-side daemon is configured from `config/a_side_omnidaemon.yaml` in this repo. The safest way to start it is to pass that file explicitly, because the installed Python package does not bundle the YAML config. + +Run from a source checkout: + +```bash +python -m omnisocket_a_side.daemon --config "$(pwd)/config/a_side_omnidaemon.yaml" +``` + +Or, if you installed the console script: + +```bash +OMNIDAEMON_CONFIG="$(pwd)/config/a_side_omnidaemon.yaml" \ +omnisocket-a-side-daemon +``` + +Optional overrides: + +- `OMNIDAEMON_SOCKET=/tmp/omnisocket-a-side.sock` selects the local UDS path. +- `OMNIDAEMON_CONFIG=/abs/path/to/a_side_omnidaemon.yaml` overrides `--config`. + +For `robot-command-center` and the A-side senders, keep the daemon and its clients on the same Linux machine so they can share the Unix-domain socket. + ## Run On Different Machines Server `D` runs the KCP hub on `0.0.0.0:10909`: diff --git a/config/a_side_omnidaemon.yaml b/config/a_side_omnidaemon.yaml new file mode 100644 index 0000000..62d9963 --- /dev/null +++ b/config/a_side_omnidaemon.yaml @@ -0,0 +1,51 @@ +transport: + server_addr: "81.70.156.140:10909" + relay_via: "106.55.173.235:10909" + bind_ip: "" + bind_device: "" + +control_sender: + peer_id: "peer-a-ctrl" + target_peer: "peer-b-ctrl" + nodelay: 1 + interval_ms: 5 + resend: 2 + nc: 1 + sndwnd: 32 + rcvwnd: 32 + mtu: 1400 + stats_interval_ms: 100 + +video_receiver: + peer_id: "peer-a-video" + buffer_bytes: 1048576 + nodelay: 1 + interval_ms: 10 + resend: 2 + nc: 1 + sndwnd: 256 + rcvwnd: 256 + mtu: 1400 + stats_interval_ms: 100 + +daemon: + socket_path: "/tmp/omnisocket-a-side.sock" + reconnect_delay_ms: 2000 + telemetry_interval_ms: 100 + analog_send_hz: 100 + frame_stale_ms: 500 + +policy: + health_window_ms: 2000 + green_srtt_ms: 35 + yellow_srtt_ms: 60 + retrans_red_threshold: 10 + profile_green: + fps: 15 + max_frame_kb: 60 + profile_yellow: + fps: 10 + max_frame_kb: 40 + profile_red: + fps: 5 + max_frame_kb: 20 diff --git a/include/peer_kcp_client.h b/include/peer_kcp_client.h index d61111a..5b7afdc 100644 --- a/include/peer_kcp_client.h +++ b/include/peer_kcp_client.h @@ -27,6 +27,7 @@ int kcp_client_receive_timed(kcp_client_t *client, message_t *out_msg, int timeo int kcp_client_receive(kcp_client_t *client, message_t *out_msg); int kcp_client_receive_binary_into(kcp_client_t *client, void *buffer, size_t buffer_len, kcp_client_recv_meta_t *out_meta, int timeout_ms); int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len); +int kcp_client_metrics_snapshot(kcp_client_t *client, kcp_conn_metrics_t *out_metrics); int kcp_client_close(kcp_client_t *client); void kcp_client_free(kcp_client_t *client); diff --git a/include/transport_kcp.h b/include/transport_kcp.h index d7c3c95..737752e 100644 --- a/include/transport_kcp.h +++ b/include/transport_kcp.h @@ -43,6 +43,32 @@ extern "C" { typedef struct kcp_conn kcp_conn_t; typedef struct kcp_listener kcp_listener_t; +typedef struct kcp_conn_metrics { + int connected; + int has_conv; + uint32_t conv; + char local_addr[OMNI_MAX_ADDR_TEXT]; + char remote_addr[OMNI_MAX_ADDR_TEXT]; + uint32_t rto_ms; + int32_t srtt_ms; + int32_t srttvar_ms; + uint64_t bytes_sent; + uint64_t bytes_received; + uint64_t in_pkts; + uint64_t out_pkts; + uint64_t in_segs; + uint64_t out_segs; + uint64_t retrans_segs; + uint64_t fast_retrans_segs; + uint64_t early_retrans_segs; + uint64_t lost_segs; + uint64_t repeat_segs; + uint64_t in_errs; + uint64_t kcp_in_errs; + uint64_t ring_buffer_snd_queue; + uint64_t ring_buffer_rcv_queue; + uint64_t ring_buffer_snd_buffer; +} kcp_conn_metrics_t; typedef struct kcp_conn_options { int nodelay; int interval_ms; @@ -68,6 +94,7 @@ int kcp_conn_close(kcp_conn_t *conn); void kcp_conn_free(kcp_conn_t *conn); uint32_t kcp_conn_conv(const kcp_conn_t *conn); int kcp_conn_local_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, socklen_t *addr_len); +int kcp_conn_metrics_snapshot(kcp_conn_t *conn, kcp_conn_metrics_t *out_metrics); kcp_listener_t *kcp_listener_listen(const char *listen_addr, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, const char *node_role, const char *node_id); kcp_conn_t *kcp_listener_accept(kcp_listener_t *listener); diff --git a/python/omnisocket/_omnisocket.c b/python/omnisocket/_omnisocket.c index 38d852f..c919b4e 100644 --- a/python/omnisocket/_omnisocket.c +++ b/python/omnisocket/_omnisocket.c @@ -22,6 +22,13 @@ PyDoc_STRVAR( "current frame has already been consumed and is lost." ); +PyDoc_STRVAR( + PyOmniSession_kcp_metrics_doc, + "kcp_metrics() -> dict\n" + "\n" + "Return a snapshot of low-level KCP metrics for the current session." +); + static PyObject *PyOmniSession_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) { PyOmniSession *self; (void) args; @@ -281,6 +288,67 @@ static PyObject *PyOmniSession_stats(PyOmniSession *self, PyObject *Py_UNUSED(ig ); } +static PyObject *PyOmniSession_kcp_metrics(PyOmniSession *self, PyObject *Py_UNUSED(ignored)) { + omnisocket_session_kcp_metrics_t metrics; + + memset(&metrics, 0, sizeof(metrics)); + if (omnisocket_session_kcp_metrics_snapshot(&self->session, &metrics) != 0) { + return PyErr_SetFromErrno(PyExc_OSError); + } + + return Py_BuildValue( + "{s:i,s:i,s:I,s:s,s:s,s:I,s:i,s:i,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K}", + "connected", + metrics.connected, + "has_conv", + metrics.has_conv, + "conv", + metrics.conv, + "local_addr", + metrics.local_addr, + "remote_addr", + metrics.remote_addr, + "rto_ms", + metrics.rto_ms, + "srtt_ms", + metrics.srtt_ms, + "srttvar_ms", + metrics.srttvar_ms, + "bytes_sent", + (unsigned long long) metrics.bytes_sent, + "bytes_received", + (unsigned long long) metrics.bytes_received, + "in_pkts", + (unsigned long long) metrics.in_pkts, + "out_pkts", + (unsigned long long) metrics.out_pkts, + "in_segs", + (unsigned long long) metrics.in_segs, + "out_segs", + (unsigned long long) metrics.out_segs, + "retrans_segs", + (unsigned long long) metrics.retrans_segs, + "fast_retrans_segs", + (unsigned long long) metrics.fast_retrans_segs, + "early_retrans_segs", + (unsigned long long) metrics.early_retrans_segs, + "lost_segs", + (unsigned long long) metrics.lost_segs, + "repeat_segs", + (unsigned long long) metrics.repeat_segs, + "in_errs", + (unsigned long long) metrics.in_errs, + "kcp_in_errs", + (unsigned long long) metrics.kcp_in_errs, + "ring_buffer_snd_queue", + (unsigned long long) metrics.ring_buffer_snd_queue, + "ring_buffer_rcv_queue", + (unsigned long long) metrics.ring_buffer_rcv_queue, + "ring_buffer_snd_buffer", + (unsigned long long) metrics.ring_buffer_snd_buffer + ); +} + static PyMethodDef PyOmniSession_methods[] = { {"connect", (PyCFunction) PyOmniSession_connect, METH_VARARGS | METH_KEYWORDS, NULL}, {"close", (PyCFunction) PyOmniSession_close, METH_NOARGS, NULL}, @@ -288,6 +356,7 @@ static PyMethodDef PyOmniSession_methods[] = { {"recv", (PyCFunction) PyOmniSession_recv, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_doc}, {"recv_into", (PyCFunction) PyOmniSession_recv_into, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_into_doc}, {"stats", (PyCFunction) PyOmniSession_stats, METH_NOARGS, NULL}, + {"kcp_metrics", (PyCFunction) PyOmniSession_kcp_metrics, METH_NOARGS, PyOmniSession_kcp_metrics_doc}, {NULL, NULL, 0, NULL} }; diff --git a/python/omnisocket/omnisocket_client.c b/python/omnisocket/omnisocket_client.c index 3557780..53b8cab 100644 --- a/python/omnisocket/omnisocket_client.c +++ b/python/omnisocket/omnisocket_client.c @@ -246,3 +246,72 @@ void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket *out_stats = session->stats; pthread_mutex_unlock(&session->mutex); } + +int omnisocket_session_kcp_metrics_snapshot( + omnisocket_session_t *session, + omnisocket_session_kcp_metrics_t *out_metrics +) { + kcp_client_t *client = NULL; + kcp_conn_metrics_t metrics; + int rc = 0; + + if (session == NULL || out_metrics == NULL) { + errno = EINVAL; + return -1; + } + + memset(out_metrics, 0, sizeof(*out_metrics)); + + pthread_mutex_lock(&session->mutex); + if (session->client != NULL && !session->closing) { + client = session->client; + session->active_ops += 1; + } + pthread_mutex_unlock(&session->mutex); + + if (client == NULL) { + return 0; + } + + memset(&metrics, 0, sizeof(metrics)); + rc = kcp_client_metrics_snapshot(client, &metrics); + + pthread_mutex_lock(&session->mutex); + if (session->active_ops > 0) { + session->active_ops -= 1; + } + if (session->closing && session->active_ops == 0) { + pthread_cond_broadcast(&session->idle_cond); + } + pthread_mutex_unlock(&session->mutex); + + if (rc != 0) { + return rc; + } + + out_metrics->connected = metrics.connected; + out_metrics->has_conv = metrics.has_conv; + out_metrics->conv = metrics.conv; + snprintf(out_metrics->local_addr, sizeof(out_metrics->local_addr), "%s", metrics.local_addr); + snprintf(out_metrics->remote_addr, sizeof(out_metrics->remote_addr), "%s", metrics.remote_addr); + out_metrics->rto_ms = metrics.rto_ms; + out_metrics->srtt_ms = metrics.srtt_ms; + out_metrics->srttvar_ms = metrics.srttvar_ms; + out_metrics->bytes_sent = metrics.bytes_sent; + out_metrics->bytes_received = metrics.bytes_received; + out_metrics->in_pkts = metrics.in_pkts; + out_metrics->out_pkts = metrics.out_pkts; + out_metrics->in_segs = metrics.in_segs; + out_metrics->out_segs = metrics.out_segs; + out_metrics->retrans_segs = metrics.retrans_segs; + out_metrics->fast_retrans_segs = metrics.fast_retrans_segs; + out_metrics->early_retrans_segs = metrics.early_retrans_segs; + out_metrics->lost_segs = metrics.lost_segs; + out_metrics->repeat_segs = metrics.repeat_segs; + out_metrics->in_errs = metrics.in_errs; + out_metrics->kcp_in_errs = metrics.kcp_in_errs; + out_metrics->ring_buffer_snd_queue = metrics.ring_buffer_snd_queue; + out_metrics->ring_buffer_rcv_queue = metrics.ring_buffer_rcv_queue; + out_metrics->ring_buffer_snd_buffer = metrics.ring_buffer_snd_buffer; + return 0; +} diff --git a/python/omnisocket/omnisocket_client.h b/python/omnisocket/omnisocket_client.h index be9e4b2..cf7eeea 100644 --- a/python/omnisocket/omnisocket_client.h +++ b/python/omnisocket/omnisocket_client.h @@ -14,6 +14,33 @@ typedef struct omnisocket_session_stats { int connected; } omnisocket_session_stats_t; +typedef struct omnisocket_session_kcp_metrics { + int connected; + int has_conv; + uint32_t conv; + char local_addr[OMNI_MAX_ADDR_TEXT]; + char remote_addr[OMNI_MAX_ADDR_TEXT]; + uint32_t rto_ms; + int32_t srtt_ms; + int32_t srttvar_ms; + uint64_t bytes_sent; + uint64_t bytes_received; + uint64_t in_pkts; + uint64_t out_pkts; + uint64_t in_segs; + uint64_t out_segs; + uint64_t retrans_segs; + uint64_t fast_retrans_segs; + uint64_t early_retrans_segs; + uint64_t lost_segs; + uint64_t repeat_segs; + uint64_t in_errs; + uint64_t kcp_in_errs; + uint64_t ring_buffer_snd_queue; + uint64_t ring_buffer_rcv_queue; + uint64_t ring_buffer_snd_buffer; +} omnisocket_session_kcp_metrics_t; + typedef struct omnisocket_session { pthread_mutex_t mutex; pthread_cond_t idle_cond; @@ -47,5 +74,9 @@ int omnisocket_session_recv_into( int timeout_ms ); void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket_session_stats_t *out_stats); +int omnisocket_session_kcp_metrics_snapshot( + omnisocket_session_t *session, + omnisocket_session_kcp_metrics_t *out_metrics +); #endif diff --git a/python/omnisocket_a_side/__init__.py b/python/omnisocket_a_side/__init__.py new file mode 100644 index 0000000..2f51111 --- /dev/null +++ b/python/omnisocket_a_side/__init__.py @@ -0,0 +1,9 @@ +from pathlib import Path + + +PACKAGE_ROOT = Path(__file__).resolve().parent +PYTHON_ROOT = PACKAGE_ROOT.parent +REPO_ROOT = PYTHON_ROOT.parent +DEFAULT_SOCKET_PATH = "/tmp/omnisocket-a-side.sock" +DEFAULT_CONFIG_PATH = REPO_ROOT / "config" / "a_side_omnidaemon.yaml" +VERSION = "0.1.0" diff --git a/python/omnisocket_a_side/__main__.py b/python/omnisocket_a_side/__main__.py new file mode 100644 index 0000000..c5a0ee7 --- /dev/null +++ b/python/omnisocket_a_side/__main__.py @@ -0,0 +1,5 @@ +from .daemon import main + + +if __name__ == "__main__": + main() diff --git a/python/omnisocket_a_side/client.py b/python/omnisocket_a_side/client.py new file mode 100644 index 0000000..f02a234 --- /dev/null +++ b/python/omnisocket_a_side/client.py @@ -0,0 +1,149 @@ +"""Local Unix-domain HTTP client for the A-side OmniDaemon.""" + +from __future__ import annotations + +import http.client +import json +import os +import socket +import threading +from typing import Any + +from . import DEFAULT_SOCKET_PATH + + +class OmniDaemonError(RuntimeError): + def __init__(self, message: str, status_code: int | None = None) -> None: + super().__init__(message) + self.status_code = status_code + + +class UnixHTTPConnection(http.client.HTTPConnection): + def __init__(self, socket_path: str, timeout: float = 2.0) -> None: + super().__init__("localhost", timeout=timeout) + self.socket_path = socket_path + + def connect(self) -> None: # pragma: no cover - runtime depends on Linux socket support + if not hasattr(socket, "AF_UNIX"): + raise OSError("AF_UNIX sockets are not available on this platform") + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock.settimeout(self.timeout) + self.sock.connect(self.socket_path) + + +class OmniDaemonClient: + def __init__(self, socket_path: str | None = None, timeout: float = 2.0) -> None: + self.socket_path = socket_path or os.getenv("OMNIDAEMON_SOCKET", DEFAULT_SOCKET_PATH) + self.timeout = timeout + self._local = threading.local() + + def get_health(self) -> dict[str, Any]: + return self._request_json("GET", "/v1/health") + + def get_state(self) -> dict[str, Any]: + return self._request_json("GET", "/v1/state") + + def get_video_frame(self) -> bytes: + return self._request_bytes("GET", "/v1/video/frame") + + def get_control_status(self) -> dict[str, Any]: + return self._request_json("GET", "/v1/control/status") + + def send_control_event( + self, + *, + source: str, + event_code: str, + drive_value: float = 1.0, + client_time_ms: int | None = None, + ) -> dict[str, Any]: + payload = { + "source": source, + "event_code": event_code, + "drive_value": float(drive_value), + "client_time_ms": client_time_ms, + } + return self._request_json("POST", "/v1/control/event", payload) + + def close(self) -> None: + self._reset_connection() + + def _request_json( + self, + method: str, + path: str, + payload: dict[str, Any] | None = None, + ) -> dict[str, Any]: + raw = self._request_bytes(method, path, payload) + if not raw: + return {} + try: + return json.loads(raw.decode("utf-8")) + except json.JSONDecodeError as error: + raise OmniDaemonError(f"invalid daemon JSON response: {error}") from error + + def _request_bytes( + self, + method: str, + path: str, + payload: dict[str, Any] | None = None, + ) -> bytes: + body = b"" + headers: dict[str, str] = {} + if payload is not None: + body = json.dumps(payload).encode("utf-8") + headers["Content-Type"] = "application/json" + headers["Content-Length"] = str(len(body)) + headers.setdefault("Connection", "keep-alive") + + for attempt in range(2): + connection = self._get_connection() + try: + connection.request(method, path, body=body, headers=headers) + response = connection.getresponse() + raw = response.read() + except FileNotFoundError as error: + self._reset_connection() + raise OmniDaemonError( + f"daemon socket not found: {self.socket_path}" + ) from error + except (OSError, http.client.HTTPException) as error: + self._reset_connection() + if attempt == 0: + continue + raise OmniDaemonError( + f"daemon request failed via {self.socket_path}: {error}" + ) from error + + if getattr(response, "will_close", False): + self._reset_connection() + + if response.status >= 400: + message = raw.decode("utf-8", errors="replace").strip() or response.reason + try: + parsed = json.loads(message) + if isinstance(parsed, dict) and "error" in parsed: + message = str(parsed["error"]) + except json.JSONDecodeError: + pass + raise OmniDaemonError(message, status_code=response.status) + return raw + + raise OmniDaemonError(f"daemon request failed via {self.socket_path}: retry exhausted") + + def _get_connection(self) -> UnixHTTPConnection: + connection = getattr(self._local, "connection", None) + if connection is None: + connection = UnixHTTPConnection(self.socket_path, timeout=self.timeout) + self._local.connection = connection + return connection + + def _reset_connection(self) -> None: + connection = getattr(self._local, "connection", None) + if connection is None: + return + try: + connection.close() + except OSError: + pass + self._local.connection = None diff --git a/python/omnisocket_a_side/control_codec.py b/python/omnisocket_a_side/control_codec.py new file mode 100644 index 0000000..0026355 --- /dev/null +++ b/python/omnisocket_a_side/control_codec.py @@ -0,0 +1,90 @@ +"""Binary control packet codec shared by the daemon and local clients.""" + +from __future__ import annotations + +from dataclasses import dataclass +import struct +import time + + +CONTROL_PACKET_VERSION = 1 +CONTROL_PACKET_STRUCT = struct.Struct("!BBHIfQ") + +EVENT_NAME_TO_ID = { + "pose_home": 1, + "pose_hold": 2, + "mode_stride": 3, + "surge_up": 6, + "surge_down": 7, + "sway_left": 8, + "sway_right": 9, + "spin_left": 10, + "spin_right": 11, + "set_surge": 12, + "set_sway": 13, + "set_spin": 14, + "set_lift": 15, + "lift_up": 16, + "lift_down": 17, + "trim_reset": 18, + "session_quit": 19, +} +EVENT_ID_TO_NAME = {value: key for key, value in EVENT_NAME_TO_ID.items()} +ANALOG_EVENT_CODES = {"set_surge", "set_sway", "set_spin"} + + +@dataclass(slots=True) +class ControlPacket: + seq_id: int + event_id: int + drive_value: float = 1.0 + sent_at_ns: int = 0 + + @property + def event_name(self) -> str: + return EVENT_ID_TO_NAME.get(self.event_id, f"unknown_{self.event_id}") + + def encode(self) -> bytes: + sent_at_ns = self.sent_at_ns or time.time_ns() + return CONTROL_PACKET_STRUCT.pack( + CONTROL_PACKET_VERSION, + self.event_id, + 0, + int(self.seq_id), + float(self.drive_value), + int(sent_at_ns), + ) + + @classmethod + def decode(cls, payload: bytes) -> "ControlPacket": + if len(payload) != CONTROL_PACKET_STRUCT.size: + raise ValueError( + f"invalid control packet length {len(payload)}; " + f"want {CONTROL_PACKET_STRUCT.size}" + ) + version, event_id, _reserved, seq_id, drive_value, sent_at_ns = ( + CONTROL_PACKET_STRUCT.unpack(payload) + ) + if version != CONTROL_PACKET_VERSION: + raise ValueError(f"unsupported control packet version {version}") + return cls( + seq_id=int(seq_id), + event_id=int(event_id), + drive_value=float(drive_value), + sent_at_ns=int(sent_at_ns), + ) + + +def make_control_packet( + seq_id: int, + event_name: str, + drive_value: float = 1.0, + sent_at_ns: int | None = None, +) -> ControlPacket: + event_id = EVENT_NAME_TO_ID[event_name] + return ControlPacket( + seq_id=seq_id, + event_id=event_id, + drive_value=drive_value, + sent_at_ns=time.time_ns() if sent_at_ns is None else sent_at_ns, + ) diff --git a/python/omnisocket_a_side/daemon.py b/python/omnisocket_a_side/daemon.py new file mode 100644 index 0000000..ef953fa --- /dev/null +++ b/python/omnisocket_a_side/daemon.py @@ -0,0 +1,1063 @@ +"""A-side OmniDaemon that owns local control/video OmniSocket sessions.""" + +from __future__ import annotations + +import argparse +import copy +import json +import os +from pathlib import Path +import queue +import signal +import socketserver +import threading +import time +from dataclasses import dataclass +from datetime import UTC, datetime +from http import HTTPStatus +from http.server import BaseHTTPRequestHandler +from typing import Any + +import yaml + +from . import DEFAULT_CONFIG_PATH, DEFAULT_SOCKET_PATH, VERSION +from .control_codec import ANALOG_EVENT_CODES, EVENT_NAME_TO_ID, make_control_packet + + +def utc_iso_now() -> str: + return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") + + +def load_omnisocket_api(): + from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS + + return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS + + +def _merge_kcp_defaults(defaults: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]: + merged = dict(defaults) + for key in ("nodelay", "interval_ms", "resend", "nc", "sndwnd", "rcvwnd", "mtu"): + if key in override: + merged[key] = int(override[key]) + return merged + + +def _load_config(config_path: str | None) -> dict[str, Any]: + path = Path(os.getenv("OMNIDAEMON_CONFIG", config_path or str(DEFAULT_CONFIG_PATH))) + raw: dict[str, Any] = {} + if path.exists(): + with path.open("r", encoding="utf-8") as file: + raw = yaml.safe_load(file) or {} + + control_defaults, _msg_type_binary, _session_cls, video_defaults = load_omnisocket_api() + + transport = dict(raw.get("transport", {})) + control = dict(raw.get("control_sender", {})) + video = dict(raw.get("video_receiver", {})) + daemon_cfg = dict(raw.get("daemon", {})) + policy = dict(raw.get("policy", {})) + + def _profile(name: str, fps: int, max_frame_kb: int) -> dict[str, int]: + section = dict(policy.get(name, {})) + return { + "fps": int(section.get("fps", fps)), + "max_frame_bytes": int(section.get("max_frame_kb", max_frame_kb)) * 1024, + } + + return { + "config_path": str(path), + "transport": { + "server_addr": str(transport.get("server_addr", "")), + "relay_via": str(transport.get("relay_via", "")), + "bind_ip": str(transport.get("bind_ip", "")), + "bind_device": str(transport.get("bind_device", "")), + }, + "control_sender": { + "peer_id": str(control.get("peer_id", "peer-a-ctrl")), + "target_peer": str(control.get("target_peer", "peer-b-ctrl")), + "stats_interval_ms": int(control.get("stats_interval_ms", 100)), + "kcp": _merge_kcp_defaults(control_defaults, control), + }, + "video_receiver": { + "peer_id": str(video.get("peer_id", "peer-a-video")), + "buffer_bytes": int(video.get("buffer_bytes", 1024 * 1024)), + "stats_interval_ms": int(video.get("stats_interval_ms", 100)), + "kcp": _merge_kcp_defaults(video_defaults, video), + }, + "daemon": { + "socket_path": os.getenv( + "OMNIDAEMON_SOCKET", + str(daemon_cfg.get("socket_path", DEFAULT_SOCKET_PATH)), + ), + "reconnect_delay_ms": int(daemon_cfg.get("reconnect_delay_ms", 2000)), + "telemetry_interval_ms": int(daemon_cfg.get("telemetry_interval_ms", 100)), + "analog_send_hz": int(daemon_cfg.get("analog_send_hz", 100)), + "frame_stale_ms": int(daemon_cfg.get("frame_stale_ms", 500)), + }, + "policy": { + "health_window_ms": int(policy.get("health_window_ms", 2000)), + "green_srtt_ms": int(policy.get("green_srtt_ms", 35)), + "yellow_srtt_ms": int(policy.get("yellow_srtt_ms", 60)), + "retrans_red_threshold": int(policy.get("retrans_red_threshold", 10)), + "profile_green": _profile("profile_green", 15, 60), + "profile_yellow": _profile("profile_yellow", 10, 40), + "profile_red": _profile("profile_red", 5, 20), + }, + } + + +def _default_stats() -> dict[str, int]: + return { + "send_calls": 0, + "send_bytes": 0, + "send_errors": 0, + "recv_calls": 0, + "recv_bytes": 0, + "recv_timeouts": 0, + "recv_errors": 0, + "connected": 0, + } + + +def _default_kcp_metrics() -> dict[str, Any]: + return { + "connected": 0, + "has_conv": 0, + "conv": 0, + "local_addr": "", + "remote_addr": "", + "rto_ms": 0, + "srtt_ms": 0, + "srttvar_ms": 0, + "bytes_sent": 0, + "bytes_received": 0, + "in_pkts": 0, + "out_pkts": 0, + "in_segs": 0, + "out_segs": 0, + "retrans_segs": 0, + "fast_retrans_segs": 0, + "early_retrans_segs": 0, + "lost_segs": 0, + "repeat_segs": 0, + "in_errs": 0, + "kcp_in_errs": 0, + "ring_buffer_snd_queue": 0, + "ring_buffer_rcv_queue": 0, + "ring_buffer_snd_buffer": 0, + } + + +@dataclass(slots=True) +class QueuedControlEvent: + seq_id: int + source: str + event_code: str + drive_value: float + client_time_ms: int + + +class ControlSessionManager: + def __init__(self, config: dict[str, Any]) -> None: + control_defaults, _msg_type_binary, session_cls, _video_defaults = load_omnisocket_api() + transport = config["transport"] + control_cfg = config["control_sender"] + daemon_cfg = config["daemon"] + + self._session_cls = session_cls + self._connect_kwargs = { + "server_addr": transport["server_addr"], + "relay_via": transport["relay_via"], + "bind_ip": transport["bind_ip"], + "bind_device": transport["bind_device"], + "peer_id": control_cfg["peer_id"], + "stats_interval_ms": control_cfg["stats_interval_ms"], + **_merge_kcp_defaults(control_defaults, control_cfg["kcp"]), + } + self._target_peer = control_cfg["target_peer"] + self._analog_interval = 1.0 / max(1, daemon_cfg["analog_send_hz"]) + self._reconnect_delay_s = max(0.25, daemon_cfg["reconnect_delay_ms"] / 1000.0) + + self._lock = threading.Lock() + self._stop_event = threading.Event() + self._wake_event = threading.Event() + self._discrete_queue: queue.Queue[QueuedControlEvent] = queue.Queue() + self._pending_analog: dict[str, QueuedControlEvent] = {} + self._session = None + self._connected = False + self._seq_id = 0 + self._last_seq_id: int | None = None + self._last_error = "" + self._last_connect_attempt = 0.0 + self._next_analog_flush = 0.0 + self._thread = threading.Thread( + target=self._run, + name="omni-a-side-control", + daemon=True, + ) + + def start(self) -> None: + self._thread.start() + + def stop(self) -> None: + self._stop_event.set() + self._wake_event.set() + if self._thread.is_alive(): + self._thread.join(timeout=2.0) + self._disconnect("control daemon stopped", drop_pending=True) + + def send_event( + self, + *, + source: str, + event_code: str, + drive_value: float, + client_time_ms: int | None, + ) -> dict[str, Any]: + if event_code not in EVENT_NAME_TO_ID: + return { + "accepted": False, + "assigned_seq_id": None, + "control_connected": self.is_connected(), + "queue_depth": self.queue_depth(), + "error": f"unknown event_code: {event_code}", + } + + if not self.is_connected(): + return { + "accepted": False, + "assigned_seq_id": None, + "control_connected": False, + "queue_depth": self.queue_depth(), + "error": self.last_error() or "control session is not connected", + } + + with self._lock: + seq_id = self._seq_id + self._seq_id += 1 + event = QueuedControlEvent( + seq_id=seq_id, + source=source, + event_code=event_code, + drive_value=float(drive_value), + client_time_ms=int(client_time_ms or time.time() * 1000), + ) + if event_code in ANALOG_EVENT_CODES: + self._pending_analog[event_code] = event + else: + self._discrete_queue.put_nowait(event) + self._wake_event.set() + return { + "accepted": True, + "assigned_seq_id": seq_id, + "control_connected": True, + "queue_depth": self.queue_depth(), + "error": "", + } + + def is_connected(self) -> bool: + with self._lock: + return self._connected and self._session is not None + + def queue_depth(self) -> int: + with self._lock: + return self._discrete_queue.qsize() + len(self._pending_analog) + + def last_error(self) -> str: + with self._lock: + return self._last_error + + def snapshot(self) -> dict[str, Any]: + with self._lock: + session = self._session + connected = self._connected and session is not None + last_error = self._last_error + last_seq_id = self._last_seq_id + queue_depth = self._discrete_queue.qsize() + len(self._pending_analog) + + stats = _default_stats() + metrics = _default_kcp_metrics() + if session is not None: + try: + stats = dict(session.stats()) + metrics = dict(session.kcp_metrics()) + connected = bool(stats.get("connected") and metrics.get("connected")) + except Exception as error: # pragma: no cover - runtime integration + connected = False + last_error = str(error) + + return { + "connected": connected, + "queue_depth": queue_depth, + "last_seq_id": last_seq_id, + "last_error": last_error, + "peer_id": self._connect_kwargs["peer_id"], + "target_peer": self._target_peer, + "server_addr": self._connect_kwargs["server_addr"], + "relay_via": self._connect_kwargs["relay_via"], + "stats": stats, + "kcp_metrics": metrics, + } + + def _run(self) -> None: + analog_order = ("set_surge", "set_sway", "set_spin") + while not self._stop_event.is_set(): + if not self.is_connected(): + now = time.monotonic() + if now - self._last_connect_attempt >= self._reconnect_delay_s: + self._last_connect_attempt = now + self._connect() + self._wake_event.wait(0.2) + self._wake_event.clear() + continue + + sent_any = False + while not self._stop_event.is_set(): + try: + event = self._discrete_queue.get_nowait() + except queue.Empty: + break + if not self._send_packet(event): + break + sent_any = True + + now = time.monotonic() + if now >= self._next_analog_flush: + analog_batch: list[QueuedControlEvent] = [] + with self._lock: + for event_code in analog_order: + event = self._pending_analog.pop(event_code, None) + if event is not None: + analog_batch.append(event) + if analog_batch: + for event in analog_batch: + if not self._send_packet(event): + break + sent_any = True + self._next_analog_flush = now + self._analog_interval + + if not sent_any: + wait_s = max(0.02, self._next_analog_flush - time.monotonic()) + self._wake_event.wait(wait_s) + self._wake_event.clear() + + def _connect(self) -> None: + session = self._session_cls() + try: + session.connect(**self._connect_kwargs) + except Exception as error: # pragma: no cover - runtime integration + with self._lock: + self._connected = False + self._last_error = str(error) + try: + session.close() + except Exception: + pass + return + + with self._lock: + self._session = session + self._connected = True + self._last_error = "" + self._next_analog_flush = time.monotonic() + + def _disconnect(self, error_message: str, *, drop_pending: bool) -> None: + with self._lock: + session = self._session + self._session = None + self._connected = False + if error_message: + self._last_error = error_message + if drop_pending: + self._pending_analog.clear() + while not self._discrete_queue.empty(): + try: + self._discrete_queue.get_nowait() + except queue.Empty: + break + if session is not None: + try: + session.close() + except Exception: + pass + + def _send_packet(self, event: QueuedControlEvent) -> bool: + with self._lock: + session = self._session + if session is None: + return False + + packet = make_control_packet(event.seq_id, event.event_code, event.drive_value) + try: + session.send(to=self._target_peer, data=packet.encode()) + except Exception as error: # pragma: no cover - runtime integration + self._disconnect(str(error), drop_pending=True) + return False + + with self._lock: + self._last_seq_id = event.seq_id + self._last_error = "" + return True + + +class VideoSessionManager: + def __init__(self, config: dict[str, Any]) -> None: + _control_defaults, msg_type_binary, session_cls, video_defaults = load_omnisocket_api() + transport = config["transport"] + video_cfg = config["video_receiver"] + daemon_cfg = config["daemon"] + + self._msg_type_binary = msg_type_binary + self._session_cls = session_cls + self._connect_kwargs = { + "server_addr": transport["server_addr"], + "relay_via": transport["relay_via"], + "bind_ip": transport["bind_ip"], + "bind_device": transport["bind_device"], + "peer_id": video_cfg["peer_id"], + "stats_interval_ms": video_cfg["stats_interval_ms"], + **_merge_kcp_defaults(video_defaults, video_cfg["kcp"]), + } + self._buffer_bytes = video_cfg["buffer_bytes"] + self._frame_stale_s = max(0.1, daemon_cfg["frame_stale_ms"] / 1000.0) + self._reconnect_delay_s = max(0.25, daemon_cfg["reconnect_delay_ms"] / 1000.0) + self._config_path = config["config_path"] + + self._lock = threading.Lock() + self._stop_event = threading.Event() + self._session = None + self._latest_frame: bytes | None = None + self._latest_received_at = 0.0 + self._latest_sequence: int | None = None + self._frames_received = 0 + self._last_error = "" + self._last_connect_attempt = 0.0 + self._thread = threading.Thread( + target=self._run, + name="omni-a-side-video", + daemon=True, + ) + + def start(self) -> None: + self._thread.start() + + def stop(self) -> None: + self._stop_event.set() + if self._thread.is_alive(): + self._thread.join(timeout=2.0) + self._disconnect("video daemon stopped") + + def get_latest_frame(self) -> bytes | None: + with self._lock: + if self._latest_frame is None: + return None + if time.time() - self._latest_received_at > self._frame_stale_s: + return None + return self._latest_frame + + def snapshot(self) -> dict[str, Any]: + with self._lock: + session = self._session + latest_received_at = self._latest_received_at + latest_sequence = self._latest_sequence + frames_received = self._frames_received + last_error = self._last_error + has_recent_frame = ( + self._latest_frame is not None + and time.time() - self._latest_received_at <= self._frame_stale_s + ) + + stats = _default_stats() + metrics = _default_kcp_metrics() + connected = session is not None + if session is not None: + try: + stats = dict(session.stats()) + metrics = dict(session.kcp_metrics()) + connected = bool(stats.get("connected") and metrics.get("connected")) + except Exception as error: # pragma: no cover - runtime integration + connected = False + last_error = str(error) + + return { + "connected": connected, + "has_recent_frame": has_recent_frame, + "latest_received_at": latest_received_at, + "latest_sequence": latest_sequence, + "frames_received": frames_received, + "last_error": last_error, + "buffer_bytes": self._buffer_bytes, + "peer_id": self._connect_kwargs["peer_id"], + "server_addr": self._connect_kwargs["server_addr"], + "relay_via": self._connect_kwargs["relay_via"], + "config_path": self._config_path, + "stats": stats, + "kcp_metrics": metrics, + } + + def _run(self) -> None: + while not self._stop_event.is_set(): + if not self._is_connected(): + now = time.monotonic() + if now - self._last_connect_attempt >= self._reconnect_delay_s: + self._last_connect_attempt = now + self._connect() + time.sleep(0.2) + continue + + with self._lock: + session = self._session + if session is None: + continue + + buffer = bytearray(self._buffer_bytes) + try: + while not self._stop_event.is_set(): + meta = session.recv_into(buffer, timeout_ms=200) + if meta is None: + continue + if meta.get("msg_type") != self._msg_type_binary: + continue + frame = bytes(buffer[: int(meta["body_len"])]) + jpeg_frame = self._extract_jpeg_frame(frame) + if jpeg_frame is None: + with self._lock: + self._last_error = "received non-JPEG binary frame" + continue + with self._lock: + self._latest_frame = jpeg_frame + self._latest_received_at = time.time() + self._latest_sequence = self._extract_sequence(frame) + self._frames_received += 1 + self._last_error = "" + except Exception as error: # pragma: no cover - runtime integration + self._disconnect(str(error)) + time.sleep(0.2) + + def _connect(self) -> None: + session = self._session_cls() + try: + session.connect(**self._connect_kwargs) + except Exception as error: # pragma: no cover - runtime integration + with self._lock: + self._session = None + self._last_error = str(error) + try: + session.close() + except Exception: + pass + return + + with self._lock: + self._session = session + self._last_error = "" + + def _disconnect(self, error_message: str) -> None: + with self._lock: + session = self._session + self._session = None + self._last_error = error_message + if session is not None: + try: + session.close() + except Exception: + pass + + def _is_connected(self) -> bool: + with self._lock: + return self._session is not None + + @staticmethod + def _extract_jpeg_frame(frame: bytes) -> bytes | None: + if frame.startswith(b"\xff\xd8"): + return frame + if len(frame) > 8 and frame[8:10] == b"\xff\xd8": + return frame[8:] + return None + + @staticmethod + def _extract_sequence(frame: bytes) -> int | None: + if len(frame) >= 8 and not frame.startswith(b"\xff\xd8"): + return int.from_bytes(frame[:8], "big") + return None + + +class PolicyEngine: + def __init__(self, policy_cfg: dict[str, Any]) -> None: + self._policy_cfg = policy_cfg + + def classify( + self, + *, + control_connected: bool, + avg_srtt_ms: float, + retrans_delta: int, + lost_delta: int, + ) -> tuple[str, dict[str, int]]: + if not control_connected: + return "red", dict(self._policy_cfg["profile_red"]) + if ( + avg_srtt_ms >= self._policy_cfg["yellow_srtt_ms"] + or lost_delta > 0 + or retrans_delta >= self._policy_cfg["retrans_red_threshold"] + ): + return "red", dict(self._policy_cfg["profile_red"]) + if avg_srtt_ms >= self._policy_cfg["green_srtt_ms"] or retrans_delta > 0: + return "yellow", dict(self._policy_cfg["profile_yellow"]) + return "green", dict(self._policy_cfg["profile_green"]) + + +class TelemetrySampler: + def __init__( + self, + config: dict[str, Any], + control_manager: ControlSessionManager, + video_manager: VideoSessionManager, + ) -> None: + self._config = config + self._control_manager = control_manager + self._video_manager = video_manager + self._policy_engine = PolicyEngine(config["policy"]) + self._interval_s = max(0.1, config["daemon"]["telemetry_interval_ms"] / 1000.0) + self._window_s = max(0.5, config["policy"]["health_window_ms"] / 1000.0) + + self._lock = threading.Lock() + self._stop_event = threading.Event() + self._thread = threading.Thread( + target=self._run, + name="omni-a-side-telemetry", + daemon=True, + ) + self._state = self._build_initial_state() + self._control_history: list[dict[str, Any]] = [] + self._last_totals: dict[str, Any] | None = None + self._started_at = utc_iso_now() + + def start(self) -> None: + self._thread.start() + + def stop(self) -> None: + self._stop_event.set() + if self._thread.is_alive(): + self._thread.join(timeout=2.0) + + def snapshot(self) -> dict[str, Any]: + with self._lock: + return copy.deepcopy(self._state) + + def _run(self) -> None: + while not self._stop_event.is_set(): + self._sample_once() + self._stop_event.wait(self._interval_s) + + def _sample_once(self) -> None: + now = time.time() + control = self._control_manager.snapshot() + video = self._video_manager.snapshot() + + control_metrics = control["kcp_metrics"] + video_metrics = video["kcp_metrics"] + + self._control_history.append( + { + "ts": now, + "connected": bool(control["connected"]), + "srtt_ms": float(control_metrics.get("srtt_ms", 0.0) or 0.0), + "retrans_segs": int(control_metrics.get("retrans_segs", 0) or 0), + "lost_segs": int(control_metrics.get("lost_segs", 0) or 0), + } + ) + self._control_history = [ + item for item in self._control_history if now - item["ts"] <= self._window_s + ] + + avg_srtt_ms = 0.0 + jitter_ms = 0.0 + retrans_delta_window = 0 + lost_delta_window = 0 + srtt_values = [item["srtt_ms"] for item in self._control_history if item["connected"]] + if srtt_values: + avg_srtt_ms = sum(srtt_values) / len(srtt_values) + if len(srtt_values) >= 2: + jitter_samples = [ + abs(srtt_values[index] - srtt_values[index - 1]) + for index in range(1, len(srtt_values)) + ] + jitter_ms = sum(jitter_samples) / len(jitter_samples) + if len(self._control_history) >= 2: + first = self._control_history[0] + last = self._control_history[-1] + retrans_delta_window = max(0, last["retrans_segs"] - first["retrans_segs"]) + lost_delta_window = max(0, last["lost_segs"] - first["lost_segs"]) + + health_band, profile = self._policy_engine.classify( + control_connected=bool(control["connected"]), + avg_srtt_ms=avg_srtt_ms, + retrans_delta=retrans_delta_window, + lost_delta=lost_delta_window, + ) + + total_bytes_sent = int(control_metrics.get("bytes_sent", 0)) + int(video_metrics.get("bytes_sent", 0)) + total_bytes_received = int(control_metrics.get("bytes_received", 0)) + int(video_metrics.get("bytes_received", 0)) + total_out_segs = int(control_metrics.get("out_segs", 0)) + int(video_metrics.get("out_segs", 0)) + total_retrans = int(control_metrics.get("retrans_segs", 0)) + int(video_metrics.get("retrans_segs", 0)) + tx_kbps = 0 + rx_kbps = 0 + retrans_pct = 0.0 + if self._last_totals is not None: + dt = max(0.001, now - self._last_totals["ts"]) + sent_delta = max(0, total_bytes_sent - self._last_totals["bytes_sent"]) + recv_delta = max(0, total_bytes_received - self._last_totals["bytes_received"]) + out_seg_delta = max(0, total_out_segs - self._last_totals["out_segs"]) + retrans_delta = max(0, total_retrans - self._last_totals["retrans"]) + tx_kbps = int((sent_delta * 8.0) / dt / 1000.0) + rx_kbps = int((recv_delta * 8.0) / dt / 1000.0) + if out_seg_delta > 0: + retrans_pct = round((retrans_delta / out_seg_delta) * 100.0, 2) + self._last_totals = { + "ts": now, + "bytes_sent": total_bytes_sent, + "bytes_received": total_bytes_received, + "out_segs": total_out_segs, + "retrans": total_retrans, + } + + network = { + "peer_status": "online" if control["connected"] or video["connected"] else "offline", + "latency_ms": round(avg_srtt_ms or float(control_metrics.get("srtt_ms", 0) or 0.0), 1), + "jitter_ms": round(jitter_ms, 1), + # Keep packet_loss_pct as a compatibility alias for existing clients. + "retrans_pct": round(retrans_pct, 2), + "packet_loss_pct": round(retrans_pct, 2), + "tx_kbps": tx_kbps, + "rx_kbps": rx_kbps, + "signal_dbm": None, + "transport": "OmniSocket / daemon", + "source_mode": "daemon-live", + "updated_at": utc_iso_now(), + } + + video_status = { + "available": bool(video["has_recent_frame"]), + "source_mode": "omnisocket-jpeg-live" if video["has_recent_frame"] else "omnisocket-waiting", + "frame_count": int(video["frames_received"]), + "fps": int(profile["fps"]), + "frame_dir": "omni-daemon://latest-frame", + "source_detail": ( + f"peer stream active, frames={video['frames_received']}" + if video["has_recent_frame"] + else (video["last_error"] or "waiting for latest JPEG frame from daemon") + ), + "receiver": { + "backend_ready": True, + "mode": "daemon", + "connected": bool(video["connected"]), + "has_recent_frame": bool(video["has_recent_frame"]), + "frames_received": int(video["frames_received"]), + "latest_sequence": video["latest_sequence"], + "last_error": video["last_error"], + "config_path": video["config_path"], + "server_addr": video["server_addr"], + "relay_via": video["relay_via"], + "peer_id": video["peer_id"], + "buffer_bytes": int(video["buffer_bytes"]), + "stats": video["stats"], + "kcp_metrics": video_metrics, + }, + } + + state = { + "network": network, + "video": video_status, + "control": { + "connected": bool(control["connected"]), + "queue_depth": int(control["queue_depth"]), + "last_seq_id": control["last_seq_id"], + "last_error": control["last_error"], + "peer_id": control["peer_id"], + "target_peer": control["target_peer"], + "server_addr": control["server_addr"], + "relay_via": control["relay_via"], + "stats": control["stats"], + "kcp_metrics": control_metrics, + }, + "policy": { + "health_band": health_band, + "recommended_video_profile": profile, + }, + "daemon": { + "started_at": self._started_at, + "socket_path": self._config["daemon"]["socket_path"], + "version": VERSION, + }, + } + + with self._lock: + self._state = state + + @staticmethod + def _build_initial_state() -> dict[str, Any]: + return { + "network": { + "peer_status": "offline", + "latency_ms": 0.0, + "jitter_ms": 0.0, + "retrans_pct": 0.0, + "packet_loss_pct": 0.0, + "tx_kbps": 0, + "rx_kbps": 0, + "signal_dbm": None, + "transport": "OmniSocket / daemon", + "source_mode": "daemon-starting", + "updated_at": utc_iso_now(), + }, + "video": { + "available": False, + "source_mode": "omnisocket-waiting", + "frame_count": 0, + "fps": 15, + "frame_dir": "omni-daemon://latest-frame", + "source_detail": "daemon is starting", + "receiver": { + "backend_ready": True, + "mode": "daemon", + "connected": False, + "has_recent_frame": False, + "frames_received": 0, + "latest_sequence": None, + "last_error": "", + "config_path": "", + "server_addr": "", + "relay_via": "", + "peer_id": "", + "buffer_bytes": 0, + }, + }, + "control": { + "connected": False, + "queue_depth": 0, + "last_seq_id": None, + "last_error": "", + "peer_id": "", + "target_peer": "", + "server_addr": "", + "relay_via": "", + "stats": _default_stats(), + "kcp_metrics": _default_kcp_metrics(), + }, + "policy": { + "health_band": "red", + "recommended_video_profile": { + "fps": 5, + "max_frame_bytes": 20 * 1024, + }, + }, + "daemon": { + "started_at": utc_iso_now(), + "socket_path": DEFAULT_SOCKET_PATH, + "version": VERSION, + }, + } + + +class ThreadingUnixHTTPServer(socketserver.ThreadingMixIn, socketserver.UnixStreamServer): + daemon_threads = True + + +class OmniDaemonHTTPHandler(BaseHTTPRequestHandler): + server_version = "OmniDaemonHTTP/1.0" + protocol_version = "HTTP/1.1" + + def do_GET(self) -> None: # pragma: no cover - exercised by runtime integration + app: ASideOmniDaemon = self.server.app # type: ignore[attr-defined] + if self.path == "/v1/health": + state = app.get_state() + control = state["control"] + video = state["video"]["receiver"] + policy = state["policy"] + status = "ok" if policy["health_band"] == "green" else "degraded" + if not control["connected"] and not video["connected"]: + status = "unavailable" + self._send_json( + HTTPStatus.OK, + { + "status": status, + "health_band": policy["health_band"], + "control_connected": control["connected"], + "video_connected": video["connected"], + "updated_at": state["network"]["updated_at"], + }, + ) + return + if self.path == "/v1/state": + self._send_json(HTTPStatus.OK, app.get_state()) + return + if self.path == "/v1/control/status": + self._send_json(HTTPStatus.OK, app.get_state()["control"]) + return + if self.path == "/v1/video/frame": + frame = app.get_latest_frame() + if frame is None: + self._send_json( + HTTPStatus.SERVICE_UNAVAILABLE, + {"error": "no fresh JPEG frame available"}, + ) + return + self._send_bytes(HTTPStatus.OK, frame, "image/jpeg") + return + self._send_json(HTTPStatus.NOT_FOUND, {"error": f"unknown path: {self.path}"}) + + def do_POST(self) -> None: # pragma: no cover - exercised by runtime integration + app: ASideOmniDaemon = self.server.app # type: ignore[attr-defined] + if self.path != "/v1/control/event": + self._send_json(HTTPStatus.NOT_FOUND, {"error": f"unknown path: {self.path}"}) + return + + try: + payload = self._read_json() + except ValueError as error: + self._send_json(HTTPStatus.BAD_REQUEST, {"error": str(error)}) + return + + result = app.send_control_event( + source=str(payload.get("source", "unknown")), + event_code=str(payload.get("event_code", "")), + drive_value=float(payload.get("drive_value", 1.0)), + client_time_ms=payload.get("client_time_ms"), + ) + status = HTTPStatus.OK if result.get("accepted") else HTTPStatus.SERVICE_UNAVAILABLE + self._send_json(status, result) + + def log_message(self, format: str, *args: Any) -> None: # noqa: A003 + return + + def _read_json(self) -> dict[str, Any]: + raw_length = self.headers.get("Content-Length") + if raw_length is None: + raise ValueError("missing Content-Length") + try: + length = int(raw_length) + except ValueError as error: + raise ValueError("invalid Content-Length") from error + raw = self.rfile.read(length) + try: + payload = json.loads(raw.decode("utf-8")) + except json.JSONDecodeError as error: + raise ValueError(f"invalid JSON body: {error}") from error + if not isinstance(payload, dict): + raise ValueError("request body must be a JSON object") + return payload + + def _send_json(self, status: HTTPStatus, payload: dict[str, Any]) -> None: + raw = json.dumps(payload).encode("utf-8") + self._send_bytes(status, raw, "application/json; charset=utf-8") + + def _send_bytes(self, status: HTTPStatus, payload: bytes, content_type: str) -> None: + self.send_response(status.value) + self.send_header("Content-Type", content_type) + self.send_header("Content-Length", str(len(payload))) + self.send_header("Cache-Control", "no-store") + self.send_header("Connection", "keep-alive") + self.end_headers() + self.wfile.write(payload) + + +class ASideOmniDaemon: + def __init__(self, config_path: str | None = None) -> None: + self._config = _load_config(config_path) + self._control_manager = ControlSessionManager(self._config) + self._video_manager = VideoSessionManager(self._config) + self._telemetry = TelemetrySampler( + self._config, + self._control_manager, + self._video_manager, + ) + self._server: ThreadingUnixHTTPServer | None = None + + @property + def socket_path(self) -> str: + return self._config["daemon"]["socket_path"] + + def start(self) -> None: + self._control_manager.start() + self._video_manager.start() + self._telemetry.start() + self._server = self._build_server() + + def stop(self) -> None: + if self._server is not None: + self._server.shutdown() + self._server.server_close() + self._server = None + try: + os.unlink(self.socket_path) + except FileNotFoundError: + pass + self._telemetry.stop() + self._video_manager.stop() + self._control_manager.stop() + + def serve_forever(self) -> None: + self.start() + assert self._server is not None + self._server.serve_forever() + + def get_state(self) -> dict[str, Any]: + return self._telemetry.snapshot() + + def get_latest_frame(self) -> bytes | None: + return self._video_manager.get_latest_frame() + + def send_control_event( + self, + *, + source: str, + event_code: str, + drive_value: float, + client_time_ms: int | None, + ) -> dict[str, Any]: + return self._control_manager.send_event( + source=source, + event_code=event_code, + drive_value=drive_value, + client_time_ms=client_time_ms, + ) + + def _build_server(self) -> ThreadingUnixHTTPServer: + socket_path = self.socket_path + socket_dir = os.path.dirname(socket_path) + if socket_dir: + os.makedirs(socket_dir, exist_ok=True) + try: + os.unlink(socket_path) + except FileNotFoundError: + pass + server = ThreadingUnixHTTPServer(socket_path, OmniDaemonHTTPHandler) + server.app = self # type: ignore[attr-defined] + return server + + +def main(argv: list[str] | None = None) -> None: + parser = argparse.ArgumentParser(description="Run the A-side OmniDaemon") + parser.add_argument("--config", dest="config_path", help="Path to daemon YAML config") + args = parser.parse_args(argv) + + app = ASideOmniDaemon(config_path=args.config_path) + + def _handle_signal(_signum: int, _frame: Any) -> None: + app.stop() + raise SystemExit(0) + + signal.signal(signal.SIGINT, _handle_signal) + signal.signal(signal.SIGTERM, _handle_signal) + + try: + app.serve_forever() + except KeyboardInterrupt: + pass + finally: + app.stop() + + +if __name__ == "__main__": + main() diff --git a/python/setup.py b/python/setup.py index f302f32..040ddaa 100644 --- a/python/setup.py +++ b/python/setup.py @@ -35,7 +35,12 @@ COMMON_SOURCES = [ setup( name="omnisocket", version="0.1.0", - packages=["omnisocket"], + packages=["omnisocket", "omnisocket_a_side"], + entry_points={ + "console_scripts": [ + "omnisocket-a-side-daemon=omnisocket_a_side.daemon:main", + ] + }, ext_modules=[ Extension( "omnisocket._omnisocket", diff --git a/src/peer_kcp_client.c b/src/peer_kcp_client.c index ec969c0..f568272 100644 --- a/src/peer_kcp_client.c +++ b/src/peer_kcp_client.c @@ -294,6 +294,14 @@ int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const return 0; } +int kcp_client_metrics_snapshot(kcp_client_t *client, kcp_conn_metrics_t *out_metrics) { + if (client == NULL || out_metrics == NULL) { + errno = EINVAL; + return -1; + } + return kcp_conn_metrics_snapshot(client->conn, out_metrics); +} + int kcp_client_close(kcp_client_t *client) { return client == NULL ? 0 : kcp_conn_close(client->conn); } diff --git a/src/transport_kcp.c b/src/transport_kcp.c index 30778ac..ef95625 100644 --- a/src/transport_kcp.c +++ b/src/transport_kcp.c @@ -1788,6 +1788,83 @@ int kcp_conn_local_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, s return 0; } +int kcp_conn_metrics_snapshot(kcp_conn_t *conn, kcp_conn_metrics_t *out_metrics) { + struct sockaddr_storage local_addr; + socklen_t local_len = sizeof(local_addr); + + if (conn == NULL || out_metrics == NULL) { + errno = EINVAL; + return -1; + } + + memset(out_metrics, 0, sizeof(*out_metrics)); + out_metrics->connected = !atomic_load(&conn->closed); + + if (conn->sock_state != NULL && !conn->socket_closed && + getsockname(conn->sock_state->fd, (struct sockaddr *) &local_addr, &local_len) == 0) { + omni_sockaddr_to_string( + (const struct sockaddr *) &local_addr, + local_len, + out_metrics->local_addr, + sizeof(out_metrics->local_addr) + ); + } + if (conn->remote_addr_len > 0) { + omni_sockaddr_to_string( + (const struct sockaddr *) &conn->remote_addr, + conn->remote_addr_len, + out_metrics->remote_addr, + sizeof(out_metrics->remote_addr) + ); + } + + if (conn->process_sampler != NULL) { + out_metrics->bytes_sent = atomic_load_explicit(&conn->process_sampler->bytes_sent, memory_order_relaxed); + out_metrics->bytes_received = atomic_load_explicit(&conn->process_sampler->bytes_received, memory_order_relaxed); + out_metrics->in_pkts = atomic_load_explicit(&conn->process_sampler->in_pkts, memory_order_relaxed); + out_metrics->out_pkts = atomic_load_explicit(&conn->process_sampler->out_pkts, memory_order_relaxed); + out_metrics->in_segs = atomic_load_explicit(&conn->process_sampler->in_segs, memory_order_relaxed); + out_metrics->out_segs = atomic_load_explicit(&conn->process_sampler->out_segs, memory_order_relaxed); + out_metrics->in_errs = atomic_load_explicit(&conn->process_sampler->in_errs, memory_order_relaxed); + out_metrics->kcp_in_errs = atomic_load_explicit(&conn->process_sampler->kcp_in_errs, memory_order_relaxed); + } else { + out_metrics->bytes_sent = conn->pending_bytes_sent; + out_metrics->bytes_received = conn->pending_bytes_received; + out_metrics->in_pkts = conn->pending_in_pkts; + out_metrics->out_pkts = conn->pending_out_pkts; + out_metrics->in_segs = conn->pending_in_segs; + out_metrics->out_segs = conn->pending_out_segs; + out_metrics->in_errs = conn->pending_in_errs; + out_metrics->kcp_in_errs = conn->pending_kcp_in_errs; + } + + pthread_mutex_lock(&conn->kcp_mu); + if (conn->kcp != NULL) { + out_metrics->has_conv = 1; + out_metrics->conv = conn->kcp->conv; + out_metrics->rto_ms = conn->kcp->rx_rto; + out_metrics->srtt_ms = conn->kcp->rx_srtt; + out_metrics->srttvar_ms = conn->kcp->rx_rttval; + out_metrics->ring_buffer_snd_queue = conn->kcp->nsnd_que; + out_metrics->ring_buffer_rcv_queue = conn->kcp->nrcv_que; + out_metrics->ring_buffer_snd_buffer = conn->kcp->nsnd_buf; + out_metrics->fast_retrans_segs = conn->kcp->fast_retrans_xmit; + /* This KCP fork does not implement early retransmit, so the counter stays zero. */ + out_metrics->early_retrans_segs = conn->kcp->early_retrans_xmit; + out_metrics->lost_segs = conn->kcp->lost_xmit; + out_metrics->repeat_segs = conn->kcp->repeat_xmit; + out_metrics->retrans_segs = + out_metrics->fast_retrans_segs + + out_metrics->early_retrans_segs + + out_metrics->lost_segs; + } else { + out_metrics->connected = 0; + } + pthread_mutex_unlock(&conn->kcp_mu); + + return 0; +} + int kcp_conn_close(kcp_conn_t *conn) { if (conn == NULL) { return 0; diff --git a/third_party/kcp/ikcp.c b/third_party/kcp/ikcp.c index b6176b9..11503ca 100644 --- a/third_party/kcp/ikcp.c +++ b/third_party/kcp/ikcp.c @@ -298,6 +298,10 @@ ikcpcb *ikcp_create(IUINT32 conv, void *user) kcp->fastlimit = IKCP_FASTACK_LIMIT; kcp->nocwnd = 0; kcp->xmit = 0; + kcp->fast_retrans_xmit = 0; + kcp->early_retrans_xmit = 0; + kcp->lost_xmit = 0; + kcp->repeat_xmit = 0; kcp->dead_link = IKCP_DEADLINK; kcp->output = NULL; kcp->writelog = NULL; @@ -788,6 +792,7 @@ void ikcp_parse_data(ikcpcb *kcp, IKCPSEG *newseg) } else { + kcp->repeat_xmit++; ikcp_segment_delete(kcp, newseg); } @@ -1192,6 +1197,7 @@ void ikcp_flush(ikcpcb *kcp) needsend = 1; segment->xmit++; kcp->xmit++; + kcp->lost_xmit++; if (kcp->nodelay == 0) { segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto); @@ -1211,6 +1217,7 @@ void ikcp_flush(ikcpcb *kcp) { needsend = 1; segment->xmit++; + kcp->fast_retrans_xmit++; segment->fastack = 0; segment->resendts = current + segment->rto; change++; diff --git a/third_party/kcp/ikcp.h b/third_party/kcp/ikcp.h index 5630269..21ac51f 100644 --- a/third_party/kcp/ikcp.h +++ b/third_party/kcp/ikcp.h @@ -300,6 +300,10 @@ struct IKCPCB IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto; IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe; IUINT32 current, interval, ts_flush, xmit; + IUINT32 fast_retrans_xmit; + IUINT32 early_retrans_xmit; + IUINT32 lost_xmit; + IUINT32 repeat_xmit; IUINT32 nrcv_buf, nsnd_buf; IUINT32 nrcv_que, nsnd_que; IUINT32 nodelay, updated;