Compare commits
2 Commits
0ae13b428e
...
878e11e597
| Author | SHA1 | Date | |
|---|---|---|---|
| 878e11e597 | |||
| 8ab12a0d69 |
@@ -26,6 +26,8 @@
|
||||
|
||||
#define WORKER_CONTROL_FD 3
|
||||
#define WORKER_TELEMETRY_FD 4
|
||||
#define WORKER_CONTROL_FD_ENV "OMNI_WORKER_CONTROL_FD"
|
||||
#define WORKER_TELEMETRY_FD_ENV "OMNI_WORKER_TELEMETRY_FD"
|
||||
#define NUM_BUFFERS 4
|
||||
#define CLEAR(x) memset(&(x), 0, sizeof(x))
|
||||
|
||||
@@ -238,11 +240,15 @@ static void telemetry_write_kcp_metrics(runtime_state_t *runtime, const kcp_conn
|
||||
|
||||
static int load_worker_config(worker_config_t *cfg) {
|
||||
const char *server_addr = getenv(VIDEO_SERVER_ADDR_ENV);
|
||||
const char *relay_addr = env_or_default(VIDEO_RELAY_ADDR_ENV, "");
|
||||
|
||||
if (cfg == NULL) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
if ((server_addr == NULL || server_addr[0] == '\0') && relay_addr[0] != '\0') {
|
||||
server_addr = relay_addr;
|
||||
}
|
||||
if (server_addr == NULL || server_addr[0] == '\0') {
|
||||
fprintf(stderr, "%s is required\n", VIDEO_SERVER_ADDR_ENV);
|
||||
errno = EINVAL;
|
||||
@@ -251,7 +257,7 @@ static int load_worker_config(worker_config_t *cfg) {
|
||||
|
||||
CLEAR(*cfg);
|
||||
snprintf(cfg->server_addr, sizeof(cfg->server_addr), "%s", server_addr);
|
||||
snprintf(cfg->relay_addr, sizeof(cfg->relay_addr), "%s", env_or_default(VIDEO_RELAY_ADDR_ENV, ""));
|
||||
snprintf(cfg->relay_addr, sizeof(cfg->relay_addr), "%s", relay_addr);
|
||||
snprintf(cfg->bind_ip, sizeof(cfg->bind_ip), "%s", env_or_default(VIDEO_BIND_IP_ENV, ""));
|
||||
snprintf(cfg->bind_device, sizeof(cfg->bind_device), "%s", env_or_default(VIDEO_BIND_DEVICE_ENV, ""));
|
||||
snprintf(cfg->peer_id, sizeof(cfg->peer_id), "%s", env_or_default(VIDEO_PEER_ID_ENV, "peer-b-video"));
|
||||
@@ -607,6 +613,8 @@ int main(void) {
|
||||
Buffer *buffers = NULL;
|
||||
int num_buffers = 0;
|
||||
int camera_fd = -1;
|
||||
int control_fd = env_as_int(WORKER_CONTROL_FD_ENV, WORKER_CONTROL_FD);
|
||||
int telemetry_fd = env_as_int(WORKER_TELEMETRY_FD_ENV, WORKER_TELEMETRY_FD);
|
||||
enum v4l2_buf_type stream_type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
|
||||
AVCodecContext *decoder = NULL;
|
||||
AVCodecContext *encoder = NULL;
|
||||
@@ -636,8 +644,8 @@ int main(void) {
|
||||
runtime.jpeg_quality_qscale = cfg.initial_qscale;
|
||||
runtime.max_frame_bytes = cfg.initial_max_frame_bytes;
|
||||
|
||||
control_stream = fdopen(WORKER_CONTROL_FD, "r");
|
||||
telemetry_stream = fdopen(WORKER_TELEMETRY_FD, "w");
|
||||
control_stream = fdopen(control_fd, "r");
|
||||
telemetry_stream = fdopen(telemetry_fd, "w");
|
||||
if (control_stream == NULL || telemetry_stream == NULL) {
|
||||
perror("fdopen worker control/telemetry");
|
||||
goto cleanup;
|
||||
|
||||
@@ -10,10 +10,11 @@ from pathlib import Path
|
||||
import queue
|
||||
import signal
|
||||
import socketserver
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from datetime import UTC, datetime
|
||||
from datetime import datetime, timezone
|
||||
from http import HTTPStatus
|
||||
from http.server import BaseHTTPRequestHandler
|
||||
from typing import Any
|
||||
@@ -25,7 +26,7 @@ from .control_codec import ANALOG_EVENT_CODES, EVENT_NAME_TO_ID, make_control_pa
|
||||
|
||||
|
||||
def utc_iso_now() -> str:
|
||||
return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
|
||||
return datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")
|
||||
|
||||
|
||||
def load_omnisocket_api():
|
||||
@@ -1043,6 +1044,15 @@ def main(argv: list[str] | None = None) -> None:
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
app = ASideOmniDaemon(config_path=args.config_path)
|
||||
print(
|
||||
(
|
||||
"A-side OmniDaemon starting "
|
||||
f"(config={app._config['config_path']}, "
|
||||
f"socket={app._config['daemon']['socket_path']})"
|
||||
),
|
||||
file=sys.stderr,
|
||||
flush=True,
|
||||
)
|
||||
|
||||
def _handle_signal(_signum: int, _frame: Any) -> None:
|
||||
app.stop()
|
||||
@@ -1052,7 +1062,17 @@ def main(argv: list[str] | None = None) -> None:
|
||||
signal.signal(signal.SIGTERM, _handle_signal)
|
||||
|
||||
try:
|
||||
app.serve_forever()
|
||||
app.start()
|
||||
print(
|
||||
(
|
||||
"A-side OmniDaemon ready "
|
||||
f"(state: curl --unix-socket {app.socket_path} http://localhost/v1/state)"
|
||||
),
|
||||
file=sys.stderr,
|
||||
flush=True,
|
||||
)
|
||||
assert app._server is not None
|
||||
app._server.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
|
||||
@@ -5,7 +5,7 @@ from __future__ import annotations
|
||||
import argparse
|
||||
import copy
|
||||
from dataclasses import dataclass
|
||||
from datetime import UTC, datetime
|
||||
from datetime import datetime, timezone
|
||||
from http import HTTPStatus
|
||||
from http.server import BaseHTTPRequestHandler
|
||||
import json
|
||||
@@ -16,6 +16,7 @@ import signal
|
||||
import socket
|
||||
import socketserver
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from typing import Any
|
||||
@@ -34,7 +35,7 @@ from . import (
|
||||
|
||||
|
||||
def utc_iso_now() -> str:
|
||||
return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
|
||||
return datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")
|
||||
|
||||
|
||||
def load_omnisocket_api():
|
||||
@@ -540,6 +541,16 @@ class VideoWorkerManager:
|
||||
def start(self) -> None:
|
||||
if not self._enabled:
|
||||
return
|
||||
if not os.path.exists(self._video_cfg["binary_path"]):
|
||||
print(
|
||||
(
|
||||
"B-side video worker binary missing: "
|
||||
f"{self._video_cfg['binary_path']} "
|
||||
"(run `make b_side_video_sender` in OmniSocketGo)"
|
||||
),
|
||||
file=sys.stderr,
|
||||
flush=True,
|
||||
)
|
||||
self._thread.start()
|
||||
|
||||
def stop(self) -> None:
|
||||
@@ -650,16 +661,6 @@ class VideoWorkerManager:
|
||||
command_read_fd, command_write_fd = os.pipe()
|
||||
telemetry_read_fd, telemetry_write_fd = os.pipe()
|
||||
|
||||
def _preexec() -> None:
|
||||
os.dup2(command_read_fd, 3)
|
||||
os.dup2(telemetry_write_fd, 4)
|
||||
for fd in (command_read_fd, command_write_fd, telemetry_read_fd, telemetry_write_fd):
|
||||
if fd not in (3, 4):
|
||||
try:
|
||||
os.close(fd)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
env = dict(os.environ)
|
||||
env.update(
|
||||
{
|
||||
@@ -675,6 +676,8 @@ class VideoWorkerManager:
|
||||
"OMNI_VIDEO_OUTPUT_WIDTH": str(self._video_cfg["output_width"]),
|
||||
"OMNI_VIDEO_OUTPUT_HEIGHT": str(self._video_cfg["output_height"]),
|
||||
"OMNI_VIDEO_STATS_INTERVAL_MS": str(self._video_cfg["stats_interval_ms"]),
|
||||
"OMNI_WORKER_CONTROL_FD": str(command_read_fd),
|
||||
"OMNI_WORKER_TELEMETRY_FD": str(telemetry_write_fd),
|
||||
}
|
||||
)
|
||||
with self._lock:
|
||||
@@ -692,7 +695,6 @@ class VideoWorkerManager:
|
||||
env=env,
|
||||
close_fds=True,
|
||||
pass_fds=(command_read_fd, telemetry_write_fd),
|
||||
preexec_fn=_preexec,
|
||||
)
|
||||
except Exception as error: # pragma: no cover - runtime integration
|
||||
for fd in (command_read_fd, command_write_fd, telemetry_read_fd, telemetry_write_fd):
|
||||
@@ -1259,6 +1261,14 @@ class BSideOmniDaemon:
|
||||
daemon=True,
|
||||
)
|
||||
self._server_thread.start()
|
||||
print(
|
||||
(
|
||||
"B-side OmniDaemon ready "
|
||||
f"(state: curl --unix-socket {self.socket_path} http://localhost/v1/state)"
|
||||
),
|
||||
file=sys.stderr,
|
||||
flush=True,
|
||||
)
|
||||
self._server_thread.join()
|
||||
|
||||
def get_state(self) -> dict[str, Any]:
|
||||
@@ -1299,6 +1309,17 @@ def main(argv: list[str] | None = None) -> None:
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
app = BSideOmniDaemon(config_path=args.config_path)
|
||||
print(
|
||||
(
|
||||
"B-side OmniDaemon starting "
|
||||
f"(config={app._config['config_path']}, "
|
||||
f"socket={app._config['daemon']['socket_path']}, "
|
||||
f"ctrl_socket={app._config['daemon']['ctrl_socket_path']}, "
|
||||
f"video_enabled={app._config['video_sender']['enabled']})"
|
||||
),
|
||||
file=sys.stderr,
|
||||
flush=True,
|
||||
)
|
||||
|
||||
def _handle_signal(_signum: int, _frame: Any) -> None:
|
||||
app.stop()
|
||||
|
||||
@@ -36,6 +36,9 @@ setup(
|
||||
name="omnisocket",
|
||||
version="0.1.0",
|
||||
packages=["omnisocket", "omnisocket_a_side", "omnisocket_b_side"],
|
||||
install_requires=[
|
||||
"PyYAML>=6.0",
|
||||
],
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"omnisocket-a-side-daemon=omnisocket_a_side.daemon:main",
|
||||
|
||||
Reference in New Issue
Block a user