Compare commits

...

2 Commits

4 changed files with 71 additions and 19 deletions

View File

@@ -26,6 +26,8 @@
#define WORKER_CONTROL_FD 3 #define WORKER_CONTROL_FD 3
#define WORKER_TELEMETRY_FD 4 #define WORKER_TELEMETRY_FD 4
#define WORKER_CONTROL_FD_ENV "OMNI_WORKER_CONTROL_FD"
#define WORKER_TELEMETRY_FD_ENV "OMNI_WORKER_TELEMETRY_FD"
#define NUM_BUFFERS 4 #define NUM_BUFFERS 4
#define CLEAR(x) memset(&(x), 0, sizeof(x)) #define CLEAR(x) memset(&(x), 0, sizeof(x))
@@ -238,11 +240,15 @@ static void telemetry_write_kcp_metrics(runtime_state_t *runtime, const kcp_conn
static int load_worker_config(worker_config_t *cfg) { static int load_worker_config(worker_config_t *cfg) {
const char *server_addr = getenv(VIDEO_SERVER_ADDR_ENV); const char *server_addr = getenv(VIDEO_SERVER_ADDR_ENV);
const char *relay_addr = env_or_default(VIDEO_RELAY_ADDR_ENV, "");
if (cfg == NULL) { if (cfg == NULL) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
if ((server_addr == NULL || server_addr[0] == '\0') && relay_addr[0] != '\0') {
server_addr = relay_addr;
}
if (server_addr == NULL || server_addr[0] == '\0') { if (server_addr == NULL || server_addr[0] == '\0') {
fprintf(stderr, "%s is required\n", VIDEO_SERVER_ADDR_ENV); fprintf(stderr, "%s is required\n", VIDEO_SERVER_ADDR_ENV);
errno = EINVAL; errno = EINVAL;
@@ -251,7 +257,7 @@ static int load_worker_config(worker_config_t *cfg) {
CLEAR(*cfg); CLEAR(*cfg);
snprintf(cfg->server_addr, sizeof(cfg->server_addr), "%s", server_addr); snprintf(cfg->server_addr, sizeof(cfg->server_addr), "%s", server_addr);
snprintf(cfg->relay_addr, sizeof(cfg->relay_addr), "%s", env_or_default(VIDEO_RELAY_ADDR_ENV, "")); snprintf(cfg->relay_addr, sizeof(cfg->relay_addr), "%s", relay_addr);
snprintf(cfg->bind_ip, sizeof(cfg->bind_ip), "%s", env_or_default(VIDEO_BIND_IP_ENV, "")); snprintf(cfg->bind_ip, sizeof(cfg->bind_ip), "%s", env_or_default(VIDEO_BIND_IP_ENV, ""));
snprintf(cfg->bind_device, sizeof(cfg->bind_device), "%s", env_or_default(VIDEO_BIND_DEVICE_ENV, "")); snprintf(cfg->bind_device, sizeof(cfg->bind_device), "%s", env_or_default(VIDEO_BIND_DEVICE_ENV, ""));
snprintf(cfg->peer_id, sizeof(cfg->peer_id), "%s", env_or_default(VIDEO_PEER_ID_ENV, "peer-b-video")); snprintf(cfg->peer_id, sizeof(cfg->peer_id), "%s", env_or_default(VIDEO_PEER_ID_ENV, "peer-b-video"));
@@ -607,6 +613,8 @@ int main(void) {
Buffer *buffers = NULL; Buffer *buffers = NULL;
int num_buffers = 0; int num_buffers = 0;
int camera_fd = -1; int camera_fd = -1;
int control_fd = env_as_int(WORKER_CONTROL_FD_ENV, WORKER_CONTROL_FD);
int telemetry_fd = env_as_int(WORKER_TELEMETRY_FD_ENV, WORKER_TELEMETRY_FD);
enum v4l2_buf_type stream_type = V4L2_BUF_TYPE_VIDEO_CAPTURE; enum v4l2_buf_type stream_type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
AVCodecContext *decoder = NULL; AVCodecContext *decoder = NULL;
AVCodecContext *encoder = NULL; AVCodecContext *encoder = NULL;
@@ -636,8 +644,8 @@ int main(void) {
runtime.jpeg_quality_qscale = cfg.initial_qscale; runtime.jpeg_quality_qscale = cfg.initial_qscale;
runtime.max_frame_bytes = cfg.initial_max_frame_bytes; runtime.max_frame_bytes = cfg.initial_max_frame_bytes;
control_stream = fdopen(WORKER_CONTROL_FD, "r"); control_stream = fdopen(control_fd, "r");
telemetry_stream = fdopen(WORKER_TELEMETRY_FD, "w"); telemetry_stream = fdopen(telemetry_fd, "w");
if (control_stream == NULL || telemetry_stream == NULL) { if (control_stream == NULL || telemetry_stream == NULL) {
perror("fdopen worker control/telemetry"); perror("fdopen worker control/telemetry");
goto cleanup; goto cleanup;

View File

@@ -10,10 +10,11 @@ from pathlib import Path
import queue import queue
import signal import signal
import socketserver import socketserver
import sys
import threading import threading
import time import time
from dataclasses import dataclass from dataclasses import dataclass
from datetime import UTC, datetime from datetime import datetime, timezone
from http import HTTPStatus from http import HTTPStatus
from http.server import BaseHTTPRequestHandler from http.server import BaseHTTPRequestHandler
from typing import Any from typing import Any
@@ -25,7 +26,7 @@ from .control_codec import ANALOG_EVENT_CODES, EVENT_NAME_TO_ID, make_control_pa
def utc_iso_now() -> str: def utc_iso_now() -> str:
return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") return datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")
def load_omnisocket_api(): def load_omnisocket_api():
@@ -1043,6 +1044,15 @@ def main(argv: list[str] | None = None) -> None:
args = parser.parse_args(argv) args = parser.parse_args(argv)
app = ASideOmniDaemon(config_path=args.config_path) app = ASideOmniDaemon(config_path=args.config_path)
print(
(
"A-side OmniDaemon starting "
f"(config={app._config['config_path']}, "
f"socket={app._config['daemon']['socket_path']})"
),
file=sys.stderr,
flush=True,
)
def _handle_signal(_signum: int, _frame: Any) -> None: def _handle_signal(_signum: int, _frame: Any) -> None:
app.stop() app.stop()
@@ -1052,7 +1062,17 @@ def main(argv: list[str] | None = None) -> None:
signal.signal(signal.SIGTERM, _handle_signal) signal.signal(signal.SIGTERM, _handle_signal)
try: try:
app.serve_forever() app.start()
print(
(
"A-side OmniDaemon ready "
f"(state: curl --unix-socket {app.socket_path} http://localhost/v1/state)"
),
file=sys.stderr,
flush=True,
)
assert app._server is not None
app._server.serve_forever()
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
finally: finally:

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
import argparse import argparse
import copy import copy
from dataclasses import dataclass from dataclasses import dataclass
from datetime import UTC, datetime from datetime import datetime, timezone
from http import HTTPStatus from http import HTTPStatus
from http.server import BaseHTTPRequestHandler from http.server import BaseHTTPRequestHandler
import json import json
@@ -16,6 +16,7 @@ import signal
import socket import socket
import socketserver import socketserver
import subprocess import subprocess
import sys
import threading import threading
import time import time
from typing import Any from typing import Any
@@ -34,7 +35,7 @@ from . import (
def utc_iso_now() -> str: def utc_iso_now() -> str:
return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") return datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")
def load_omnisocket_api(): def load_omnisocket_api():
@@ -540,6 +541,16 @@ class VideoWorkerManager:
def start(self) -> None: def start(self) -> None:
if not self._enabled: if not self._enabled:
return return
if not os.path.exists(self._video_cfg["binary_path"]):
print(
(
"B-side video worker binary missing: "
f"{self._video_cfg['binary_path']} "
"(run `make b_side_video_sender` in OmniSocketGo)"
),
file=sys.stderr,
flush=True,
)
self._thread.start() self._thread.start()
def stop(self) -> None: def stop(self) -> None:
@@ -650,16 +661,6 @@ class VideoWorkerManager:
command_read_fd, command_write_fd = os.pipe() command_read_fd, command_write_fd = os.pipe()
telemetry_read_fd, telemetry_write_fd = os.pipe() telemetry_read_fd, telemetry_write_fd = os.pipe()
def _preexec() -> None:
os.dup2(command_read_fd, 3)
os.dup2(telemetry_write_fd, 4)
for fd in (command_read_fd, command_write_fd, telemetry_read_fd, telemetry_write_fd):
if fd not in (3, 4):
try:
os.close(fd)
except OSError:
pass
env = dict(os.environ) env = dict(os.environ)
env.update( env.update(
{ {
@@ -675,6 +676,8 @@ class VideoWorkerManager:
"OMNI_VIDEO_OUTPUT_WIDTH": str(self._video_cfg["output_width"]), "OMNI_VIDEO_OUTPUT_WIDTH": str(self._video_cfg["output_width"]),
"OMNI_VIDEO_OUTPUT_HEIGHT": str(self._video_cfg["output_height"]), "OMNI_VIDEO_OUTPUT_HEIGHT": str(self._video_cfg["output_height"]),
"OMNI_VIDEO_STATS_INTERVAL_MS": str(self._video_cfg["stats_interval_ms"]), "OMNI_VIDEO_STATS_INTERVAL_MS": str(self._video_cfg["stats_interval_ms"]),
"OMNI_WORKER_CONTROL_FD": str(command_read_fd),
"OMNI_WORKER_TELEMETRY_FD": str(telemetry_write_fd),
} }
) )
with self._lock: with self._lock:
@@ -692,7 +695,6 @@ class VideoWorkerManager:
env=env, env=env,
close_fds=True, close_fds=True,
pass_fds=(command_read_fd, telemetry_write_fd), pass_fds=(command_read_fd, telemetry_write_fd),
preexec_fn=_preexec,
) )
except Exception as error: # pragma: no cover - runtime integration except Exception as error: # pragma: no cover - runtime integration
for fd in (command_read_fd, command_write_fd, telemetry_read_fd, telemetry_write_fd): for fd in (command_read_fd, command_write_fd, telemetry_read_fd, telemetry_write_fd):
@@ -1259,6 +1261,14 @@ class BSideOmniDaemon:
daemon=True, daemon=True,
) )
self._server_thread.start() self._server_thread.start()
print(
(
"B-side OmniDaemon ready "
f"(state: curl --unix-socket {self.socket_path} http://localhost/v1/state)"
),
file=sys.stderr,
flush=True,
)
self._server_thread.join() self._server_thread.join()
def get_state(self) -> dict[str, Any]: def get_state(self) -> dict[str, Any]:
@@ -1299,6 +1309,17 @@ def main(argv: list[str] | None = None) -> None:
args = parser.parse_args(argv) args = parser.parse_args(argv)
app = BSideOmniDaemon(config_path=args.config_path) app = BSideOmniDaemon(config_path=args.config_path)
print(
(
"B-side OmniDaemon starting "
f"(config={app._config['config_path']}, "
f"socket={app._config['daemon']['socket_path']}, "
f"ctrl_socket={app._config['daemon']['ctrl_socket_path']}, "
f"video_enabled={app._config['video_sender']['enabled']})"
),
file=sys.stderr,
flush=True,
)
def _handle_signal(_signum: int, _frame: Any) -> None: def _handle_signal(_signum: int, _frame: Any) -> None:
app.stop() app.stop()

View File

@@ -36,6 +36,9 @@ setup(
name="omnisocket", name="omnisocket",
version="0.1.0", version="0.1.0",
packages=["omnisocket", "omnisocket_a_side", "omnisocket_b_side"], packages=["omnisocket", "omnisocket_a_side", "omnisocket_b_side"],
install_requires=[
"PyYAML>=6.0",
],
entry_points={ entry_points={
"console_scripts": [ "console_scripts": [
"omnisocket-a-side-daemon=omnisocket_a_side.daemon:main", "omnisocket-a-side-daemon=omnisocket_a_side.daemon:main",