diff --git a/cmd/b_side_video_sender.c b/cmd/b_side_video_sender.c index 518350d..473260a 100644 --- a/cmd/b_side_video_sender.c +++ b/cmd/b_side_video_sender.c @@ -26,6 +26,8 @@ #define WORKER_CONTROL_FD 3 #define WORKER_TELEMETRY_FD 4 +#define WORKER_CONTROL_FD_ENV "OMNI_WORKER_CONTROL_FD" +#define WORKER_TELEMETRY_FD_ENV "OMNI_WORKER_TELEMETRY_FD" #define NUM_BUFFERS 4 #define CLEAR(x) memset(&(x), 0, sizeof(x)) @@ -238,11 +240,15 @@ static void telemetry_write_kcp_metrics(runtime_state_t *runtime, const kcp_conn static int load_worker_config(worker_config_t *cfg) { const char *server_addr = getenv(VIDEO_SERVER_ADDR_ENV); + const char *relay_addr = env_or_default(VIDEO_RELAY_ADDR_ENV, ""); if (cfg == NULL) { errno = EINVAL; return -1; } + if ((server_addr == NULL || server_addr[0] == '\0') && relay_addr[0] != '\0') { + server_addr = relay_addr; + } if (server_addr == NULL || server_addr[0] == '\0') { fprintf(stderr, "%s is required\n", VIDEO_SERVER_ADDR_ENV); errno = EINVAL; @@ -251,7 +257,7 @@ static int load_worker_config(worker_config_t *cfg) { CLEAR(*cfg); snprintf(cfg->server_addr, sizeof(cfg->server_addr), "%s", server_addr); - snprintf(cfg->relay_addr, sizeof(cfg->relay_addr), "%s", env_or_default(VIDEO_RELAY_ADDR_ENV, "")); + snprintf(cfg->relay_addr, sizeof(cfg->relay_addr), "%s", relay_addr); snprintf(cfg->bind_ip, sizeof(cfg->bind_ip), "%s", env_or_default(VIDEO_BIND_IP_ENV, "")); snprintf(cfg->bind_device, sizeof(cfg->bind_device), "%s", env_or_default(VIDEO_BIND_DEVICE_ENV, "")); snprintf(cfg->peer_id, sizeof(cfg->peer_id), "%s", env_or_default(VIDEO_PEER_ID_ENV, "peer-b-video")); @@ -607,6 +613,8 @@ int main(void) { Buffer *buffers = NULL; int num_buffers = 0; int camera_fd = -1; + int control_fd = env_as_int(WORKER_CONTROL_FD_ENV, WORKER_CONTROL_FD); + int telemetry_fd = env_as_int(WORKER_TELEMETRY_FD_ENV, WORKER_TELEMETRY_FD); enum v4l2_buf_type stream_type = V4L2_BUF_TYPE_VIDEO_CAPTURE; AVCodecContext *decoder = NULL; AVCodecContext *encoder = NULL; @@ -636,8 +644,8 @@ int main(void) { runtime.jpeg_quality_qscale = cfg.initial_qscale; runtime.max_frame_bytes = cfg.initial_max_frame_bytes; - control_stream = fdopen(WORKER_CONTROL_FD, "r"); - telemetry_stream = fdopen(WORKER_TELEMETRY_FD, "w"); + control_stream = fdopen(control_fd, "r"); + telemetry_stream = fdopen(telemetry_fd, "w"); if (control_stream == NULL || telemetry_stream == NULL) { perror("fdopen worker control/telemetry"); goto cleanup; diff --git a/python/omnisocket_a_side/daemon.py b/python/omnisocket_a_side/daemon.py index ef953fa..f9d9dae 100644 --- a/python/omnisocket_a_side/daemon.py +++ b/python/omnisocket_a_side/daemon.py @@ -10,10 +10,11 @@ from pathlib import Path import queue import signal import socketserver +import sys import threading import time from dataclasses import dataclass -from datetime import UTC, datetime +from datetime import datetime, timezone from http import HTTPStatus from http.server import BaseHTTPRequestHandler from typing import Any @@ -25,7 +26,7 @@ from .control_codec import ANALOG_EVENT_CODES, EVENT_NAME_TO_ID, make_control_pa def utc_iso_now() -> str: - return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") + return datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z") def load_omnisocket_api(): @@ -1043,6 +1044,15 @@ def main(argv: list[str] | None = None) -> None: args = parser.parse_args(argv) app = ASideOmniDaemon(config_path=args.config_path) + print( + ( + "A-side OmniDaemon starting " + f"(config={app._config['config_path']}, " + f"socket={app._config['daemon']['socket_path']})" + ), + file=sys.stderr, + flush=True, + ) def _handle_signal(_signum: int, _frame: Any) -> None: app.stop() @@ -1052,7 +1062,17 @@ def main(argv: list[str] | None = None) -> None: signal.signal(signal.SIGTERM, _handle_signal) try: - app.serve_forever() + app.start() + print( + ( + "A-side OmniDaemon ready " + f"(state: curl --unix-socket {app.socket_path} http://localhost/v1/state)" + ), + file=sys.stderr, + flush=True, + ) + assert app._server is not None + app._server.serve_forever() except KeyboardInterrupt: pass finally: diff --git a/python/omnisocket_b_side/daemon.py b/python/omnisocket_b_side/daemon.py index 162786d..bd254e8 100644 --- a/python/omnisocket_b_side/daemon.py +++ b/python/omnisocket_b_side/daemon.py @@ -5,7 +5,7 @@ from __future__ import annotations import argparse import copy from dataclasses import dataclass -from datetime import UTC, datetime +from datetime import datetime, timezone from http import HTTPStatus from http.server import BaseHTTPRequestHandler import json @@ -16,6 +16,7 @@ import signal import socket import socketserver import subprocess +import sys import threading import time from typing import Any @@ -34,7 +35,7 @@ from . import ( def utc_iso_now() -> str: - return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") + return datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z") def load_omnisocket_api(): @@ -540,6 +541,16 @@ class VideoWorkerManager: def start(self) -> None: if not self._enabled: return + if not os.path.exists(self._video_cfg["binary_path"]): + print( + ( + "B-side video worker binary missing: " + f"{self._video_cfg['binary_path']} " + "(run `make b_side_video_sender` in OmniSocketGo)" + ), + file=sys.stderr, + flush=True, + ) self._thread.start() def stop(self) -> None: @@ -650,16 +661,6 @@ class VideoWorkerManager: command_read_fd, command_write_fd = os.pipe() telemetry_read_fd, telemetry_write_fd = os.pipe() - def _preexec() -> None: - os.dup2(command_read_fd, 3) - os.dup2(telemetry_write_fd, 4) - for fd in (command_read_fd, command_write_fd, telemetry_read_fd, telemetry_write_fd): - if fd not in (3, 4): - try: - os.close(fd) - except OSError: - pass - env = dict(os.environ) env.update( { @@ -675,6 +676,8 @@ class VideoWorkerManager: "OMNI_VIDEO_OUTPUT_WIDTH": str(self._video_cfg["output_width"]), "OMNI_VIDEO_OUTPUT_HEIGHT": str(self._video_cfg["output_height"]), "OMNI_VIDEO_STATS_INTERVAL_MS": str(self._video_cfg["stats_interval_ms"]), + "OMNI_WORKER_CONTROL_FD": str(command_read_fd), + "OMNI_WORKER_TELEMETRY_FD": str(telemetry_write_fd), } ) with self._lock: @@ -692,7 +695,6 @@ class VideoWorkerManager: env=env, close_fds=True, pass_fds=(command_read_fd, telemetry_write_fd), - preexec_fn=_preexec, ) except Exception as error: # pragma: no cover - runtime integration for fd in (command_read_fd, command_write_fd, telemetry_read_fd, telemetry_write_fd): @@ -1259,6 +1261,14 @@ class BSideOmniDaemon: daemon=True, ) self._server_thread.start() + print( + ( + "B-side OmniDaemon ready " + f"(state: curl --unix-socket {self.socket_path} http://localhost/v1/state)" + ), + file=sys.stderr, + flush=True, + ) self._server_thread.join() def get_state(self) -> dict[str, Any]: @@ -1299,6 +1309,17 @@ def main(argv: list[str] | None = None) -> None: args = parser.parse_args(argv) app = BSideOmniDaemon(config_path=args.config_path) + print( + ( + "B-side OmniDaemon starting " + f"(config={app._config['config_path']}, " + f"socket={app._config['daemon']['socket_path']}, " + f"ctrl_socket={app._config['daemon']['ctrl_socket_path']}, " + f"video_enabled={app._config['video_sender']['enabled']})" + ), + file=sys.stderr, + flush=True, + ) def _handle_signal(_signum: int, _frame: Any) -> None: app.stop() diff --git a/python/setup.py b/python/setup.py index 234684d..c08246a 100644 --- a/python/setup.py +++ b/python/setup.py @@ -36,6 +36,9 @@ setup( name="omnisocket", version="0.1.0", packages=["omnisocket", "omnisocket_a_side", "omnisocket_b_side"], + install_requires=[ + "PyYAML>=6.0", + ], entry_points={ "console_scripts": [ "omnisocket-a-side-daemon=omnisocket_a_side.daemon:main",