diff --git a/README.md b/README.md index ccb9c58..c77e734 100644 --- a/README.md +++ b/README.md @@ -27,30 +27,6 @@ make python-ext make python-install ``` -## A-Side OmniDaemon - -The A-side daemon is configured from `config/a_side_omnidaemon.yaml` in this repo. The safest way to start it is to pass that file explicitly, because the installed Python package does not bundle the YAML config. - -Run from a source checkout: - -```bash -python -m omnisocket_a_side.daemon --config "$(pwd)/config/a_side_omnidaemon.yaml" -``` - -Or, if you installed the console script: - -```bash -OMNIDAEMON_CONFIG="$(pwd)/config/a_side_omnidaemon.yaml" \ -omnisocket-a-side-daemon -``` - -Optional overrides: - -- `OMNIDAEMON_SOCKET=/tmp/omnisocket-a-side.sock` selects the local UDS path. -- `OMNIDAEMON_CONFIG=/abs/path/to/a_side_omnidaemon.yaml` overrides `--config`. - -For `robot-command-center` and the A-side senders, keep the daemon and its clients on the same Linux machine so they can share the Unix-domain socket. - ## Run On Different Machines Server `D` runs the KCP hub on `0.0.0.0:10909`: diff --git a/config/a_side_omnidaemon.yaml b/config/a_side_omnidaemon.yaml deleted file mode 100644 index 62d9963..0000000 --- a/config/a_side_omnidaemon.yaml +++ /dev/null @@ -1,51 +0,0 @@ -transport: - server_addr: "81.70.156.140:10909" - relay_via: "106.55.173.235:10909" - bind_ip: "" - bind_device: "" - -control_sender: - peer_id: "peer-a-ctrl" - target_peer: "peer-b-ctrl" - nodelay: 1 - interval_ms: 5 - resend: 2 - nc: 1 - sndwnd: 32 - rcvwnd: 32 - mtu: 1400 - stats_interval_ms: 100 - -video_receiver: - peer_id: "peer-a-video" - buffer_bytes: 1048576 - nodelay: 1 - interval_ms: 10 - resend: 2 - nc: 1 - sndwnd: 256 - rcvwnd: 256 - mtu: 1400 - stats_interval_ms: 100 - -daemon: - socket_path: "/tmp/omnisocket-a-side.sock" - reconnect_delay_ms: 2000 - telemetry_interval_ms: 100 - analog_send_hz: 100 - frame_stale_ms: 500 - -policy: - health_window_ms: 2000 - green_srtt_ms: 35 - yellow_srtt_ms: 60 - retrans_red_threshold: 10 - profile_green: - fps: 15 - max_frame_kb: 60 - profile_yellow: - fps: 10 - max_frame_kb: 40 - profile_red: - fps: 5 - max_frame_kb: 20 diff --git a/include/peer_kcp_client.h b/include/peer_kcp_client.h index 5b7afdc..d61111a 100644 --- a/include/peer_kcp_client.h +++ b/include/peer_kcp_client.h @@ -27,7 +27,6 @@ int kcp_client_receive_timed(kcp_client_t *client, message_t *out_msg, int timeo int kcp_client_receive(kcp_client_t *client, message_t *out_msg); int kcp_client_receive_binary_into(kcp_client_t *client, void *buffer, size_t buffer_len, kcp_client_recv_meta_t *out_meta, int timeout_ms); int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len); -int kcp_client_metrics_snapshot(kcp_client_t *client, kcp_conn_metrics_t *out_metrics); int kcp_client_close(kcp_client_t *client); void kcp_client_free(kcp_client_t *client); diff --git a/include/transport_kcp.h b/include/transport_kcp.h index 737752e..d7c3c95 100644 --- a/include/transport_kcp.h +++ b/include/transport_kcp.h @@ -43,32 +43,6 @@ extern "C" { typedef struct kcp_conn kcp_conn_t; typedef struct kcp_listener kcp_listener_t; -typedef struct kcp_conn_metrics { - int connected; - int has_conv; - uint32_t conv; - char local_addr[OMNI_MAX_ADDR_TEXT]; - char remote_addr[OMNI_MAX_ADDR_TEXT]; - uint32_t rto_ms; - int32_t srtt_ms; - int32_t srttvar_ms; - uint64_t bytes_sent; - uint64_t bytes_received; - uint64_t in_pkts; - uint64_t out_pkts; - uint64_t in_segs; - uint64_t out_segs; - uint64_t retrans_segs; - uint64_t fast_retrans_segs; - uint64_t early_retrans_segs; - uint64_t lost_segs; - uint64_t repeat_segs; - uint64_t in_errs; - uint64_t kcp_in_errs; - uint64_t ring_buffer_snd_queue; - uint64_t ring_buffer_rcv_queue; - uint64_t ring_buffer_snd_buffer; -} kcp_conn_metrics_t; typedef struct kcp_conn_options { int nodelay; int interval_ms; @@ -94,7 +68,6 @@ int kcp_conn_close(kcp_conn_t *conn); void kcp_conn_free(kcp_conn_t *conn); uint32_t kcp_conn_conv(const kcp_conn_t *conn); int kcp_conn_local_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, socklen_t *addr_len); -int kcp_conn_metrics_snapshot(kcp_conn_t *conn, kcp_conn_metrics_t *out_metrics); kcp_listener_t *kcp_listener_listen(const char *listen_addr, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, const char *node_role, const char *node_id); kcp_conn_t *kcp_listener_accept(kcp_listener_t *listener); diff --git a/python/omnisocket/_omnisocket.c b/python/omnisocket/_omnisocket.c index c919b4e..38d852f 100644 --- a/python/omnisocket/_omnisocket.c +++ b/python/omnisocket/_omnisocket.c @@ -22,13 +22,6 @@ PyDoc_STRVAR( "current frame has already been consumed and is lost." ); -PyDoc_STRVAR( - PyOmniSession_kcp_metrics_doc, - "kcp_metrics() -> dict\n" - "\n" - "Return a snapshot of low-level KCP metrics for the current session." -); - static PyObject *PyOmniSession_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) { PyOmniSession *self; (void) args; @@ -288,67 +281,6 @@ static PyObject *PyOmniSession_stats(PyOmniSession *self, PyObject *Py_UNUSED(ig ); } -static PyObject *PyOmniSession_kcp_metrics(PyOmniSession *self, PyObject *Py_UNUSED(ignored)) { - omnisocket_session_kcp_metrics_t metrics; - - memset(&metrics, 0, sizeof(metrics)); - if (omnisocket_session_kcp_metrics_snapshot(&self->session, &metrics) != 0) { - return PyErr_SetFromErrno(PyExc_OSError); - } - - return Py_BuildValue( - "{s:i,s:i,s:I,s:s,s:s,s:I,s:i,s:i,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K}", - "connected", - metrics.connected, - "has_conv", - metrics.has_conv, - "conv", - metrics.conv, - "local_addr", - metrics.local_addr, - "remote_addr", - metrics.remote_addr, - "rto_ms", - metrics.rto_ms, - "srtt_ms", - metrics.srtt_ms, - "srttvar_ms", - metrics.srttvar_ms, - "bytes_sent", - (unsigned long long) metrics.bytes_sent, - "bytes_received", - (unsigned long long) metrics.bytes_received, - "in_pkts", - (unsigned long long) metrics.in_pkts, - "out_pkts", - (unsigned long long) metrics.out_pkts, - "in_segs", - (unsigned long long) metrics.in_segs, - "out_segs", - (unsigned long long) metrics.out_segs, - "retrans_segs", - (unsigned long long) metrics.retrans_segs, - "fast_retrans_segs", - (unsigned long long) metrics.fast_retrans_segs, - "early_retrans_segs", - (unsigned long long) metrics.early_retrans_segs, - "lost_segs", - (unsigned long long) metrics.lost_segs, - "repeat_segs", - (unsigned long long) metrics.repeat_segs, - "in_errs", - (unsigned long long) metrics.in_errs, - "kcp_in_errs", - (unsigned long long) metrics.kcp_in_errs, - "ring_buffer_snd_queue", - (unsigned long long) metrics.ring_buffer_snd_queue, - "ring_buffer_rcv_queue", - (unsigned long long) metrics.ring_buffer_rcv_queue, - "ring_buffer_snd_buffer", - (unsigned long long) metrics.ring_buffer_snd_buffer - ); -} - static PyMethodDef PyOmniSession_methods[] = { {"connect", (PyCFunction) PyOmniSession_connect, METH_VARARGS | METH_KEYWORDS, NULL}, {"close", (PyCFunction) PyOmniSession_close, METH_NOARGS, NULL}, @@ -356,7 +288,6 @@ static PyMethodDef PyOmniSession_methods[] = { {"recv", (PyCFunction) PyOmniSession_recv, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_doc}, {"recv_into", (PyCFunction) PyOmniSession_recv_into, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_into_doc}, {"stats", (PyCFunction) PyOmniSession_stats, METH_NOARGS, NULL}, - {"kcp_metrics", (PyCFunction) PyOmniSession_kcp_metrics, METH_NOARGS, PyOmniSession_kcp_metrics_doc}, {NULL, NULL, 0, NULL} }; diff --git a/python/omnisocket/omnisocket_client.c b/python/omnisocket/omnisocket_client.c index 53b8cab..3557780 100644 --- a/python/omnisocket/omnisocket_client.c +++ b/python/omnisocket/omnisocket_client.c @@ -246,72 +246,3 @@ void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket *out_stats = session->stats; pthread_mutex_unlock(&session->mutex); } - -int omnisocket_session_kcp_metrics_snapshot( - omnisocket_session_t *session, - omnisocket_session_kcp_metrics_t *out_metrics -) { - kcp_client_t *client = NULL; - kcp_conn_metrics_t metrics; - int rc = 0; - - if (session == NULL || out_metrics == NULL) { - errno = EINVAL; - return -1; - } - - memset(out_metrics, 0, sizeof(*out_metrics)); - - pthread_mutex_lock(&session->mutex); - if (session->client != NULL && !session->closing) { - client = session->client; - session->active_ops += 1; - } - pthread_mutex_unlock(&session->mutex); - - if (client == NULL) { - return 0; - } - - memset(&metrics, 0, sizeof(metrics)); - rc = kcp_client_metrics_snapshot(client, &metrics); - - pthread_mutex_lock(&session->mutex); - if (session->active_ops > 0) { - session->active_ops -= 1; - } - if (session->closing && session->active_ops == 0) { - pthread_cond_broadcast(&session->idle_cond); - } - pthread_mutex_unlock(&session->mutex); - - if (rc != 0) { - return rc; - } - - out_metrics->connected = metrics.connected; - out_metrics->has_conv = metrics.has_conv; - out_metrics->conv = metrics.conv; - snprintf(out_metrics->local_addr, sizeof(out_metrics->local_addr), "%s", metrics.local_addr); - snprintf(out_metrics->remote_addr, sizeof(out_metrics->remote_addr), "%s", metrics.remote_addr); - out_metrics->rto_ms = metrics.rto_ms; - out_metrics->srtt_ms = metrics.srtt_ms; - out_metrics->srttvar_ms = metrics.srttvar_ms; - out_metrics->bytes_sent = metrics.bytes_sent; - out_metrics->bytes_received = metrics.bytes_received; - out_metrics->in_pkts = metrics.in_pkts; - out_metrics->out_pkts = metrics.out_pkts; - out_metrics->in_segs = metrics.in_segs; - out_metrics->out_segs = metrics.out_segs; - out_metrics->retrans_segs = metrics.retrans_segs; - out_metrics->fast_retrans_segs = metrics.fast_retrans_segs; - out_metrics->early_retrans_segs = metrics.early_retrans_segs; - out_metrics->lost_segs = metrics.lost_segs; - out_metrics->repeat_segs = metrics.repeat_segs; - out_metrics->in_errs = metrics.in_errs; - out_metrics->kcp_in_errs = metrics.kcp_in_errs; - out_metrics->ring_buffer_snd_queue = metrics.ring_buffer_snd_queue; - out_metrics->ring_buffer_rcv_queue = metrics.ring_buffer_rcv_queue; - out_metrics->ring_buffer_snd_buffer = metrics.ring_buffer_snd_buffer; - return 0; -} diff --git a/python/omnisocket/omnisocket_client.h b/python/omnisocket/omnisocket_client.h index cf7eeea..be9e4b2 100644 --- a/python/omnisocket/omnisocket_client.h +++ b/python/omnisocket/omnisocket_client.h @@ -14,33 +14,6 @@ typedef struct omnisocket_session_stats { int connected; } omnisocket_session_stats_t; -typedef struct omnisocket_session_kcp_metrics { - int connected; - int has_conv; - uint32_t conv; - char local_addr[OMNI_MAX_ADDR_TEXT]; - char remote_addr[OMNI_MAX_ADDR_TEXT]; - uint32_t rto_ms; - int32_t srtt_ms; - int32_t srttvar_ms; - uint64_t bytes_sent; - uint64_t bytes_received; - uint64_t in_pkts; - uint64_t out_pkts; - uint64_t in_segs; - uint64_t out_segs; - uint64_t retrans_segs; - uint64_t fast_retrans_segs; - uint64_t early_retrans_segs; - uint64_t lost_segs; - uint64_t repeat_segs; - uint64_t in_errs; - uint64_t kcp_in_errs; - uint64_t ring_buffer_snd_queue; - uint64_t ring_buffer_rcv_queue; - uint64_t ring_buffer_snd_buffer; -} omnisocket_session_kcp_metrics_t; - typedef struct omnisocket_session { pthread_mutex_t mutex; pthread_cond_t idle_cond; @@ -74,9 +47,5 @@ int omnisocket_session_recv_into( int timeout_ms ); void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket_session_stats_t *out_stats); -int omnisocket_session_kcp_metrics_snapshot( - omnisocket_session_t *session, - omnisocket_session_kcp_metrics_t *out_metrics -); #endif diff --git a/python/omnisocket_a_side/__init__.py b/python/omnisocket_a_side/__init__.py deleted file mode 100644 index 2f51111..0000000 --- a/python/omnisocket_a_side/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -from pathlib import Path - - -PACKAGE_ROOT = Path(__file__).resolve().parent -PYTHON_ROOT = PACKAGE_ROOT.parent -REPO_ROOT = PYTHON_ROOT.parent -DEFAULT_SOCKET_PATH = "/tmp/omnisocket-a-side.sock" -DEFAULT_CONFIG_PATH = REPO_ROOT / "config" / "a_side_omnidaemon.yaml" -VERSION = "0.1.0" diff --git a/python/omnisocket_a_side/__main__.py b/python/omnisocket_a_side/__main__.py deleted file mode 100644 index c5a0ee7..0000000 --- a/python/omnisocket_a_side/__main__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .daemon import main - - -if __name__ == "__main__": - main() diff --git a/python/omnisocket_a_side/client.py b/python/omnisocket_a_side/client.py deleted file mode 100644 index f02a234..0000000 --- a/python/omnisocket_a_side/client.py +++ /dev/null @@ -1,149 +0,0 @@ -"""Local Unix-domain HTTP client for the A-side OmniDaemon.""" - -from __future__ import annotations - -import http.client -import json -import os -import socket -import threading -from typing import Any - -from . import DEFAULT_SOCKET_PATH - - -class OmniDaemonError(RuntimeError): - def __init__(self, message: str, status_code: int | None = None) -> None: - super().__init__(message) - self.status_code = status_code - - -class UnixHTTPConnection(http.client.HTTPConnection): - def __init__(self, socket_path: str, timeout: float = 2.0) -> None: - super().__init__("localhost", timeout=timeout) - self.socket_path = socket_path - - def connect(self) -> None: # pragma: no cover - runtime depends on Linux socket support - if not hasattr(socket, "AF_UNIX"): - raise OSError("AF_UNIX sockets are not available on this platform") - self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.sock.settimeout(self.timeout) - self.sock.connect(self.socket_path) - - -class OmniDaemonClient: - def __init__(self, socket_path: str | None = None, timeout: float = 2.0) -> None: - self.socket_path = socket_path or os.getenv("OMNIDAEMON_SOCKET", DEFAULT_SOCKET_PATH) - self.timeout = timeout - self._local = threading.local() - - def get_health(self) -> dict[str, Any]: - return self._request_json("GET", "/v1/health") - - def get_state(self) -> dict[str, Any]: - return self._request_json("GET", "/v1/state") - - def get_video_frame(self) -> bytes: - return self._request_bytes("GET", "/v1/video/frame") - - def get_control_status(self) -> dict[str, Any]: - return self._request_json("GET", "/v1/control/status") - - def send_control_event( - self, - *, - source: str, - event_code: str, - drive_value: float = 1.0, - client_time_ms: int | None = None, - ) -> dict[str, Any]: - payload = { - "source": source, - "event_code": event_code, - "drive_value": float(drive_value), - "client_time_ms": client_time_ms, - } - return self._request_json("POST", "/v1/control/event", payload) - - def close(self) -> None: - self._reset_connection() - - def _request_json( - self, - method: str, - path: str, - payload: dict[str, Any] | None = None, - ) -> dict[str, Any]: - raw = self._request_bytes(method, path, payload) - if not raw: - return {} - try: - return json.loads(raw.decode("utf-8")) - except json.JSONDecodeError as error: - raise OmniDaemonError(f"invalid daemon JSON response: {error}") from error - - def _request_bytes( - self, - method: str, - path: str, - payload: dict[str, Any] | None = None, - ) -> bytes: - body = b"" - headers: dict[str, str] = {} - if payload is not None: - body = json.dumps(payload).encode("utf-8") - headers["Content-Type"] = "application/json" - headers["Content-Length"] = str(len(body)) - headers.setdefault("Connection", "keep-alive") - - for attempt in range(2): - connection = self._get_connection() - try: - connection.request(method, path, body=body, headers=headers) - response = connection.getresponse() - raw = response.read() - except FileNotFoundError as error: - self._reset_connection() - raise OmniDaemonError( - f"daemon socket not found: {self.socket_path}" - ) from error - except (OSError, http.client.HTTPException) as error: - self._reset_connection() - if attempt == 0: - continue - raise OmniDaemonError( - f"daemon request failed via {self.socket_path}: {error}" - ) from error - - if getattr(response, "will_close", False): - self._reset_connection() - - if response.status >= 400: - message = raw.decode("utf-8", errors="replace").strip() or response.reason - try: - parsed = json.loads(message) - if isinstance(parsed, dict) and "error" in parsed: - message = str(parsed["error"]) - except json.JSONDecodeError: - pass - raise OmniDaemonError(message, status_code=response.status) - return raw - - raise OmniDaemonError(f"daemon request failed via {self.socket_path}: retry exhausted") - - def _get_connection(self) -> UnixHTTPConnection: - connection = getattr(self._local, "connection", None) - if connection is None: - connection = UnixHTTPConnection(self.socket_path, timeout=self.timeout) - self._local.connection = connection - return connection - - def _reset_connection(self) -> None: - connection = getattr(self._local, "connection", None) - if connection is None: - return - try: - connection.close() - except OSError: - pass - self._local.connection = None diff --git a/python/omnisocket_a_side/control_codec.py b/python/omnisocket_a_side/control_codec.py deleted file mode 100644 index 0026355..0000000 --- a/python/omnisocket_a_side/control_codec.py +++ /dev/null @@ -1,90 +0,0 @@ -"""Binary control packet codec shared by the daemon and local clients.""" - -from __future__ import annotations - -from dataclasses import dataclass -import struct -import time - - -CONTROL_PACKET_VERSION = 1 -CONTROL_PACKET_STRUCT = struct.Struct("!BBHIfQ") - -EVENT_NAME_TO_ID = { - "pose_home": 1, - "pose_hold": 2, - "mode_stride": 3, - "surge_up": 6, - "surge_down": 7, - "sway_left": 8, - "sway_right": 9, - "spin_left": 10, - "spin_right": 11, - "set_surge": 12, - "set_sway": 13, - "set_spin": 14, - "set_lift": 15, - "lift_up": 16, - "lift_down": 17, - "trim_reset": 18, - "session_quit": 19, -} -EVENT_ID_TO_NAME = {value: key for key, value in EVENT_NAME_TO_ID.items()} -ANALOG_EVENT_CODES = {"set_surge", "set_sway", "set_spin"} - - -@dataclass(slots=True) -class ControlPacket: - seq_id: int - event_id: int - drive_value: float = 1.0 - sent_at_ns: int = 0 - - @property - def event_name(self) -> str: - return EVENT_ID_TO_NAME.get(self.event_id, f"unknown_{self.event_id}") - - def encode(self) -> bytes: - sent_at_ns = self.sent_at_ns or time.time_ns() - return CONTROL_PACKET_STRUCT.pack( - CONTROL_PACKET_VERSION, - self.event_id, - 0, - int(self.seq_id), - float(self.drive_value), - int(sent_at_ns), - ) - - @classmethod - def decode(cls, payload: bytes) -> "ControlPacket": - if len(payload) != CONTROL_PACKET_STRUCT.size: - raise ValueError( - f"invalid control packet length {len(payload)}; " - f"want {CONTROL_PACKET_STRUCT.size}" - ) - version, event_id, _reserved, seq_id, drive_value, sent_at_ns = ( - CONTROL_PACKET_STRUCT.unpack(payload) - ) - if version != CONTROL_PACKET_VERSION: - raise ValueError(f"unsupported control packet version {version}") - return cls( - seq_id=int(seq_id), - event_id=int(event_id), - drive_value=float(drive_value), - sent_at_ns=int(sent_at_ns), - ) - - -def make_control_packet( - seq_id: int, - event_name: str, - drive_value: float = 1.0, - sent_at_ns: int | None = None, -) -> ControlPacket: - event_id = EVENT_NAME_TO_ID[event_name] - return ControlPacket( - seq_id=seq_id, - event_id=event_id, - drive_value=drive_value, - sent_at_ns=time.time_ns() if sent_at_ns is None else sent_at_ns, - ) diff --git a/python/omnisocket_a_side/daemon.py b/python/omnisocket_a_side/daemon.py deleted file mode 100644 index ef953fa..0000000 --- a/python/omnisocket_a_side/daemon.py +++ /dev/null @@ -1,1063 +0,0 @@ -"""A-side OmniDaemon that owns local control/video OmniSocket sessions.""" - -from __future__ import annotations - -import argparse -import copy -import json -import os -from pathlib import Path -import queue -import signal -import socketserver -import threading -import time -from dataclasses import dataclass -from datetime import UTC, datetime -from http import HTTPStatus -from http.server import BaseHTTPRequestHandler -from typing import Any - -import yaml - -from . import DEFAULT_CONFIG_PATH, DEFAULT_SOCKET_PATH, VERSION -from .control_codec import ANALOG_EVENT_CODES, EVENT_NAME_TO_ID, make_control_packet - - -def utc_iso_now() -> str: - return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") - - -def load_omnisocket_api(): - from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS - - return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS - - -def _merge_kcp_defaults(defaults: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]: - merged = dict(defaults) - for key in ("nodelay", "interval_ms", "resend", "nc", "sndwnd", "rcvwnd", "mtu"): - if key in override: - merged[key] = int(override[key]) - return merged - - -def _load_config(config_path: str | None) -> dict[str, Any]: - path = Path(os.getenv("OMNIDAEMON_CONFIG", config_path or str(DEFAULT_CONFIG_PATH))) - raw: dict[str, Any] = {} - if path.exists(): - with path.open("r", encoding="utf-8") as file: - raw = yaml.safe_load(file) or {} - - control_defaults, _msg_type_binary, _session_cls, video_defaults = load_omnisocket_api() - - transport = dict(raw.get("transport", {})) - control = dict(raw.get("control_sender", {})) - video = dict(raw.get("video_receiver", {})) - daemon_cfg = dict(raw.get("daemon", {})) - policy = dict(raw.get("policy", {})) - - def _profile(name: str, fps: int, max_frame_kb: int) -> dict[str, int]: - section = dict(policy.get(name, {})) - return { - "fps": int(section.get("fps", fps)), - "max_frame_bytes": int(section.get("max_frame_kb", max_frame_kb)) * 1024, - } - - return { - "config_path": str(path), - "transport": { - "server_addr": str(transport.get("server_addr", "")), - "relay_via": str(transport.get("relay_via", "")), - "bind_ip": str(transport.get("bind_ip", "")), - "bind_device": str(transport.get("bind_device", "")), - }, - "control_sender": { - "peer_id": str(control.get("peer_id", "peer-a-ctrl")), - "target_peer": str(control.get("target_peer", "peer-b-ctrl")), - "stats_interval_ms": int(control.get("stats_interval_ms", 100)), - "kcp": _merge_kcp_defaults(control_defaults, control), - }, - "video_receiver": { - "peer_id": str(video.get("peer_id", "peer-a-video")), - "buffer_bytes": int(video.get("buffer_bytes", 1024 * 1024)), - "stats_interval_ms": int(video.get("stats_interval_ms", 100)), - "kcp": _merge_kcp_defaults(video_defaults, video), - }, - "daemon": { - "socket_path": os.getenv( - "OMNIDAEMON_SOCKET", - str(daemon_cfg.get("socket_path", DEFAULT_SOCKET_PATH)), - ), - "reconnect_delay_ms": int(daemon_cfg.get("reconnect_delay_ms", 2000)), - "telemetry_interval_ms": int(daemon_cfg.get("telemetry_interval_ms", 100)), - "analog_send_hz": int(daemon_cfg.get("analog_send_hz", 100)), - "frame_stale_ms": int(daemon_cfg.get("frame_stale_ms", 500)), - }, - "policy": { - "health_window_ms": int(policy.get("health_window_ms", 2000)), - "green_srtt_ms": int(policy.get("green_srtt_ms", 35)), - "yellow_srtt_ms": int(policy.get("yellow_srtt_ms", 60)), - "retrans_red_threshold": int(policy.get("retrans_red_threshold", 10)), - "profile_green": _profile("profile_green", 15, 60), - "profile_yellow": _profile("profile_yellow", 10, 40), - "profile_red": _profile("profile_red", 5, 20), - }, - } - - -def _default_stats() -> dict[str, int]: - return { - "send_calls": 0, - "send_bytes": 0, - "send_errors": 0, - "recv_calls": 0, - "recv_bytes": 0, - "recv_timeouts": 0, - "recv_errors": 0, - "connected": 0, - } - - -def _default_kcp_metrics() -> dict[str, Any]: - return { - "connected": 0, - "has_conv": 0, - "conv": 0, - "local_addr": "", - "remote_addr": "", - "rto_ms": 0, - "srtt_ms": 0, - "srttvar_ms": 0, - "bytes_sent": 0, - "bytes_received": 0, - "in_pkts": 0, - "out_pkts": 0, - "in_segs": 0, - "out_segs": 0, - "retrans_segs": 0, - "fast_retrans_segs": 0, - "early_retrans_segs": 0, - "lost_segs": 0, - "repeat_segs": 0, - "in_errs": 0, - "kcp_in_errs": 0, - "ring_buffer_snd_queue": 0, - "ring_buffer_rcv_queue": 0, - "ring_buffer_snd_buffer": 0, - } - - -@dataclass(slots=True) -class QueuedControlEvent: - seq_id: int - source: str - event_code: str - drive_value: float - client_time_ms: int - - -class ControlSessionManager: - def __init__(self, config: dict[str, Any]) -> None: - control_defaults, _msg_type_binary, session_cls, _video_defaults = load_omnisocket_api() - transport = config["transport"] - control_cfg = config["control_sender"] - daemon_cfg = config["daemon"] - - self._session_cls = session_cls - self._connect_kwargs = { - "server_addr": transport["server_addr"], - "relay_via": transport["relay_via"], - "bind_ip": transport["bind_ip"], - "bind_device": transport["bind_device"], - "peer_id": control_cfg["peer_id"], - "stats_interval_ms": control_cfg["stats_interval_ms"], - **_merge_kcp_defaults(control_defaults, control_cfg["kcp"]), - } - self._target_peer = control_cfg["target_peer"] - self._analog_interval = 1.0 / max(1, daemon_cfg["analog_send_hz"]) - self._reconnect_delay_s = max(0.25, daemon_cfg["reconnect_delay_ms"] / 1000.0) - - self._lock = threading.Lock() - self._stop_event = threading.Event() - self._wake_event = threading.Event() - self._discrete_queue: queue.Queue[QueuedControlEvent] = queue.Queue() - self._pending_analog: dict[str, QueuedControlEvent] = {} - self._session = None - self._connected = False - self._seq_id = 0 - self._last_seq_id: int | None = None - self._last_error = "" - self._last_connect_attempt = 0.0 - self._next_analog_flush = 0.0 - self._thread = threading.Thread( - target=self._run, - name="omni-a-side-control", - daemon=True, - ) - - def start(self) -> None: - self._thread.start() - - def stop(self) -> None: - self._stop_event.set() - self._wake_event.set() - if self._thread.is_alive(): - self._thread.join(timeout=2.0) - self._disconnect("control daemon stopped", drop_pending=True) - - def send_event( - self, - *, - source: str, - event_code: str, - drive_value: float, - client_time_ms: int | None, - ) -> dict[str, Any]: - if event_code not in EVENT_NAME_TO_ID: - return { - "accepted": False, - "assigned_seq_id": None, - "control_connected": self.is_connected(), - "queue_depth": self.queue_depth(), - "error": f"unknown event_code: {event_code}", - } - - if not self.is_connected(): - return { - "accepted": False, - "assigned_seq_id": None, - "control_connected": False, - "queue_depth": self.queue_depth(), - "error": self.last_error() or "control session is not connected", - } - - with self._lock: - seq_id = self._seq_id - self._seq_id += 1 - event = QueuedControlEvent( - seq_id=seq_id, - source=source, - event_code=event_code, - drive_value=float(drive_value), - client_time_ms=int(client_time_ms or time.time() * 1000), - ) - if event_code in ANALOG_EVENT_CODES: - self._pending_analog[event_code] = event - else: - self._discrete_queue.put_nowait(event) - self._wake_event.set() - return { - "accepted": True, - "assigned_seq_id": seq_id, - "control_connected": True, - "queue_depth": self.queue_depth(), - "error": "", - } - - def is_connected(self) -> bool: - with self._lock: - return self._connected and self._session is not None - - def queue_depth(self) -> int: - with self._lock: - return self._discrete_queue.qsize() + len(self._pending_analog) - - def last_error(self) -> str: - with self._lock: - return self._last_error - - def snapshot(self) -> dict[str, Any]: - with self._lock: - session = self._session - connected = self._connected and session is not None - last_error = self._last_error - last_seq_id = self._last_seq_id - queue_depth = self._discrete_queue.qsize() + len(self._pending_analog) - - stats = _default_stats() - metrics = _default_kcp_metrics() - if session is not None: - try: - stats = dict(session.stats()) - metrics = dict(session.kcp_metrics()) - connected = bool(stats.get("connected") and metrics.get("connected")) - except Exception as error: # pragma: no cover - runtime integration - connected = False - last_error = str(error) - - return { - "connected": connected, - "queue_depth": queue_depth, - "last_seq_id": last_seq_id, - "last_error": last_error, - "peer_id": self._connect_kwargs["peer_id"], - "target_peer": self._target_peer, - "server_addr": self._connect_kwargs["server_addr"], - "relay_via": self._connect_kwargs["relay_via"], - "stats": stats, - "kcp_metrics": metrics, - } - - def _run(self) -> None: - analog_order = ("set_surge", "set_sway", "set_spin") - while not self._stop_event.is_set(): - if not self.is_connected(): - now = time.monotonic() - if now - self._last_connect_attempt >= self._reconnect_delay_s: - self._last_connect_attempt = now - self._connect() - self._wake_event.wait(0.2) - self._wake_event.clear() - continue - - sent_any = False - while not self._stop_event.is_set(): - try: - event = self._discrete_queue.get_nowait() - except queue.Empty: - break - if not self._send_packet(event): - break - sent_any = True - - now = time.monotonic() - if now >= self._next_analog_flush: - analog_batch: list[QueuedControlEvent] = [] - with self._lock: - for event_code in analog_order: - event = self._pending_analog.pop(event_code, None) - if event is not None: - analog_batch.append(event) - if analog_batch: - for event in analog_batch: - if not self._send_packet(event): - break - sent_any = True - self._next_analog_flush = now + self._analog_interval - - if not sent_any: - wait_s = max(0.02, self._next_analog_flush - time.monotonic()) - self._wake_event.wait(wait_s) - self._wake_event.clear() - - def _connect(self) -> None: - session = self._session_cls() - try: - session.connect(**self._connect_kwargs) - except Exception as error: # pragma: no cover - runtime integration - with self._lock: - self._connected = False - self._last_error = str(error) - try: - session.close() - except Exception: - pass - return - - with self._lock: - self._session = session - self._connected = True - self._last_error = "" - self._next_analog_flush = time.monotonic() - - def _disconnect(self, error_message: str, *, drop_pending: bool) -> None: - with self._lock: - session = self._session - self._session = None - self._connected = False - if error_message: - self._last_error = error_message - if drop_pending: - self._pending_analog.clear() - while not self._discrete_queue.empty(): - try: - self._discrete_queue.get_nowait() - except queue.Empty: - break - if session is not None: - try: - session.close() - except Exception: - pass - - def _send_packet(self, event: QueuedControlEvent) -> bool: - with self._lock: - session = self._session - if session is None: - return False - - packet = make_control_packet(event.seq_id, event.event_code, event.drive_value) - try: - session.send(to=self._target_peer, data=packet.encode()) - except Exception as error: # pragma: no cover - runtime integration - self._disconnect(str(error), drop_pending=True) - return False - - with self._lock: - self._last_seq_id = event.seq_id - self._last_error = "" - return True - - -class VideoSessionManager: - def __init__(self, config: dict[str, Any]) -> None: - _control_defaults, msg_type_binary, session_cls, video_defaults = load_omnisocket_api() - transport = config["transport"] - video_cfg = config["video_receiver"] - daemon_cfg = config["daemon"] - - self._msg_type_binary = msg_type_binary - self._session_cls = session_cls - self._connect_kwargs = { - "server_addr": transport["server_addr"], - "relay_via": transport["relay_via"], - "bind_ip": transport["bind_ip"], - "bind_device": transport["bind_device"], - "peer_id": video_cfg["peer_id"], - "stats_interval_ms": video_cfg["stats_interval_ms"], - **_merge_kcp_defaults(video_defaults, video_cfg["kcp"]), - } - self._buffer_bytes = video_cfg["buffer_bytes"] - self._frame_stale_s = max(0.1, daemon_cfg["frame_stale_ms"] / 1000.0) - self._reconnect_delay_s = max(0.25, daemon_cfg["reconnect_delay_ms"] / 1000.0) - self._config_path = config["config_path"] - - self._lock = threading.Lock() - self._stop_event = threading.Event() - self._session = None - self._latest_frame: bytes | None = None - self._latest_received_at = 0.0 - self._latest_sequence: int | None = None - self._frames_received = 0 - self._last_error = "" - self._last_connect_attempt = 0.0 - self._thread = threading.Thread( - target=self._run, - name="omni-a-side-video", - daemon=True, - ) - - def start(self) -> None: - self._thread.start() - - def stop(self) -> None: - self._stop_event.set() - if self._thread.is_alive(): - self._thread.join(timeout=2.0) - self._disconnect("video daemon stopped") - - def get_latest_frame(self) -> bytes | None: - with self._lock: - if self._latest_frame is None: - return None - if time.time() - self._latest_received_at > self._frame_stale_s: - return None - return self._latest_frame - - def snapshot(self) -> dict[str, Any]: - with self._lock: - session = self._session - latest_received_at = self._latest_received_at - latest_sequence = self._latest_sequence - frames_received = self._frames_received - last_error = self._last_error - has_recent_frame = ( - self._latest_frame is not None - and time.time() - self._latest_received_at <= self._frame_stale_s - ) - - stats = _default_stats() - metrics = _default_kcp_metrics() - connected = session is not None - if session is not None: - try: - stats = dict(session.stats()) - metrics = dict(session.kcp_metrics()) - connected = bool(stats.get("connected") and metrics.get("connected")) - except Exception as error: # pragma: no cover - runtime integration - connected = False - last_error = str(error) - - return { - "connected": connected, - "has_recent_frame": has_recent_frame, - "latest_received_at": latest_received_at, - "latest_sequence": latest_sequence, - "frames_received": frames_received, - "last_error": last_error, - "buffer_bytes": self._buffer_bytes, - "peer_id": self._connect_kwargs["peer_id"], - "server_addr": self._connect_kwargs["server_addr"], - "relay_via": self._connect_kwargs["relay_via"], - "config_path": self._config_path, - "stats": stats, - "kcp_metrics": metrics, - } - - def _run(self) -> None: - while not self._stop_event.is_set(): - if not self._is_connected(): - now = time.monotonic() - if now - self._last_connect_attempt >= self._reconnect_delay_s: - self._last_connect_attempt = now - self._connect() - time.sleep(0.2) - continue - - with self._lock: - session = self._session - if session is None: - continue - - buffer = bytearray(self._buffer_bytes) - try: - while not self._stop_event.is_set(): - meta = session.recv_into(buffer, timeout_ms=200) - if meta is None: - continue - if meta.get("msg_type") != self._msg_type_binary: - continue - frame = bytes(buffer[: int(meta["body_len"])]) - jpeg_frame = self._extract_jpeg_frame(frame) - if jpeg_frame is None: - with self._lock: - self._last_error = "received non-JPEG binary frame" - continue - with self._lock: - self._latest_frame = jpeg_frame - self._latest_received_at = time.time() - self._latest_sequence = self._extract_sequence(frame) - self._frames_received += 1 - self._last_error = "" - except Exception as error: # pragma: no cover - runtime integration - self._disconnect(str(error)) - time.sleep(0.2) - - def _connect(self) -> None: - session = self._session_cls() - try: - session.connect(**self._connect_kwargs) - except Exception as error: # pragma: no cover - runtime integration - with self._lock: - self._session = None - self._last_error = str(error) - try: - session.close() - except Exception: - pass - return - - with self._lock: - self._session = session - self._last_error = "" - - def _disconnect(self, error_message: str) -> None: - with self._lock: - session = self._session - self._session = None - self._last_error = error_message - if session is not None: - try: - session.close() - except Exception: - pass - - def _is_connected(self) -> bool: - with self._lock: - return self._session is not None - - @staticmethod - def _extract_jpeg_frame(frame: bytes) -> bytes | None: - if frame.startswith(b"\xff\xd8"): - return frame - if len(frame) > 8 and frame[8:10] == b"\xff\xd8": - return frame[8:] - return None - - @staticmethod - def _extract_sequence(frame: bytes) -> int | None: - if len(frame) >= 8 and not frame.startswith(b"\xff\xd8"): - return int.from_bytes(frame[:8], "big") - return None - - -class PolicyEngine: - def __init__(self, policy_cfg: dict[str, Any]) -> None: - self._policy_cfg = policy_cfg - - def classify( - self, - *, - control_connected: bool, - avg_srtt_ms: float, - retrans_delta: int, - lost_delta: int, - ) -> tuple[str, dict[str, int]]: - if not control_connected: - return "red", dict(self._policy_cfg["profile_red"]) - if ( - avg_srtt_ms >= self._policy_cfg["yellow_srtt_ms"] - or lost_delta > 0 - or retrans_delta >= self._policy_cfg["retrans_red_threshold"] - ): - return "red", dict(self._policy_cfg["profile_red"]) - if avg_srtt_ms >= self._policy_cfg["green_srtt_ms"] or retrans_delta > 0: - return "yellow", dict(self._policy_cfg["profile_yellow"]) - return "green", dict(self._policy_cfg["profile_green"]) - - -class TelemetrySampler: - def __init__( - self, - config: dict[str, Any], - control_manager: ControlSessionManager, - video_manager: VideoSessionManager, - ) -> None: - self._config = config - self._control_manager = control_manager - self._video_manager = video_manager - self._policy_engine = PolicyEngine(config["policy"]) - self._interval_s = max(0.1, config["daemon"]["telemetry_interval_ms"] / 1000.0) - self._window_s = max(0.5, config["policy"]["health_window_ms"] / 1000.0) - - self._lock = threading.Lock() - self._stop_event = threading.Event() - self._thread = threading.Thread( - target=self._run, - name="omni-a-side-telemetry", - daemon=True, - ) - self._state = self._build_initial_state() - self._control_history: list[dict[str, Any]] = [] - self._last_totals: dict[str, Any] | None = None - self._started_at = utc_iso_now() - - def start(self) -> None: - self._thread.start() - - def stop(self) -> None: - self._stop_event.set() - if self._thread.is_alive(): - self._thread.join(timeout=2.0) - - def snapshot(self) -> dict[str, Any]: - with self._lock: - return copy.deepcopy(self._state) - - def _run(self) -> None: - while not self._stop_event.is_set(): - self._sample_once() - self._stop_event.wait(self._interval_s) - - def _sample_once(self) -> None: - now = time.time() - control = self._control_manager.snapshot() - video = self._video_manager.snapshot() - - control_metrics = control["kcp_metrics"] - video_metrics = video["kcp_metrics"] - - self._control_history.append( - { - "ts": now, - "connected": bool(control["connected"]), - "srtt_ms": float(control_metrics.get("srtt_ms", 0.0) or 0.0), - "retrans_segs": int(control_metrics.get("retrans_segs", 0) or 0), - "lost_segs": int(control_metrics.get("lost_segs", 0) or 0), - } - ) - self._control_history = [ - item for item in self._control_history if now - item["ts"] <= self._window_s - ] - - avg_srtt_ms = 0.0 - jitter_ms = 0.0 - retrans_delta_window = 0 - lost_delta_window = 0 - srtt_values = [item["srtt_ms"] for item in self._control_history if item["connected"]] - if srtt_values: - avg_srtt_ms = sum(srtt_values) / len(srtt_values) - if len(srtt_values) >= 2: - jitter_samples = [ - abs(srtt_values[index] - srtt_values[index - 1]) - for index in range(1, len(srtt_values)) - ] - jitter_ms = sum(jitter_samples) / len(jitter_samples) - if len(self._control_history) >= 2: - first = self._control_history[0] - last = self._control_history[-1] - retrans_delta_window = max(0, last["retrans_segs"] - first["retrans_segs"]) - lost_delta_window = max(0, last["lost_segs"] - first["lost_segs"]) - - health_band, profile = self._policy_engine.classify( - control_connected=bool(control["connected"]), - avg_srtt_ms=avg_srtt_ms, - retrans_delta=retrans_delta_window, - lost_delta=lost_delta_window, - ) - - total_bytes_sent = int(control_metrics.get("bytes_sent", 0)) + int(video_metrics.get("bytes_sent", 0)) - total_bytes_received = int(control_metrics.get("bytes_received", 0)) + int(video_metrics.get("bytes_received", 0)) - total_out_segs = int(control_metrics.get("out_segs", 0)) + int(video_metrics.get("out_segs", 0)) - total_retrans = int(control_metrics.get("retrans_segs", 0)) + int(video_metrics.get("retrans_segs", 0)) - tx_kbps = 0 - rx_kbps = 0 - retrans_pct = 0.0 - if self._last_totals is not None: - dt = max(0.001, now - self._last_totals["ts"]) - sent_delta = max(0, total_bytes_sent - self._last_totals["bytes_sent"]) - recv_delta = max(0, total_bytes_received - self._last_totals["bytes_received"]) - out_seg_delta = max(0, total_out_segs - self._last_totals["out_segs"]) - retrans_delta = max(0, total_retrans - self._last_totals["retrans"]) - tx_kbps = int((sent_delta * 8.0) / dt / 1000.0) - rx_kbps = int((recv_delta * 8.0) / dt / 1000.0) - if out_seg_delta > 0: - retrans_pct = round((retrans_delta / out_seg_delta) * 100.0, 2) - self._last_totals = { - "ts": now, - "bytes_sent": total_bytes_sent, - "bytes_received": total_bytes_received, - "out_segs": total_out_segs, - "retrans": total_retrans, - } - - network = { - "peer_status": "online" if control["connected"] or video["connected"] else "offline", - "latency_ms": round(avg_srtt_ms or float(control_metrics.get("srtt_ms", 0) or 0.0), 1), - "jitter_ms": round(jitter_ms, 1), - # Keep packet_loss_pct as a compatibility alias for existing clients. - "retrans_pct": round(retrans_pct, 2), - "packet_loss_pct": round(retrans_pct, 2), - "tx_kbps": tx_kbps, - "rx_kbps": rx_kbps, - "signal_dbm": None, - "transport": "OmniSocket / daemon", - "source_mode": "daemon-live", - "updated_at": utc_iso_now(), - } - - video_status = { - "available": bool(video["has_recent_frame"]), - "source_mode": "omnisocket-jpeg-live" if video["has_recent_frame"] else "omnisocket-waiting", - "frame_count": int(video["frames_received"]), - "fps": int(profile["fps"]), - "frame_dir": "omni-daemon://latest-frame", - "source_detail": ( - f"peer stream active, frames={video['frames_received']}" - if video["has_recent_frame"] - else (video["last_error"] or "waiting for latest JPEG frame from daemon") - ), - "receiver": { - "backend_ready": True, - "mode": "daemon", - "connected": bool(video["connected"]), - "has_recent_frame": bool(video["has_recent_frame"]), - "frames_received": int(video["frames_received"]), - "latest_sequence": video["latest_sequence"], - "last_error": video["last_error"], - "config_path": video["config_path"], - "server_addr": video["server_addr"], - "relay_via": video["relay_via"], - "peer_id": video["peer_id"], - "buffer_bytes": int(video["buffer_bytes"]), - "stats": video["stats"], - "kcp_metrics": video_metrics, - }, - } - - state = { - "network": network, - "video": video_status, - "control": { - "connected": bool(control["connected"]), - "queue_depth": int(control["queue_depth"]), - "last_seq_id": control["last_seq_id"], - "last_error": control["last_error"], - "peer_id": control["peer_id"], - "target_peer": control["target_peer"], - "server_addr": control["server_addr"], - "relay_via": control["relay_via"], - "stats": control["stats"], - "kcp_metrics": control_metrics, - }, - "policy": { - "health_band": health_band, - "recommended_video_profile": profile, - }, - "daemon": { - "started_at": self._started_at, - "socket_path": self._config["daemon"]["socket_path"], - "version": VERSION, - }, - } - - with self._lock: - self._state = state - - @staticmethod - def _build_initial_state() -> dict[str, Any]: - return { - "network": { - "peer_status": "offline", - "latency_ms": 0.0, - "jitter_ms": 0.0, - "retrans_pct": 0.0, - "packet_loss_pct": 0.0, - "tx_kbps": 0, - "rx_kbps": 0, - "signal_dbm": None, - "transport": "OmniSocket / daemon", - "source_mode": "daemon-starting", - "updated_at": utc_iso_now(), - }, - "video": { - "available": False, - "source_mode": "omnisocket-waiting", - "frame_count": 0, - "fps": 15, - "frame_dir": "omni-daemon://latest-frame", - "source_detail": "daemon is starting", - "receiver": { - "backend_ready": True, - "mode": "daemon", - "connected": False, - "has_recent_frame": False, - "frames_received": 0, - "latest_sequence": None, - "last_error": "", - "config_path": "", - "server_addr": "", - "relay_via": "", - "peer_id": "", - "buffer_bytes": 0, - }, - }, - "control": { - "connected": False, - "queue_depth": 0, - "last_seq_id": None, - "last_error": "", - "peer_id": "", - "target_peer": "", - "server_addr": "", - "relay_via": "", - "stats": _default_stats(), - "kcp_metrics": _default_kcp_metrics(), - }, - "policy": { - "health_band": "red", - "recommended_video_profile": { - "fps": 5, - "max_frame_bytes": 20 * 1024, - }, - }, - "daemon": { - "started_at": utc_iso_now(), - "socket_path": DEFAULT_SOCKET_PATH, - "version": VERSION, - }, - } - - -class ThreadingUnixHTTPServer(socketserver.ThreadingMixIn, socketserver.UnixStreamServer): - daemon_threads = True - - -class OmniDaemonHTTPHandler(BaseHTTPRequestHandler): - server_version = "OmniDaemonHTTP/1.0" - protocol_version = "HTTP/1.1" - - def do_GET(self) -> None: # pragma: no cover - exercised by runtime integration - app: ASideOmniDaemon = self.server.app # type: ignore[attr-defined] - if self.path == "/v1/health": - state = app.get_state() - control = state["control"] - video = state["video"]["receiver"] - policy = state["policy"] - status = "ok" if policy["health_band"] == "green" else "degraded" - if not control["connected"] and not video["connected"]: - status = "unavailable" - self._send_json( - HTTPStatus.OK, - { - "status": status, - "health_band": policy["health_band"], - "control_connected": control["connected"], - "video_connected": video["connected"], - "updated_at": state["network"]["updated_at"], - }, - ) - return - if self.path == "/v1/state": - self._send_json(HTTPStatus.OK, app.get_state()) - return - if self.path == "/v1/control/status": - self._send_json(HTTPStatus.OK, app.get_state()["control"]) - return - if self.path == "/v1/video/frame": - frame = app.get_latest_frame() - if frame is None: - self._send_json( - HTTPStatus.SERVICE_UNAVAILABLE, - {"error": "no fresh JPEG frame available"}, - ) - return - self._send_bytes(HTTPStatus.OK, frame, "image/jpeg") - return - self._send_json(HTTPStatus.NOT_FOUND, {"error": f"unknown path: {self.path}"}) - - def do_POST(self) -> None: # pragma: no cover - exercised by runtime integration - app: ASideOmniDaemon = self.server.app # type: ignore[attr-defined] - if self.path != "/v1/control/event": - self._send_json(HTTPStatus.NOT_FOUND, {"error": f"unknown path: {self.path}"}) - return - - try: - payload = self._read_json() - except ValueError as error: - self._send_json(HTTPStatus.BAD_REQUEST, {"error": str(error)}) - return - - result = app.send_control_event( - source=str(payload.get("source", "unknown")), - event_code=str(payload.get("event_code", "")), - drive_value=float(payload.get("drive_value", 1.0)), - client_time_ms=payload.get("client_time_ms"), - ) - status = HTTPStatus.OK if result.get("accepted") else HTTPStatus.SERVICE_UNAVAILABLE - self._send_json(status, result) - - def log_message(self, format: str, *args: Any) -> None: # noqa: A003 - return - - def _read_json(self) -> dict[str, Any]: - raw_length = self.headers.get("Content-Length") - if raw_length is None: - raise ValueError("missing Content-Length") - try: - length = int(raw_length) - except ValueError as error: - raise ValueError("invalid Content-Length") from error - raw = self.rfile.read(length) - try: - payload = json.loads(raw.decode("utf-8")) - except json.JSONDecodeError as error: - raise ValueError(f"invalid JSON body: {error}") from error - if not isinstance(payload, dict): - raise ValueError("request body must be a JSON object") - return payload - - def _send_json(self, status: HTTPStatus, payload: dict[str, Any]) -> None: - raw = json.dumps(payload).encode("utf-8") - self._send_bytes(status, raw, "application/json; charset=utf-8") - - def _send_bytes(self, status: HTTPStatus, payload: bytes, content_type: str) -> None: - self.send_response(status.value) - self.send_header("Content-Type", content_type) - self.send_header("Content-Length", str(len(payload))) - self.send_header("Cache-Control", "no-store") - self.send_header("Connection", "keep-alive") - self.end_headers() - self.wfile.write(payload) - - -class ASideOmniDaemon: - def __init__(self, config_path: str | None = None) -> None: - self._config = _load_config(config_path) - self._control_manager = ControlSessionManager(self._config) - self._video_manager = VideoSessionManager(self._config) - self._telemetry = TelemetrySampler( - self._config, - self._control_manager, - self._video_manager, - ) - self._server: ThreadingUnixHTTPServer | None = None - - @property - def socket_path(self) -> str: - return self._config["daemon"]["socket_path"] - - def start(self) -> None: - self._control_manager.start() - self._video_manager.start() - self._telemetry.start() - self._server = self._build_server() - - def stop(self) -> None: - if self._server is not None: - self._server.shutdown() - self._server.server_close() - self._server = None - try: - os.unlink(self.socket_path) - except FileNotFoundError: - pass - self._telemetry.stop() - self._video_manager.stop() - self._control_manager.stop() - - def serve_forever(self) -> None: - self.start() - assert self._server is not None - self._server.serve_forever() - - def get_state(self) -> dict[str, Any]: - return self._telemetry.snapshot() - - def get_latest_frame(self) -> bytes | None: - return self._video_manager.get_latest_frame() - - def send_control_event( - self, - *, - source: str, - event_code: str, - drive_value: float, - client_time_ms: int | None, - ) -> dict[str, Any]: - return self._control_manager.send_event( - source=source, - event_code=event_code, - drive_value=drive_value, - client_time_ms=client_time_ms, - ) - - def _build_server(self) -> ThreadingUnixHTTPServer: - socket_path = self.socket_path - socket_dir = os.path.dirname(socket_path) - if socket_dir: - os.makedirs(socket_dir, exist_ok=True) - try: - os.unlink(socket_path) - except FileNotFoundError: - pass - server = ThreadingUnixHTTPServer(socket_path, OmniDaemonHTTPHandler) - server.app = self # type: ignore[attr-defined] - return server - - -def main(argv: list[str] | None = None) -> None: - parser = argparse.ArgumentParser(description="Run the A-side OmniDaemon") - parser.add_argument("--config", dest="config_path", help="Path to daemon YAML config") - args = parser.parse_args(argv) - - app = ASideOmniDaemon(config_path=args.config_path) - - def _handle_signal(_signum: int, _frame: Any) -> None: - app.stop() - raise SystemExit(0) - - signal.signal(signal.SIGINT, _handle_signal) - signal.signal(signal.SIGTERM, _handle_signal) - - try: - app.serve_forever() - except KeyboardInterrupt: - pass - finally: - app.stop() - - -if __name__ == "__main__": - main() diff --git a/python/setup.py b/python/setup.py index 040ddaa..f302f32 100644 --- a/python/setup.py +++ b/python/setup.py @@ -35,12 +35,7 @@ COMMON_SOURCES = [ setup( name="omnisocket", version="0.1.0", - packages=["omnisocket", "omnisocket_a_side"], - entry_points={ - "console_scripts": [ - "omnisocket-a-side-daemon=omnisocket_a_side.daemon:main", - ] - }, + packages=["omnisocket"], ext_modules=[ Extension( "omnisocket._omnisocket", diff --git a/src/peer_kcp_client.c b/src/peer_kcp_client.c index f568272..ec969c0 100644 --- a/src/peer_kcp_client.c +++ b/src/peer_kcp_client.c @@ -294,14 +294,6 @@ int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const return 0; } -int kcp_client_metrics_snapshot(kcp_client_t *client, kcp_conn_metrics_t *out_metrics) { - if (client == NULL || out_metrics == NULL) { - errno = EINVAL; - return -1; - } - return kcp_conn_metrics_snapshot(client->conn, out_metrics); -} - int kcp_client_close(kcp_client_t *client) { return client == NULL ? 0 : kcp_conn_close(client->conn); } diff --git a/src/transport_kcp.c b/src/transport_kcp.c index ef95625..30778ac 100644 --- a/src/transport_kcp.c +++ b/src/transport_kcp.c @@ -1788,83 +1788,6 @@ int kcp_conn_local_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, s return 0; } -int kcp_conn_metrics_snapshot(kcp_conn_t *conn, kcp_conn_metrics_t *out_metrics) { - struct sockaddr_storage local_addr; - socklen_t local_len = sizeof(local_addr); - - if (conn == NULL || out_metrics == NULL) { - errno = EINVAL; - return -1; - } - - memset(out_metrics, 0, sizeof(*out_metrics)); - out_metrics->connected = !atomic_load(&conn->closed); - - if (conn->sock_state != NULL && !conn->socket_closed && - getsockname(conn->sock_state->fd, (struct sockaddr *) &local_addr, &local_len) == 0) { - omni_sockaddr_to_string( - (const struct sockaddr *) &local_addr, - local_len, - out_metrics->local_addr, - sizeof(out_metrics->local_addr) - ); - } - if (conn->remote_addr_len > 0) { - omni_sockaddr_to_string( - (const struct sockaddr *) &conn->remote_addr, - conn->remote_addr_len, - out_metrics->remote_addr, - sizeof(out_metrics->remote_addr) - ); - } - - if (conn->process_sampler != NULL) { - out_metrics->bytes_sent = atomic_load_explicit(&conn->process_sampler->bytes_sent, memory_order_relaxed); - out_metrics->bytes_received = atomic_load_explicit(&conn->process_sampler->bytes_received, memory_order_relaxed); - out_metrics->in_pkts = atomic_load_explicit(&conn->process_sampler->in_pkts, memory_order_relaxed); - out_metrics->out_pkts = atomic_load_explicit(&conn->process_sampler->out_pkts, memory_order_relaxed); - out_metrics->in_segs = atomic_load_explicit(&conn->process_sampler->in_segs, memory_order_relaxed); - out_metrics->out_segs = atomic_load_explicit(&conn->process_sampler->out_segs, memory_order_relaxed); - out_metrics->in_errs = atomic_load_explicit(&conn->process_sampler->in_errs, memory_order_relaxed); - out_metrics->kcp_in_errs = atomic_load_explicit(&conn->process_sampler->kcp_in_errs, memory_order_relaxed); - } else { - out_metrics->bytes_sent = conn->pending_bytes_sent; - out_metrics->bytes_received = conn->pending_bytes_received; - out_metrics->in_pkts = conn->pending_in_pkts; - out_metrics->out_pkts = conn->pending_out_pkts; - out_metrics->in_segs = conn->pending_in_segs; - out_metrics->out_segs = conn->pending_out_segs; - out_metrics->in_errs = conn->pending_in_errs; - out_metrics->kcp_in_errs = conn->pending_kcp_in_errs; - } - - pthread_mutex_lock(&conn->kcp_mu); - if (conn->kcp != NULL) { - out_metrics->has_conv = 1; - out_metrics->conv = conn->kcp->conv; - out_metrics->rto_ms = conn->kcp->rx_rto; - out_metrics->srtt_ms = conn->kcp->rx_srtt; - out_metrics->srttvar_ms = conn->kcp->rx_rttval; - out_metrics->ring_buffer_snd_queue = conn->kcp->nsnd_que; - out_metrics->ring_buffer_rcv_queue = conn->kcp->nrcv_que; - out_metrics->ring_buffer_snd_buffer = conn->kcp->nsnd_buf; - out_metrics->fast_retrans_segs = conn->kcp->fast_retrans_xmit; - /* This KCP fork does not implement early retransmit, so the counter stays zero. */ - out_metrics->early_retrans_segs = conn->kcp->early_retrans_xmit; - out_metrics->lost_segs = conn->kcp->lost_xmit; - out_metrics->repeat_segs = conn->kcp->repeat_xmit; - out_metrics->retrans_segs = - out_metrics->fast_retrans_segs + - out_metrics->early_retrans_segs + - out_metrics->lost_segs; - } else { - out_metrics->connected = 0; - } - pthread_mutex_unlock(&conn->kcp_mu); - - return 0; -} - int kcp_conn_close(kcp_conn_t *conn) { if (conn == NULL) { return 0; diff --git a/third_party/kcp/ikcp.c b/third_party/kcp/ikcp.c index 11503ca..b6176b9 100644 --- a/third_party/kcp/ikcp.c +++ b/third_party/kcp/ikcp.c @@ -298,10 +298,6 @@ ikcpcb *ikcp_create(IUINT32 conv, void *user) kcp->fastlimit = IKCP_FASTACK_LIMIT; kcp->nocwnd = 0; kcp->xmit = 0; - kcp->fast_retrans_xmit = 0; - kcp->early_retrans_xmit = 0; - kcp->lost_xmit = 0; - kcp->repeat_xmit = 0; kcp->dead_link = IKCP_DEADLINK; kcp->output = NULL; kcp->writelog = NULL; @@ -792,7 +788,6 @@ void ikcp_parse_data(ikcpcb *kcp, IKCPSEG *newseg) } else { - kcp->repeat_xmit++; ikcp_segment_delete(kcp, newseg); } @@ -1197,7 +1192,6 @@ void ikcp_flush(ikcpcb *kcp) needsend = 1; segment->xmit++; kcp->xmit++; - kcp->lost_xmit++; if (kcp->nodelay == 0) { segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto); @@ -1217,7 +1211,6 @@ void ikcp_flush(ikcpcb *kcp) { needsend = 1; segment->xmit++; - kcp->fast_retrans_xmit++; segment->fastack = 0; segment->resendts = current + segment->rto; change++; diff --git a/third_party/kcp/ikcp.h b/third_party/kcp/ikcp.h index 21ac51f..5630269 100644 --- a/third_party/kcp/ikcp.h +++ b/third_party/kcp/ikcp.h @@ -300,10 +300,6 @@ struct IKCPCB IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto; IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe; IUINT32 current, interval, ts_flush, xmit; - IUINT32 fast_retrans_xmit; - IUINT32 early_retrans_xmit; - IUINT32 lost_xmit; - IUINT32 repeat_xmit; IUINT32 nrcv_buf, nsnd_buf; IUINT32 nrcv_que, nsnd_que; IUINT32 nodelay, updated;