Compare commits

...

5 Commits

Author SHA1 Message Date
878e11e597 Merge branch 'dev' of https://106.52.207.92:9103/limingjie/OmniSocketGo into dev 2026-04-02 17:45:00 +08:00
8ab12a0d69 fix: daemon终端无日志 2026-04-02 17:44:58 +08:00
0ae13b428e gitignore for .venv 2026-04-02 16:52:29 +08:00
nnbcccscdscdsc
0933692737 fix: start_b_side.sh 权限 2026-04-01 23:59:06 +08:00
aec42c83e4 fix: B端 relay_via 配置 2026-04-01 23:49:24 +08:00
7 changed files with 78 additions and 24 deletions

4
.gitignore vendored
View File

@@ -17,4 +17,6 @@ c/bin
/python/build
/python/omnisocket.egg-info
*.so*
*.so*
/.venv

View File

@@ -26,6 +26,8 @@
#define WORKER_CONTROL_FD 3
#define WORKER_TELEMETRY_FD 4
#define WORKER_CONTROL_FD_ENV "OMNI_WORKER_CONTROL_FD"
#define WORKER_TELEMETRY_FD_ENV "OMNI_WORKER_TELEMETRY_FD"
#define NUM_BUFFERS 4
#define CLEAR(x) memset(&(x), 0, sizeof(x))
@@ -238,11 +240,15 @@ static void telemetry_write_kcp_metrics(runtime_state_t *runtime, const kcp_conn
static int load_worker_config(worker_config_t *cfg) {
const char *server_addr = getenv(VIDEO_SERVER_ADDR_ENV);
const char *relay_addr = env_or_default(VIDEO_RELAY_ADDR_ENV, "");
if (cfg == NULL) {
errno = EINVAL;
return -1;
}
if ((server_addr == NULL || server_addr[0] == '\0') && relay_addr[0] != '\0') {
server_addr = relay_addr;
}
if (server_addr == NULL || server_addr[0] == '\0') {
fprintf(stderr, "%s is required\n", VIDEO_SERVER_ADDR_ENV);
errno = EINVAL;
@@ -251,7 +257,7 @@ static int load_worker_config(worker_config_t *cfg) {
CLEAR(*cfg);
snprintf(cfg->server_addr, sizeof(cfg->server_addr), "%s", server_addr);
snprintf(cfg->relay_addr, sizeof(cfg->relay_addr), "%s", env_or_default(VIDEO_RELAY_ADDR_ENV, ""));
snprintf(cfg->relay_addr, sizeof(cfg->relay_addr), "%s", relay_addr);
snprintf(cfg->bind_ip, sizeof(cfg->bind_ip), "%s", env_or_default(VIDEO_BIND_IP_ENV, ""));
snprintf(cfg->bind_device, sizeof(cfg->bind_device), "%s", env_or_default(VIDEO_BIND_DEVICE_ENV, ""));
snprintf(cfg->peer_id, sizeof(cfg->peer_id), "%s", env_or_default(VIDEO_PEER_ID_ENV, "peer-b-video"));
@@ -607,6 +613,8 @@ int main(void) {
Buffer *buffers = NULL;
int num_buffers = 0;
int camera_fd = -1;
int control_fd = env_as_int(WORKER_CONTROL_FD_ENV, WORKER_CONTROL_FD);
int telemetry_fd = env_as_int(WORKER_TELEMETRY_FD_ENV, WORKER_TELEMETRY_FD);
enum v4l2_buf_type stream_type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
AVCodecContext *decoder = NULL;
AVCodecContext *encoder = NULL;
@@ -636,8 +644,8 @@ int main(void) {
runtime.jpeg_quality_qscale = cfg.initial_qscale;
runtime.max_frame_bytes = cfg.initial_max_frame_bytes;
control_stream = fdopen(WORKER_CONTROL_FD, "r");
telemetry_stream = fdopen(WORKER_TELEMETRY_FD, "w");
control_stream = fdopen(control_fd, "r");
telemetry_stream = fdopen(telemetry_fd, "w");
if (control_stream == NULL || telemetry_stream == NULL) {
perror("fdopen worker control/telemetry");
goto cleanup;

View File

@@ -1,6 +1,6 @@
transport:
server_addr: "81.70.156.140:10909"
relay_via: "106.55.173.235:10909"
server_addr: ""
relay_via: "81.70.156.140:10909"
bind_ip: ""
bind_device: ""
@@ -17,7 +17,7 @@ control_receiver:
queue_capacity: 256
video_sender:
enabled: false
enabled: true
peer_id: "peer-b-video"
target_peer: "peer-a-video"
binary_path: "bin/b_side_video_sender"
@@ -39,7 +39,7 @@ daemon:
worker_restart_delay_ms: 2000
policy:
mode: "auto"
mode: "manual"
health_window_ms: 2000
green_srtt_ms: 30
yellow_srtt_ms: 55

View File

@@ -10,10 +10,11 @@ from pathlib import Path
import queue
import signal
import socketserver
import sys
import threading
import time
from dataclasses import dataclass
from datetime import UTC, datetime
from datetime import datetime, timezone
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler
from typing import Any
@@ -25,7 +26,7 @@ from .control_codec import ANALOG_EVENT_CODES, EVENT_NAME_TO_ID, make_control_pa
def utc_iso_now() -> str:
return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
return datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")
def load_omnisocket_api():
@@ -1043,6 +1044,15 @@ def main(argv: list[str] | None = None) -> None:
args = parser.parse_args(argv)
app = ASideOmniDaemon(config_path=args.config_path)
print(
(
"A-side OmniDaemon starting "
f"(config={app._config['config_path']}, "
f"socket={app._config['daemon']['socket_path']})"
),
file=sys.stderr,
flush=True,
)
def _handle_signal(_signum: int, _frame: Any) -> None:
app.stop()
@@ -1052,7 +1062,17 @@ def main(argv: list[str] | None = None) -> None:
signal.signal(signal.SIGTERM, _handle_signal)
try:
app.serve_forever()
app.start()
print(
(
"A-side OmniDaemon ready "
f"(state: curl --unix-socket {app.socket_path} http://localhost/v1/state)"
),
file=sys.stderr,
flush=True,
)
assert app._server is not None
app._server.serve_forever()
except KeyboardInterrupt:
pass
finally:

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
import argparse
import copy
from dataclasses import dataclass
from datetime import UTC, datetime
from datetime import datetime, timezone
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler
import json
@@ -16,6 +16,7 @@ import signal
import socket
import socketserver
import subprocess
import sys
import threading
import time
from typing import Any
@@ -34,7 +35,7 @@ from . import (
def utc_iso_now() -> str:
return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
return datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")
def load_omnisocket_api():
@@ -540,6 +541,16 @@ class VideoWorkerManager:
def start(self) -> None:
if not self._enabled:
return
if not os.path.exists(self._video_cfg["binary_path"]):
print(
(
"B-side video worker binary missing: "
f"{self._video_cfg['binary_path']} "
"(run `make b_side_video_sender` in OmniSocketGo)"
),
file=sys.stderr,
flush=True,
)
self._thread.start()
def stop(self) -> None:
@@ -650,16 +661,6 @@ class VideoWorkerManager:
command_read_fd, command_write_fd = os.pipe()
telemetry_read_fd, telemetry_write_fd = os.pipe()
def _preexec() -> None:
os.dup2(command_read_fd, 3)
os.dup2(telemetry_write_fd, 4)
for fd in (command_read_fd, command_write_fd, telemetry_read_fd, telemetry_write_fd):
if fd not in (3, 4):
try:
os.close(fd)
except OSError:
pass
env = dict(os.environ)
env.update(
{
@@ -675,6 +676,8 @@ class VideoWorkerManager:
"OMNI_VIDEO_OUTPUT_WIDTH": str(self._video_cfg["output_width"]),
"OMNI_VIDEO_OUTPUT_HEIGHT": str(self._video_cfg["output_height"]),
"OMNI_VIDEO_STATS_INTERVAL_MS": str(self._video_cfg["stats_interval_ms"]),
"OMNI_WORKER_CONTROL_FD": str(command_read_fd),
"OMNI_WORKER_TELEMETRY_FD": str(telemetry_write_fd),
}
)
with self._lock:
@@ -692,7 +695,6 @@ class VideoWorkerManager:
env=env,
close_fds=True,
pass_fds=(command_read_fd, telemetry_write_fd),
preexec_fn=_preexec,
)
except Exception as error: # pragma: no cover - runtime integration
for fd in (command_read_fd, command_write_fd, telemetry_read_fd, telemetry_write_fd):
@@ -1259,6 +1261,14 @@ class BSideOmniDaemon:
daemon=True,
)
self._server_thread.start()
print(
(
"B-side OmniDaemon ready "
f"(state: curl --unix-socket {self.socket_path} http://localhost/v1/state)"
),
file=sys.stderr,
flush=True,
)
self._server_thread.join()
def get_state(self) -> dict[str, Any]:
@@ -1299,6 +1309,17 @@ def main(argv: list[str] | None = None) -> None:
args = parser.parse_args(argv)
app = BSideOmniDaemon(config_path=args.config_path)
print(
(
"B-side OmniDaemon starting "
f"(config={app._config['config_path']}, "
f"socket={app._config['daemon']['socket_path']}, "
f"ctrl_socket={app._config['daemon']['ctrl_socket_path']}, "
f"video_enabled={app._config['video_sender']['enabled']})"
),
file=sys.stderr,
flush=True,
)
def _handle_signal(_signum: int, _frame: Any) -> None:
app.stop()

View File

@@ -36,6 +36,9 @@ setup(
name="omnisocket",
version="0.1.0",
packages=["omnisocket", "omnisocket_a_side", "omnisocket_b_side"],
install_requires=[
"PyYAML>=6.0",
],
entry_points={
"console_scripts": [
"omnisocket-a-side-daemon=omnisocket_a_side.daemon:main",

0
scripts/start_b_side.sh Normal file → Executable file
View File