From 583bcf120df337209dcaeb36f899086312ef29b7 Mon Sep 17 00:00:00 2001 From: Mock Date: Wed, 1 Apr 2026 21:01:47 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=8A=8A=20B=20=E7=AB=AF=E7=9A=84=20?= =?UTF-8?q?=E8=A7=86=E9=A2=91/=E6=8E=A7=E5=88=B6=20=E9=83=BD=E6=94=B6?= =?UTF-8?q?=E5=8F=A3=E5=88=B0=E4=B8=80=E4=B8=AA=E6=9C=AC=E5=9C=B0=20daemon?= =?UTF-8?q?=20=E8=BF=9B=E7=A8=8B=E9=87=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 21 +- cmd/b_side_video_sender.c | 860 +++++++++++++++++ config/b_side_omnidaemon.yaml | 58 ++ python/omnisocket_b_side/__init__.py | 10 + python/omnisocket_b_side/__main__.py | 5 + python/omnisocket_b_side/client.py | 54 ++ python/omnisocket_b_side/daemon.py | 1318 ++++++++++++++++++++++++++ python/setup.py | 3 +- scripts/start_b_side.sh | 11 + 9 files changed, 2338 insertions(+), 2 deletions(-) create mode 100644 cmd/b_side_video_sender.c create mode 100644 config/b_side_omnidaemon.yaml create mode 100644 python/omnisocket_b_side/__init__.py create mode 100644 python/omnisocket_b_side/__main__.py create mode 100644 python/omnisocket_b_side/client.py create mode 100644 python/omnisocket_b_side/daemon.py create mode 100644 scripts/start_b_side.sh diff --git a/Makefile b/Makefile index 39cf485..6c4257e 100644 --- a/Makefile +++ b/Makefile @@ -50,6 +50,20 @@ CAMERA_VIDEO_SENDER_SRCS := \ third_party/cjson/cJSON.c \ third_party/kcp/ikcp.c +B_SIDE_VIDEO_SENDER := $(BIN_DIR)/b_side_video_sender +B_SIDE_VIDEO_SENDER_SRCS := \ + $(CMD_DIR)/b_side_video_sender.c \ + $(SRC_DIR)/omni_common.c \ + $(SRC_DIR)/protocol.c \ + $(SRC_DIR)/latencylog.c \ + $(SRC_DIR)/kcp_packet_debug.c \ + $(SRC_DIR)/kcp_session_stats.c \ + $(SRC_DIR)/linux_timestamping.c \ + $(SRC_DIR)/transport_kcp.c \ + $(SRC_DIR)/peer_kcp_client.c \ + third_party/cjson/cJSON.c \ + third_party/kcp/ikcp.c + all: $(TARGETS) $(BIN_DIR): @@ -81,6 +95,11 @@ $(CAMERA_VIDEO_SENDER): $(CAMERA_VIDEO_SENDER_SRCS) | $(BIN_DIR) camera_video_sender: $(CAMERA_VIDEO_SENDER) +$(B_SIDE_VIDEO_SENDER): $(B_SIDE_VIDEO_SENDER_SRCS) | $(BIN_DIR) + $(CC) $(CFLAGS) $(CPPFLAGS) $$(pkg-config --cflags libavformat libavcodec libavutil libswscale) -o $@ $^ $(LDFLAGS) $$(pkg-config --libs libavformat libavcodec libavutil libswscale) + +b_side_video_sender: $(B_SIDE_VIDEO_SENDER) + clean: rm -rf $(BIN_DIR) @@ -90,4 +109,4 @@ python-ext: python-install: cd python && $(PYTHON) -m pip install -e . -.PHONY: all clean python-ext python-install camera_video_sender +.PHONY: all clean python-ext python-install camera_video_sender b_side_video_sender diff --git a/cmd/b_side_video_sender.c b/cmd/b_side_video_sender.c new file mode 100644 index 0000000..518350d --- /dev/null +++ b/cmd/b_side_video_sender.c @@ -0,0 +1,860 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "cJSON.h" +#include "peer_kcp_client.h" + +#define WORKER_CONTROL_FD 3 +#define WORKER_TELEMETRY_FD 4 +#define NUM_BUFFERS 4 +#define CLEAR(x) memset(&(x), 0, sizeof(x)) + +#define VIDEO_SERVER_ADDR_ENV "OMNI_VIDEO_SERVER_ADDR" +#define VIDEO_RELAY_ADDR_ENV "OMNI_VIDEO_RELAY_VIA" +#define VIDEO_BIND_IP_ENV "OMNI_VIDEO_BIND_IP" +#define VIDEO_BIND_DEVICE_ENV "OMNI_VIDEO_BIND_DEVICE" +#define VIDEO_PEER_ID_ENV "OMNI_VIDEO_PEER_ID" +#define VIDEO_TARGET_PEER_ENV "OMNI_VIDEO_TARGET_PEER" +#define VIDEO_DEVICE_ENV "OMNI_VIDEO_DEVICE" +#define VIDEO_CAPTURE_WIDTH_ENV "OMNI_VIDEO_CAPTURE_WIDTH" +#define VIDEO_CAPTURE_HEIGHT_ENV "OMNI_VIDEO_CAPTURE_HEIGHT" +#define VIDEO_OUTPUT_WIDTH_ENV "OMNI_VIDEO_OUTPUT_WIDTH" +#define VIDEO_OUTPUT_HEIGHT_ENV "OMNI_VIDEO_OUTPUT_HEIGHT" +#define VIDEO_FPS_ENV "OMNI_VIDEO_FPS" +#define VIDEO_JPEG_QSCALE_ENV "OMNI_VIDEO_JPEG_QSCALE" +#define VIDEO_MAX_FRAME_BYTES_ENV "OMNI_VIDEO_MAX_FRAME_BYTES" +#define VIDEO_STATS_INTERVAL_ENV "OMNI_VIDEO_STATS_INTERVAL_MS" + +typedef struct { + void *start; + size_t length; +} Buffer; + +typedef struct { + char server_addr[OMNI_MAX_ADDR_TEXT]; + char relay_addr[OMNI_MAX_ADDR_TEXT]; + char bind_ip[OMNI_MAX_ADDR_TEXT]; + char bind_device[128]; + char peer_id[OMNI_MAX_PEER_ID]; + char target_peer[OMNI_MAX_PEER_ID]; + char video_device[256]; + int capture_width; + int capture_height; + int output_width; + int output_height; + int initial_fps; + int initial_qscale; + int initial_max_frame_bytes; + int stats_interval_ms; +} worker_config_t; + +typedef struct { + pthread_mutex_t lock; + pthread_mutex_t telemetry_lock; + FILE *telemetry_stream; + int target_fps; + int jpeg_quality_qscale; + int max_frame_bytes; + bool shutdown_requested; +} runtime_state_t; + +typedef struct { + kcp_client_t *client; + char target_peer[OMNI_MAX_PEER_ID]; +} video_sender_t; + +typedef struct { + FILE *control_stream; + runtime_state_t *runtime; +} control_thread_args_t; + +static volatile sig_atomic_t g_signal_stop = 0; + +static double monotonic_ms(void) { + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return (double) ts.tv_sec * 1000.0 + (double) ts.tv_nsec / 1000000.0; +} + +static int64_t realtime_ms(void) { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + return (int64_t) ts.tv_sec * 1000 + (int64_t) ts.tv_nsec / 1000000; +} + +static int env_as_int(const char *name, int fallback) { + const char *raw = getenv(name); + char *endptr = NULL; + long value; + + if (raw == NULL || raw[0] == '\0') { + return fallback; + } + errno = 0; + value = strtol(raw, &endptr, 10); + if (errno != 0 || endptr == raw || (endptr != NULL && *endptr != '\0')) { + return fallback; + } + return (int) value; +} + +static const char *env_or_default(const char *name, const char *fallback) { + const char *value = getenv(name); + if (value != NULL && value[0] != '\0') { + return value; + } + return fallback; +} + +static void handle_signal(int signum) { + (void) signum; + g_signal_stop = 1; +} + +static void runtime_set_shutdown(runtime_state_t *runtime, bool shutdown_requested) { + pthread_mutex_lock(&runtime->lock); + runtime->shutdown_requested = shutdown_requested; + pthread_mutex_unlock(&runtime->lock); +} + +static bool runtime_should_stop(runtime_state_t *runtime) { + bool shutdown_requested; + + pthread_mutex_lock(&runtime->lock); + shutdown_requested = runtime->shutdown_requested; + pthread_mutex_unlock(&runtime->lock); + return g_signal_stop != 0 || shutdown_requested; +} + +static void runtime_get_profile(runtime_state_t *runtime, int *fps, int *qscale, int *max_frame_bytes) { + pthread_mutex_lock(&runtime->lock); + *fps = runtime->target_fps; + *qscale = runtime->jpeg_quality_qscale; + *max_frame_bytes = runtime->max_frame_bytes; + pthread_mutex_unlock(&runtime->lock); +} + +static void runtime_set_profile(runtime_state_t *runtime, int fps, int qscale, int max_frame_bytes) { + pthread_mutex_lock(&runtime->lock); + if (fps > 0) { + runtime->target_fps = fps; + } + if (qscale > 0) { + runtime->jpeg_quality_qscale = qscale; + } + if (max_frame_bytes > 0) { + runtime->max_frame_bytes = max_frame_bytes; + } + pthread_mutex_unlock(&runtime->lock); +} + +static void telemetry_write_line(runtime_state_t *runtime, const char *line) { + pthread_mutex_lock(&runtime->telemetry_lock); + if (runtime->telemetry_stream != NULL) { + fputs(line, runtime->telemetry_stream); + fputc('\n', runtime->telemetry_stream); + fflush(runtime->telemetry_stream); + } + pthread_mutex_unlock(&runtime->telemetry_lock); +} + +static void telemetry_write_frame_stat( + runtime_state_t *runtime, + int frame_bytes, + int encode_us, + bool sent, + const char *drop_reason +) { + char line[512]; + int written; + + written = snprintf( + line, + sizeof(line), + "{\"type\":\"frame_stat\",\"frame_bytes\":%d,\"encode_us\":%d," + "\"sent\":%s,\"drop_reason\":\"%s\",\"ts_unix_ms\":%" PRId64 "}", + frame_bytes, + encode_us, + sent ? "true" : "false", + drop_reason != NULL ? drop_reason : "", + realtime_ms()); + if (written > 0 && written < (int) sizeof(line)) { + telemetry_write_line(runtime, line); + } +} + +static void telemetry_write_kcp_metrics(runtime_state_t *runtime, const kcp_conn_metrics_t *metrics) { + char line[1024]; + int written; + + written = snprintf( + line, + sizeof(line), + "{\"type\":\"kcp_metrics\",\"connected\":%d,\"srtt_ms\":%d,\"srttvar_ms\":%d," + "\"rto_ms\":%u,\"bytes_sent\":%" PRIu64 ",\"bytes_received\":%" PRIu64 "," + "\"out_pkts\":%" PRIu64 ",\"out_segs\":%" PRIu64 ",\"retrans_segs\":%" PRIu64 "," + "\"fast_retrans_segs\":%" PRIu64 ",\"early_retrans_segs\":%" PRIu64 "," + "\"lost_segs\":%" PRIu64 ",\"ring_buffer_snd_queue\":%" PRIu64 "," + "\"ring_buffer_snd_buffer\":%" PRIu64 ",\"ts_unix_ms\":%" PRId64 "}", + metrics->connected, + metrics->srtt_ms, + metrics->srttvar_ms, + metrics->rto_ms, + metrics->bytes_sent, + metrics->bytes_received, + metrics->out_pkts, + metrics->out_segs, + metrics->retrans_segs, + metrics->fast_retrans_segs, + metrics->early_retrans_segs, + metrics->lost_segs, + metrics->ring_buffer_snd_queue, + metrics->ring_buffer_snd_buffer, + realtime_ms()); + if (written > 0 && written < (int) sizeof(line)) { + telemetry_write_line(runtime, line); + } +} + +static int load_worker_config(worker_config_t *cfg) { + const char *server_addr = getenv(VIDEO_SERVER_ADDR_ENV); + + if (cfg == NULL) { + errno = EINVAL; + return -1; + } + if (server_addr == NULL || server_addr[0] == '\0') { + fprintf(stderr, "%s is required\n", VIDEO_SERVER_ADDR_ENV); + errno = EINVAL; + return -1; + } + + CLEAR(*cfg); + snprintf(cfg->server_addr, sizeof(cfg->server_addr), "%s", server_addr); + snprintf(cfg->relay_addr, sizeof(cfg->relay_addr), "%s", env_or_default(VIDEO_RELAY_ADDR_ENV, "")); + snprintf(cfg->bind_ip, sizeof(cfg->bind_ip), "%s", env_or_default(VIDEO_BIND_IP_ENV, "")); + snprintf(cfg->bind_device, sizeof(cfg->bind_device), "%s", env_or_default(VIDEO_BIND_DEVICE_ENV, "")); + snprintf(cfg->peer_id, sizeof(cfg->peer_id), "%s", env_or_default(VIDEO_PEER_ID_ENV, "peer-b-video")); + snprintf(cfg->target_peer, sizeof(cfg->target_peer), "%s", env_or_default(VIDEO_TARGET_PEER_ENV, "peer-a-video")); + snprintf(cfg->video_device, sizeof(cfg->video_device), "%s", env_or_default(VIDEO_DEVICE_ENV, "/dev/video0")); + + cfg->capture_width = env_as_int(VIDEO_CAPTURE_WIDTH_ENV, 1280); + cfg->capture_height = env_as_int(VIDEO_CAPTURE_HEIGHT_ENV, 720); + cfg->output_width = env_as_int(VIDEO_OUTPUT_WIDTH_ENV, 640); + cfg->output_height = env_as_int(VIDEO_OUTPUT_HEIGHT_ENV, 360); + cfg->initial_fps = env_as_int(VIDEO_FPS_ENV, 10); + cfg->initial_qscale = env_as_int(VIDEO_JPEG_QSCALE_ENV, 8); + cfg->initial_max_frame_bytes = env_as_int(VIDEO_MAX_FRAME_BYTES_ENV, 40960); + cfg->stats_interval_ms = env_as_int(VIDEO_STATS_INTERVAL_ENV, 100); + return 0; +} + +static int open_v4l2_device(const char *device) { + int fd = open(device, O_RDWR | O_NONBLOCK); + if (fd < 0) { + perror("open device"); + } + return fd; +} + +static int init_v4l2_device(int fd, const worker_config_t *cfg) { + struct v4l2_format fmt; + + CLEAR(fmt); + fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + fmt.fmt.pix.width = cfg->capture_width; + fmt.fmt.pix.height = cfg->capture_height; + fmt.fmt.pix.pixelformat = V4L2_PIX_FMT_MJPEG; + fmt.fmt.pix.field = V4L2_FIELD_NONE; + + if (ioctl(fd, VIDIOC_S_FMT, &fmt) < 0) { + perror("VIDIOC_S_FMT"); + return -1; + } + return 0; +} + +static int init_mmap(int fd, Buffer **buffers, int *num_buffers) { + struct v4l2_requestbuffers req; + int index; + + CLEAR(req); + req.count = NUM_BUFFERS; + req.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + req.memory = V4L2_MEMORY_MMAP; + + if (ioctl(fd, VIDIOC_REQBUFS, &req) < 0) { + perror("VIDIOC_REQBUFS"); + return -1; + } + + *num_buffers = (int) req.count; + *buffers = (Buffer *) calloc(req.count, sizeof(Buffer)); + if (*buffers == NULL) { + return -1; + } + + for (index = 0; index < (int) req.count; ++index) { + struct v4l2_buffer buf; + CLEAR(buf); + buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + buf.memory = V4L2_MEMORY_MMAP; + buf.index = (unsigned int) index; + + if (ioctl(fd, VIDIOC_QUERYBUF, &buf) < 0) { + perror("VIDIOC_QUERYBUF"); + return -1; + } + + (*buffers)[index].length = buf.length; + (*buffers)[index].start = mmap(NULL, buf.length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, buf.m.offset); + if ((*buffers)[index].start == MAP_FAILED) { + perror("mmap"); + return -1; + } + } + + return 0; +} + +static int queue_all_buffers(int fd, int num_buffers) { + int index; + for (index = 0; index < num_buffers; ++index) { + struct v4l2_buffer buf; + CLEAR(buf); + buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + buf.memory = V4L2_MEMORY_MMAP; + buf.index = (unsigned int) index; + if (ioctl(fd, VIDIOC_QBUF, &buf) < 0) { + perror("VIDIOC_QBUF"); + return -1; + } + } + return 0; +} + +static AVCodecContext *create_mjpeg_decoder(const worker_config_t *cfg) { + const AVCodec *decoder = avcodec_find_decoder(AV_CODEC_ID_MJPEG); + AVCodecContext *ctx; + + if (decoder == NULL) { + fprintf(stderr, "MJPEG decoder not found\n"); + return NULL; + } + ctx = avcodec_alloc_context3(decoder); + if (ctx == NULL) { + return NULL; + } + + ctx->width = cfg->capture_width; + ctx->height = cfg->capture_height; + ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; + ctx->color_range = AVCOL_RANGE_JPEG; + ctx->thread_count = 1; + if (avcodec_open2(ctx, decoder, NULL) < 0) { + avcodec_free_context(&ctx); + return NULL; + } + return ctx; +} + +static AVCodecContext *create_mjpeg_encoder(const worker_config_t *cfg) { + const AVCodec *encoder = avcodec_find_encoder(AV_CODEC_ID_MJPEG); + AVCodecContext *ctx; + + if (encoder == NULL) { + fprintf(stderr, "MJPEG encoder not found\n"); + return NULL; + } + ctx = avcodec_alloc_context3(encoder); + if (ctx == NULL) { + return NULL; + } + + ctx->width = cfg->output_width; + ctx->height = cfg->output_height; + ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; + ctx->time_base = (AVRational) {1, cfg->initial_fps > 0 ? cfg->initial_fps : 10}; + ctx->qmin = 1; + ctx->qmax = 31; + ctx->flags |= AV_CODEC_FLAG_QSCALE; + ctx->global_quality = FF_QP2LAMBDA * cfg->initial_qscale; + if (avcodec_open2(ctx, encoder, NULL) < 0) { + avcodec_free_context(&ctx); + return NULL; + } + return ctx; +} + +static int decode_mjpeg_frame(AVCodecContext *decoder, const uint8_t *data, int size, AVFrame **frame) { + AVPacket *pkt; + int ret; + + *frame = NULL; + pkt = av_packet_alloc(); + if (pkt == NULL) { + return -1; + } + + pkt->data = (uint8_t *) data; + pkt->size = size; + ret = avcodec_send_packet(decoder, pkt); + if (ret < 0) { + av_packet_free(&pkt); + return -1; + } + + *frame = av_frame_alloc(); + if (*frame == NULL) { + av_packet_free(&pkt); + return -1; + } + ret = avcodec_receive_frame(decoder, *frame); + av_packet_free(&pkt); + if (ret < 0) { + av_frame_free(frame); + return -1; + } + return 0; +} + +static struct SwsContext *create_scaler(AVFrame *src, const worker_config_t *cfg) { + return sws_getContext( + src->width, + src->height, + src->format, + cfg->output_width, + cfg->output_height, + AV_PIX_FMT_YUVJ420P, + SWS_BILINEAR, + NULL, + NULL, + NULL); +} + +static int scale_frame(struct SwsContext *sws_ctx, const worker_config_t *cfg, AVFrame *src, AVFrame **dst) { + int ret; + + if (sws_ctx == NULL) { + return -1; + } + + *dst = av_frame_alloc(); + if (*dst == NULL) { + return -1; + } + + (*dst)->width = cfg->output_width; + (*dst)->height = cfg->output_height; + (*dst)->format = AV_PIX_FMT_YUVJ420P; + if (av_frame_get_buffer(*dst, 0) < 0) { + av_frame_free(dst); + return -1; + } + + ret = sws_scale( + sws_ctx, + (const uint8_t *const *) src->data, + src->linesize, + 0, + src->height, + (*dst)->data, + (*dst)->linesize); + if (ret < 0) { + av_frame_free(dst); + return -1; + } + return 0; +} + +static int encode_frame(AVCodecContext *encoder, AVFrame *frame, int qscale, AVPacket **pkt) { + int ret; + + *pkt = av_packet_alloc(); + if (*pkt == NULL) { + return -1; + } + + encoder->global_quality = FF_QP2LAMBDA * qscale; + frame->quality = encoder->global_quality; + ret = avcodec_send_frame(encoder, frame); + if (ret < 0) { + av_packet_free(pkt); + return -1; + } + ret = avcodec_receive_packet(encoder, *pkt); + if (ret < 0) { + av_packet_free(pkt); + return -1; + } + return 0; +} + +static int video_sender_init(video_sender_t *sender, const worker_config_t *cfg) { + kcp_conn_options_t options; + + if (sender == NULL) { + errno = EINVAL; + return -1; + } + + CLEAR(*sender); + snprintf(sender->target_peer, sizeof(sender->target_peer), "%s", cfg->target_peer); + kcp_conn_options_set_video_defaults(&options); + sender->client = kcp_client_dial_with_options( + cfg->server_addr, + cfg->relay_addr, + cfg->peer_id, + cfg->bind_ip, + cfg->bind_device, + &options, + NULL, + NULL, + NULL, + cfg->stats_interval_ms); + if (sender->client == NULL) { + return -1; + } + + fprintf(stderr, "B-side video worker connected as %s -> %s\n", kcp_client_id(sender->client), sender->target_peer); + return 0; +} + +static int video_sender_send_packet(video_sender_t *sender, const AVPacket *encoded_pkt) { + if (sender == NULL || sender->client == NULL || encoded_pkt == NULL) { + errno = EINVAL; + return -1; + } + return kcp_client_send_binary(sender->client, sender->target_peer, encoded_pkt->data, (size_t) encoded_pkt->size); +} + +static void video_sender_close(video_sender_t *sender) { + if (sender == NULL || sender->client == NULL) { + return; + } + kcp_client_close(sender->client); + kcp_client_free(sender->client); + sender->client = NULL; +} + +static void *control_thread_main(void *opaque) { + control_thread_args_t *args = (control_thread_args_t *) opaque; + char line[512]; + + while (fgets(line, sizeof(line), args->control_stream) != NULL) { + cJSON *root = cJSON_Parse(line); + cJSON *type_item; + + if (root == NULL) { + continue; + } + type_item = cJSON_GetObjectItemCaseSensitive(root, "type"); + if (!cJSON_IsString(type_item) || type_item->valuestring == NULL) { + cJSON_Delete(root); + continue; + } + + if (strcmp(type_item->valuestring, "shutdown") == 0) { + runtime_set_shutdown(args->runtime, true); + cJSON_Delete(root); + return NULL; + } + if (strcmp(type_item->valuestring, "set_profile") == 0) { + cJSON *fps = cJSON_GetObjectItemCaseSensitive(root, "fps"); + cJSON *qscale = cJSON_GetObjectItemCaseSensitive(root, "jpeg_quality_qscale"); + cJSON *max_frame_bytes = cJSON_GetObjectItemCaseSensitive(root, "max_frame_bytes"); + runtime_set_profile( + args->runtime, + cJSON_IsNumber(fps) ? fps->valueint : -1, + cJSON_IsNumber(qscale) ? qscale->valueint : -1, + cJSON_IsNumber(max_frame_bytes) ? max_frame_bytes->valueint : -1); + } + cJSON_Delete(root); + } + + runtime_set_shutdown(args->runtime, true); + return NULL; +} + +int main(void) { + worker_config_t cfg; + runtime_state_t runtime; + video_sender_t sender; + control_thread_args_t control_args; + pthread_t control_thread; + FILE *control_stream = NULL; + FILE *telemetry_stream = NULL; + Buffer *buffers = NULL; + int num_buffers = 0; + int camera_fd = -1; + enum v4l2_buf_type stream_type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + AVCodecContext *decoder = NULL; + AVCodecContext *encoder = NULL; + struct SwsContext *sws_ctx = NULL; + double next_deadline_ms = 0.0; + double next_metrics_ms = 0.0; + int exit_code = 1; + int index; + int sws_src_width = 0; + int sws_src_height = 0; + int sws_src_format = -1; + bool control_thread_started = false; + + av_log_set_level(AV_LOG_ERROR); + signal(SIGINT, handle_signal); + signal(SIGTERM, handle_signal); + CLEAR(sender); + + if (load_worker_config(&cfg) != 0) { + return 1; + } + + CLEAR(runtime); + pthread_mutex_init(&runtime.lock, NULL); + pthread_mutex_init(&runtime.telemetry_lock, NULL); + runtime.target_fps = cfg.initial_fps; + runtime.jpeg_quality_qscale = cfg.initial_qscale; + runtime.max_frame_bytes = cfg.initial_max_frame_bytes; + + control_stream = fdopen(WORKER_CONTROL_FD, "r"); + telemetry_stream = fdopen(WORKER_TELEMETRY_FD, "w"); + if (control_stream == NULL || telemetry_stream == NULL) { + perror("fdopen worker control/telemetry"); + goto cleanup; + } + setvbuf(telemetry_stream, NULL, _IOLBF, 0); + runtime.telemetry_stream = telemetry_stream; + + control_args.control_stream = control_stream; + control_args.runtime = &runtime; + if (pthread_create(&control_thread, NULL, control_thread_main, &control_args) != 0) { + perror("pthread_create"); + goto cleanup; + } + control_thread_started = true; + + camera_fd = open_v4l2_device(cfg.video_device); + if (camera_fd < 0) { + goto cleanup_join_thread; + } + if (init_v4l2_device(camera_fd, &cfg) != 0) { + goto cleanup_join_thread; + } + if (init_mmap(camera_fd, &buffers, &num_buffers) != 0) { + goto cleanup_join_thread; + } + if (queue_all_buffers(camera_fd, num_buffers) != 0) { + goto cleanup_join_thread; + } + if (ioctl(camera_fd, VIDIOC_STREAMON, &stream_type) < 0) { + perror("VIDIOC_STREAMON"); + goto cleanup_join_thread; + } + + decoder = create_mjpeg_decoder(&cfg); + encoder = create_mjpeg_encoder(&cfg); + if (decoder == NULL || encoder == NULL) { + fprintf(stderr, "failed to create codecs\n"); + goto cleanup_join_thread; + } + if (video_sender_init(&sender, &cfg) != 0) { + perror("video_sender_init"); + goto cleanup_join_thread; + } + + next_deadline_ms = monotonic_ms(); + next_metrics_ms = next_deadline_ms + 100.0; + + while (!runtime_should_stop(&runtime)) { + AVFrame *decoded_frame = NULL; + AVFrame *scaled_frame = NULL; + AVPacket *encoded_pkt = NULL; + struct v4l2_buffer buf; + fd_set fds; + struct timeval tv; + int select_result; + int fps; + int qscale; + int max_frame_bytes; + int encode_us = 0; + bool sent = false; + const char *drop_reason = ""; + double now_ms = monotonic_ms(); + double encode_start_ms; + double encode_end_ms; + + runtime_get_profile(&runtime, &fps, &qscale, &max_frame_bytes); + if (fps < 1) { + fps = 1; + } + if (next_deadline_ms > now_ms) { + usleep((useconds_t) ((next_deadline_ms - now_ms) * 1000.0)); + } + next_deadline_ms = monotonic_ms() + (1000.0 / (double) fps); + + FD_ZERO(&fds); + FD_SET(camera_fd, &fds); + tv.tv_sec = 2; + tv.tv_usec = 0; + select_result = select(camera_fd + 1, &fds, NULL, NULL, &tv); + if (select_result <= 0) { + if (select_result < 0 && errno == EINTR) { + continue; + } + fprintf(stderr, "select timeout/error on camera\n"); + continue; + } + + CLEAR(buf); + buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + buf.memory = V4L2_MEMORY_MMAP; + if (ioctl(camera_fd, VIDIOC_DQBUF, &buf) < 0) { + if (errno == EAGAIN) { + continue; + } + perror("VIDIOC_DQBUF"); + break; + } + + if (decode_mjpeg_frame(decoder, (uint8_t *) buffers[buf.index].start, (int) buf.bytesused, &decoded_frame) != 0) { + drop_reason = "decode_failed"; + goto requeue_and_report; + } + + if ( + sws_ctx == NULL + || sws_src_width != decoded_frame->width + || sws_src_height != decoded_frame->height + || sws_src_format != decoded_frame->format + ) { + sws_freeContext(sws_ctx); + sws_ctx = create_scaler(decoded_frame, &cfg); + sws_src_width = decoded_frame->width; + sws_src_height = decoded_frame->height; + sws_src_format = decoded_frame->format; + } + if (scale_frame(sws_ctx, &cfg, decoded_frame, &scaled_frame) != 0) { + drop_reason = "scale_failed"; + goto requeue_and_report; + } + + encode_start_ms = monotonic_ms(); + if (encode_frame(encoder, scaled_frame, qscale, &encoded_pkt) != 0) { + drop_reason = "encode_failed"; + goto requeue_and_report; + } + encode_end_ms = monotonic_ms(); + encode_us = (int) ((encode_end_ms - encode_start_ms) * 1000.0); + + if (encoded_pkt->size > max_frame_bytes) { + drop_reason = "frame_too_large"; + goto requeue_and_report; + } + if (video_sender_send_packet(&sender, encoded_pkt) != 0) { + perror("video_sender_send_packet"); + drop_reason = "send_failed"; + goto requeue_and_report; + } + sent = true; + +requeue_and_report: + telemetry_write_frame_stat( + &runtime, + encoded_pkt != NULL ? encoded_pkt->size : 0, + encode_us, + sent, + drop_reason); + + if (ioctl(camera_fd, VIDIOC_QBUF, &buf) < 0) { + perror("VIDIOC_QBUF"); + av_frame_free(&decoded_frame); + av_frame_free(&scaled_frame); + if (encoded_pkt != NULL) { + av_packet_free(&encoded_pkt); + } + break; + } + + av_frame_free(&decoded_frame); + av_frame_free(&scaled_frame); + if (encoded_pkt != NULL) { + av_packet_free(&encoded_pkt); + } + + now_ms = monotonic_ms(); + if (sender.client != NULL && now_ms >= next_metrics_ms) { + kcp_conn_metrics_t metrics; + if (kcp_client_metrics_snapshot(sender.client, &metrics) == 0) { + telemetry_write_kcp_metrics(&runtime, &metrics); + } + next_metrics_ms = now_ms + 100.0; + } + } + + exit_code = 0; + +cleanup_join_thread: + runtime_set_shutdown(&runtime, true); + if (control_stream != NULL) { + fclose(control_stream); + control_stream = NULL; + } + if (control_thread_started) { + pthread_join(control_thread, NULL); + } + +cleanup: + if (control_stream != NULL) { + fclose(control_stream); + control_stream = NULL; + } + if (camera_fd >= 0) { + ioctl(camera_fd, VIDIOC_STREAMOFF, &stream_type); + } + if (buffers != NULL) { + for (index = 0; index < num_buffers; ++index) { + if (buffers[index].start != NULL && buffers[index].start != MAP_FAILED) { + munmap(buffers[index].start, buffers[index].length); + } + } + free(buffers); + } + video_sender_close(&sender); + if (encoder != NULL) { + avcodec_free_context(&encoder); + } + if (decoder != NULL) { + avcodec_free_context(&decoder); + } + sws_freeContext(sws_ctx); + if (camera_fd >= 0) { + close(camera_fd); + } + if (telemetry_stream != NULL) { + fclose(telemetry_stream); + telemetry_stream = NULL; + } + pthread_mutex_destroy(&runtime.lock); + pthread_mutex_destroy(&runtime.telemetry_lock); + return exit_code; +} diff --git a/config/b_side_omnidaemon.yaml b/config/b_side_omnidaemon.yaml new file mode 100644 index 0000000..633a8f6 --- /dev/null +++ b/config/b_side_omnidaemon.yaml @@ -0,0 +1,58 @@ +transport: + server_addr: "81.70.156.140:10909" + relay_via: "106.55.173.235:10909" + bind_ip: "" + bind_device: "" + +control_receiver: + peer_id: "peer-b-ctrl" + nodelay: 1 + interval_ms: 5 + resend: 2 + nc: 1 + sndwnd: 32 + rcvwnd: 32 + mtu: 1400 + stats_interval_ms: 100 + queue_capacity: 256 + +video_sender: + enabled: false + peer_id: "peer-b-video" + target_peer: "peer-a-video" + binary_path: "bin/b_side_video_sender" + device: "/dev/video0" + capture_width: 1280 + capture_height: 720 + output_width: 640 + output_height: 360 + fps: 10 + jpeg_quality_qscale: 8 + max_frame_bytes: 40960 + stats_interval_ms: 100 + +daemon: + socket_path: "/tmp/omnisocket-b-side.sock" + ctrl_socket_path: "/tmp/omnisocket-b-ctrl.sock" + reconnect_delay_ms: 2000 + telemetry_interval_ms: 100 + worker_restart_delay_ms: 2000 + +policy: + mode: "auto" + health_window_ms: 2000 + green_srtt_ms: 30 + yellow_srtt_ms: 55 + retrans_red_threshold: 8 + profile_green: + fps: 10 + jpeg_quality_qscale: 8 + max_frame_bytes: 40960 + profile_yellow: + fps: 7 + jpeg_quality_qscale: 12 + max_frame_bytes: 28672 + profile_red: + fps: 5 + jpeg_quality_qscale: 16 + max_frame_bytes: 20480 diff --git a/python/omnisocket_b_side/__init__.py b/python/omnisocket_b_side/__init__.py new file mode 100644 index 0000000..5d76152 --- /dev/null +++ b/python/omnisocket_b_side/__init__.py @@ -0,0 +1,10 @@ +from pathlib import Path + + +PACKAGE_ROOT = Path(__file__).resolve().parent +PYTHON_ROOT = PACKAGE_ROOT.parent +REPO_ROOT = PYTHON_ROOT.parent +DEFAULT_SOCKET_PATH = "/tmp/omnisocket-b-side.sock" +DEFAULT_CTRL_SOCKET_PATH = "/tmp/omnisocket-b-ctrl.sock" +DEFAULT_CONFIG_PATH = REPO_ROOT / "config" / "b_side_omnidaemon.yaml" +VERSION = "0.1.0" diff --git a/python/omnisocket_b_side/__main__.py b/python/omnisocket_b_side/__main__.py new file mode 100644 index 0000000..c5a0ee7 --- /dev/null +++ b/python/omnisocket_b_side/__main__.py @@ -0,0 +1,5 @@ +from .daemon import main + + +if __name__ == "__main__": + main() diff --git a/python/omnisocket_b_side/client.py b/python/omnisocket_b_side/client.py new file mode 100644 index 0000000..2020efa --- /dev/null +++ b/python/omnisocket_b_side/client.py @@ -0,0 +1,54 @@ +"""Local AF_UNIX SOCK_SEQPACKET client for B-side control delivery.""" + +from __future__ import annotations + +import os +import socket + +from omnisocket_a_side.control_codec import CONTROL_PACKET_STRUCT + +from . import DEFAULT_CTRL_SOCKET_PATH + + +class BSideControlClient: + def __init__(self, socket_path: str | None = None) -> None: + self.socket_path = socket_path or os.getenv( + "OMNIBDAEMON_CTRL_SOCKET", + DEFAULT_CTRL_SOCKET_PATH, + ) + self._sock: socket.socket | None = None + + def connect(self) -> None: + if not hasattr(socket, "AF_UNIX"): + raise OSError("AF_UNIX sockets are not available on this platform") + if not hasattr(socket, "SOCK_SEQPACKET"): + raise OSError("SOCK_SEQPACKET is not available on this platform") + sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) + sock.connect(self.socket_path) + self._sock = sock + + def recv_control_packet(self, timeout_ms: int = 100) -> bytes | None: + if self._sock is None: + raise RuntimeError("B-side control client is not connected") + + self._sock.settimeout(max(0.001, timeout_ms / 1000.0)) + try: + payload = self._sock.recv(CONTROL_PACKET_STRUCT.size) + except socket.timeout: + return None + except BlockingIOError: + return None + + if payload == b"": + raise ConnectionResetError("daemon control socket closed") + if len(payload) != CONTROL_PACKET_STRUCT.size: + return None + return payload + + def close(self) -> None: + if self._sock is None: + return + try: + self._sock.close() + finally: + self._sock = None diff --git a/python/omnisocket_b_side/daemon.py b/python/omnisocket_b_side/daemon.py new file mode 100644 index 0000000..162786d --- /dev/null +++ b/python/omnisocket_b_side/daemon.py @@ -0,0 +1,1318 @@ +"""B-side OmniDaemon that owns control receive and manages video send workers.""" + +from __future__ import annotations + +import argparse +import copy +from dataclasses import dataclass +from datetime import UTC, datetime +from http import HTTPStatus +from http.server import BaseHTTPRequestHandler +import json +import os +from pathlib import Path +import queue +import signal +import socket +import socketserver +import subprocess +import threading +import time +from typing import Any + +import yaml + +from omnisocket_a_side.control_codec import CONTROL_PACKET_STRUCT + +from . import ( + DEFAULT_CONFIG_PATH, + DEFAULT_CTRL_SOCKET_PATH, + DEFAULT_SOCKET_PATH, + REPO_ROOT, + VERSION, +) + + +def utc_iso_now() -> str: + return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") + + +def load_omnisocket_api(): + from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session + + return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session + + +def _merge_kcp_defaults(defaults: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]: + merged = dict(defaults) + for key in ("nodelay", "interval_ms", "resend", "nc", "sndwnd", "rcvwnd", "mtu"): + if key in override: + merged[key] = int(override[key]) + return merged + + +def _default_stats() -> dict[str, int]: + return { + "send_calls": 0, + "send_bytes": 0, + "send_errors": 0, + "recv_calls": 0, + "recv_bytes": 0, + "recv_timeouts": 0, + "recv_errors": 0, + "connected": 0, + } + + +def _default_kcp_metrics() -> dict[str, Any]: + return { + "connected": 0, + "has_conv": 0, + "conv": 0, + "local_addr": "", + "remote_addr": "", + "rto_ms": 0, + "srtt_ms": 0, + "srttvar_ms": 0, + "bytes_sent": 0, + "bytes_received": 0, + "in_pkts": 0, + "out_pkts": 0, + "in_segs": 0, + "out_segs": 0, + "retrans_segs": 0, + "fast_retrans_segs": 0, + "early_retrans_segs": 0, + "lost_segs": 0, + "repeat_segs": 0, + "in_errs": 0, + "kcp_in_errs": 0, + "ring_buffer_snd_queue": 0, + "ring_buffer_rcv_queue": 0, + "ring_buffer_snd_buffer": 0, + } + + +@dataclass(slots=True) +class VideoProfile: + fps: int + jpeg_quality_qscale: int + max_frame_bytes: int + + def as_dict(self) -> dict[str, int]: + return { + "fps": int(self.fps), + "jpeg_quality_qscale": int(self.jpeg_quality_qscale), + "max_frame_bytes": int(self.max_frame_bytes), + } + + +def _load_config(config_path: str | None) -> dict[str, Any]: + path = Path(os.getenv("OMNIBDAEMON_CONFIG", config_path or str(DEFAULT_CONFIG_PATH))) + raw: dict[str, Any] = {} + if path.exists(): + with path.open("r", encoding="utf-8") as file: + raw = yaml.safe_load(file) or {} + + control_defaults, _msg_type_binary, _session_cls = load_omnisocket_api() + transport = dict(raw.get("transport", {})) + control = dict(raw.get("control_receiver", {})) + video = dict(raw.get("video_sender", {})) + daemon_cfg = dict(raw.get("daemon", {})) + policy = dict(raw.get("policy", {})) + + def _profile(name: str, fps: int, qscale: int, max_frame_bytes: int) -> dict[str, int]: + section = dict(policy.get(name, {})) + return { + "fps": int(section.get("fps", fps)), + "jpeg_quality_qscale": int(section.get("jpeg_quality_qscale", qscale)), + "max_frame_bytes": int(section.get("max_frame_bytes", max_frame_bytes)), + } + + binary_path = str(video.get("binary_path", "bin/b_side_video_sender")) + if not os.path.isabs(binary_path): + binary_path = str((REPO_ROOT / binary_path).resolve()) + + return { + "config_path": str(path), + "transport": { + "server_addr": str(transport.get("server_addr", "")), + "relay_via": str(transport.get("relay_via", "")), + "bind_ip": str(transport.get("bind_ip", "")), + "bind_device": str(transport.get("bind_device", "")), + }, + "control_receiver": { + "peer_id": str(control.get("peer_id", "peer-b-ctrl")), + "stats_interval_ms": int(control.get("stats_interval_ms", 100)), + "queue_capacity": int(control.get("queue_capacity", 256)), + "kcp": _merge_kcp_defaults(control_defaults, control), + }, + "video_sender": { + "enabled": bool(video.get("enabled", False)), + "binary_path": binary_path, + "peer_id": str(video.get("peer_id", "peer-b-video")), + "target_peer": str(video.get("target_peer", "peer-a-video")), + "stats_interval_ms": int(video.get("stats_interval_ms", 100)), + "device": str(video.get("device", "/dev/video0")), + "capture_width": int(video.get("capture_width", 1280)), + "capture_height": int(video.get("capture_height", 720)), + "output_width": int(video.get("output_width", 640)), + "output_height": int(video.get("output_height", 360)), + "initial_profile": { + "fps": int(video.get("fps", 10)), + "jpeg_quality_qscale": int(video.get("jpeg_quality_qscale", 8)), + "max_frame_bytes": int(video.get("max_frame_bytes", 40960)), + }, + }, + "daemon": { + "socket_path": os.getenv( + "OMNIBDAEMON_SOCKET", + str(daemon_cfg.get("socket_path", DEFAULT_SOCKET_PATH)), + ), + "ctrl_socket_path": os.getenv( + "OMNIBDAEMON_CTRL_SOCKET", + str(daemon_cfg.get("ctrl_socket_path", DEFAULT_CTRL_SOCKET_PATH)), + ), + "reconnect_delay_ms": int(daemon_cfg.get("reconnect_delay_ms", 2000)), + "telemetry_interval_ms": int(daemon_cfg.get("telemetry_interval_ms", 100)), + "worker_restart_delay_ms": int(daemon_cfg.get("worker_restart_delay_ms", 2000)), + }, + "policy": { + "mode": str(policy.get("mode", "auto")).lower(), + "health_window_ms": int(policy.get("health_window_ms", 2000)), + "green_srtt_ms": int(policy.get("green_srtt_ms", 30)), + "yellow_srtt_ms": int(policy.get("yellow_srtt_ms", 55)), + "retrans_red_threshold": int(policy.get("retrans_red_threshold", 8)), + "profile_green": _profile("profile_green", 10, 8, 40960), + "profile_yellow": _profile("profile_yellow", 7, 12, 28672), + "profile_red": _profile("profile_red", 5, 16, 20480), + }, + } + + +class ControlRecvManager: + def __init__(self, config: dict[str, Any]) -> None: + control_defaults, msg_type_binary, session_cls = load_omnisocket_api() + transport = config["transport"] + control_cfg = config["control_receiver"] + daemon_cfg = config["daemon"] + + self._msg_type_binary = msg_type_binary + self._session_cls = session_cls + self._connect_kwargs = { + "server_addr": transport["server_addr"], + "relay_via": transport["relay_via"], + "bind_ip": transport["bind_ip"], + "bind_device": transport["bind_device"], + "peer_id": control_cfg["peer_id"], + "stats_interval_ms": control_cfg["stats_interval_ms"], + **_merge_kcp_defaults(control_defaults, control_cfg["kcp"]), + } + self._reconnect_delay_s = max(0.25, daemon_cfg["reconnect_delay_ms"] / 1000.0) + self._queue: queue.Queue[bytes] = queue.Queue(maxsize=control_cfg["queue_capacity"]) + + self._lock = threading.Lock() + self._stop_event = threading.Event() + self._thread = threading.Thread(target=self._run, name="omni-b-side-control", daemon=True) + self._session = None + self._connected = False + self._last_error = "" + self._last_connect_attempt = 0.0 + self._packets_received = 0 + self._packets_enqueued = 0 + self._ignored_non_binary = 0 + self._ignored_bad_length = 0 + self._dropped_queue_full = 0 + + @property + def packet_queue(self) -> queue.Queue[bytes]: + return self._queue + + def start(self) -> None: + self._thread.start() + + def stop(self) -> None: + self._stop_event.set() + if self._thread.is_alive(): + self._thread.join(timeout=2.0) + self._disconnect("control receiver stopped") + + def snapshot(self) -> dict[str, Any]: + with self._lock: + session = self._session + connected = self._connected and session is not None + last_error = self._last_error + packets_received = self._packets_received + packets_enqueued = self._packets_enqueued + ignored_non_binary = self._ignored_non_binary + ignored_bad_length = self._ignored_bad_length + dropped_queue_full = self._dropped_queue_full + + stats = _default_stats() + metrics = _default_kcp_metrics() + if session is not None: + try: + stats = dict(session.stats()) + metrics = dict(session.kcp_metrics()) + connected = bool(stats.get("connected") and metrics.get("connected")) + except Exception as error: # pragma: no cover - runtime integration + connected = False + last_error = str(error) + + return { + "connected": connected, + "peer_id": self._connect_kwargs["peer_id"], + "server_addr": self._connect_kwargs["server_addr"], + "relay_via": self._connect_kwargs["relay_via"], + "queue_depth": self._queue.qsize(), + "packets_received": packets_received, + "packets_enqueued": packets_enqueued, + "ignored_non_binary": ignored_non_binary, + "ignored_bad_length": ignored_bad_length, + "dropped_queue_full": dropped_queue_full, + "last_error": last_error, + "stats": stats, + "kcp_metrics": metrics, + } + + def _run(self) -> None: + while not self._stop_event.is_set(): + if not self._is_connected(): + now = time.monotonic() + if now - self._last_connect_attempt >= self._reconnect_delay_s: + self._last_connect_attempt = now + self._connect() + time.sleep(0.2) + continue + + with self._lock: + session = self._session + if session is None: + continue + + try: + item = session.recv(timeout_ms=200) + except Exception as error: # pragma: no cover - runtime integration + self._disconnect(str(error)) + continue + + if item is None: + continue + + _from_peer, msg_type, payload = item + with self._lock: + self._packets_received += 1 + + if msg_type != self._msg_type_binary: + with self._lock: + self._ignored_non_binary += 1 + continue + if len(payload) != CONTROL_PACKET_STRUCT.size: + with self._lock: + self._ignored_bad_length += 1 + continue + self._enqueue_packet(bytes(payload)) + + def _connect(self) -> None: + session = self._session_cls() + try: + session.connect(**self._connect_kwargs) + except Exception as error: # pragma: no cover - runtime integration + with self._lock: + self._connected = False + self._last_error = str(error) + try: + session.close() + except Exception: + pass + return + + with self._lock: + self._session = session + self._connected = True + self._last_error = "" + + def _disconnect(self, error_message: str) -> None: + with self._lock: + session = self._session + self._session = None + self._connected = False + if error_message: + self._last_error = error_message + if session is not None: + try: + session.close() + except Exception: + pass + + def _enqueue_packet(self, payload: bytes) -> None: + try: + self._queue.put_nowait(payload) + except queue.Full: + try: + self._queue.get_nowait() + except queue.Empty: + pass + with self._lock: + self._dropped_queue_full += 1 + try: + self._queue.put_nowait(payload) + except queue.Full: + with self._lock: + self._dropped_queue_full += 1 + return + with self._lock: + self._packets_enqueued += 1 + + def _is_connected(self) -> bool: + with self._lock: + return self._connected and self._session is not None + + +class ControlFanout: + def __init__(self, socket_path: str, packet_queue: queue.Queue[bytes]) -> None: + self._socket_path = socket_path + self._packet_queue = packet_queue + self._lock = threading.Lock() + self._stop_event = threading.Event() + self._accept_thread = threading.Thread(target=self._accept_loop, name="omni-b-side-ctrl-accept", daemon=True) + self._send_thread = threading.Thread(target=self._send_loop, name="omni-b-side-ctrl-send", daemon=True) + self._server_socket: socket.socket | None = None + self._consumer_socket: socket.socket | None = None + self._sent_packets = 0 + self._accepted_connections = 0 + self._dropped_no_consumer = 0 + self._dropped_send_errors = 0 + + def start(self) -> None: + socket_dir = os.path.dirname(self._socket_path) + if socket_dir: + os.makedirs(socket_dir, exist_ok=True) + try: + os.unlink(self._socket_path) + except FileNotFoundError: + pass + server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) + server_socket.bind(self._socket_path) + server_socket.listen(1) + server_socket.settimeout(0.5) + self._server_socket = server_socket + self._accept_thread.start() + self._send_thread.start() + + def stop(self) -> None: + self._stop_event.set() + with self._lock: + consumer = self._consumer_socket + self._consumer_socket = None + if consumer is not None: + try: + consumer.close() + except OSError: + pass + if self._server_socket is not None: + try: + self._server_socket.close() + except OSError: + pass + self._server_socket = None + if self._accept_thread.is_alive(): + self._accept_thread.join(timeout=2.0) + if self._send_thread.is_alive(): + self._send_thread.join(timeout=2.0) + try: + os.unlink(self._socket_path) + except FileNotFoundError: + pass + + def snapshot(self) -> dict[str, Any]: + with self._lock: + consumer_connected = self._consumer_socket is not None + sent_packets = self._sent_packets + accepted_connections = self._accepted_connections + dropped_no_consumer = self._dropped_no_consumer + dropped_send_errors = self._dropped_send_errors + return { + "consumer_connected": consumer_connected, + "queue_depth": self._packet_queue.qsize(), + "sent_packets": sent_packets, + "accepted_connections": accepted_connections, + "dropped_no_consumer": dropped_no_consumer, + "dropped_send_errors": dropped_send_errors, + "socket_path": self._socket_path, + } + + def _accept_loop(self) -> None: + while not self._stop_event.is_set(): + server_socket = self._server_socket + if server_socket is None: + return + try: + consumer, _addr = server_socket.accept() + except socket.timeout: + continue + except OSError: + if self._stop_event.is_set(): + return + time.sleep(0.2) + continue + + with self._lock: + old_consumer = self._consumer_socket + self._consumer_socket = consumer + self._accepted_connections += 1 + if old_consumer is not None: + try: + old_consumer.close() + except OSError: + pass + + def _send_loop(self) -> None: + while not self._stop_event.is_set(): + try: + payload = self._packet_queue.get(timeout=0.2) + except queue.Empty: + continue + with self._lock: + consumer = self._consumer_socket + if consumer is None: + with self._lock: + self._dropped_no_consumer += 1 + continue + + try: + consumer.sendall(payload) + except OSError: + with self._lock: + self._dropped_send_errors += 1 + if self._consumer_socket is consumer: + self._consumer_socket = None + try: + consumer.close() + except OSError: + pass + continue + + with self._lock: + self._sent_packets += 1 + + +class VideoWorkerManager: + def __init__(self, config: dict[str, Any]) -> None: + self._config = config + self._transport_cfg = config["transport"] + self._video_cfg = config["video_sender"] + self._daemon_cfg = config["daemon"] + self._policy_cfg = config["policy"] + self._enabled = bool(self._video_cfg["enabled"]) + self._restart_delay_s = max(0.5, self._daemon_cfg["worker_restart_delay_ms"] / 1000.0) + + initial = self._video_cfg["initial_profile"] + initial_profile = VideoProfile( + fps=int(initial["fps"]), + jpeg_quality_qscale=int(initial["jpeg_quality_qscale"]), + max_frame_bytes=int(initial["max_frame_bytes"]), + ) + + self._lock = threading.Lock() + self._stop_event = threading.Event() + self._thread = threading.Thread(target=self._run, name="omni-b-side-video-worker", daemon=True) + self._reader_thread: threading.Thread | None = None + self._process: subprocess.Popen[str] | None = None + self._control_stream = None + self._telemetry_stream = None + self._mode = self._policy_cfg["mode"] if self._policy_cfg["mode"] in {"auto", "manual"} else "auto" + self._desired_profile = initial_profile + self._active_profile = initial_profile + self._restart_count = 0 + self._last_exit_code: int | None = None + self._last_error = "" + self._frames_total = 0 + self._frames_sent = 0 + self._frames_dropped = 0 + self._oversized_drops = 0 + self._last_frame_bytes = 0 + self._last_encode_us = 0 + self._last_drop_reason = "" + self._last_frame_ts = 0 + self._kcp_metrics = _default_kcp_metrics() + + def start(self) -> None: + if not self._enabled: + return + self._thread.start() + + def stop(self) -> None: + self._stop_event.set() + self._send_worker_command({"type": "shutdown"}) + if self._thread.is_alive(): + self._thread.join(timeout=3.0) + self._terminate_worker() + + def snapshot(self) -> dict[str, Any]: + with self._lock: + process = self._process + running = process is not None and process.poll() is None + return { + "enabled": self._enabled, + "running": running, + "mode": self._mode, + "desired_profile": self._desired_profile.as_dict(), + "active_profile": self._active_profile.as_dict(), + "restart_count": self._restart_count, + "last_exit_code": self._last_exit_code, + "last_error": self._last_error, + "frames_total": self._frames_total, + "frames_sent": self._frames_sent, + "frames_dropped": self._frames_dropped, + "oversized_drops": self._oversized_drops, + "last_frame_bytes": self._last_frame_bytes, + "last_encode_us": self._last_encode_us, + "last_drop_reason": self._last_drop_reason, + "last_frame_ts": self._last_frame_ts, + "kcp_metrics": dict(self._kcp_metrics), + "binary_path": self._video_cfg["binary_path"], + } + + def set_profile( + self, + *, + mode: str, + fps: int | None = None, + jpeg_quality_qscale: int | None = None, + max_frame_bytes: int | None = None, + ) -> dict[str, Any]: + mode = mode.lower() + if mode not in {"auto", "manual"}: + raise ValueError("mode must be auto or manual") + + if mode == "auto": + with self._lock: + self._mode = "auto" + profile = self._desired_profile + self._send_worker_command({"type": "set_profile", **profile.as_dict()}) + return self.snapshot() + + if fps is None or jpeg_quality_qscale is None or max_frame_bytes is None: + raise ValueError("manual mode requires fps, jpeg_quality_qscale, and max_frame_bytes") + + profile = VideoProfile( + fps=max(1, int(fps)), + jpeg_quality_qscale=max(1, int(jpeg_quality_qscale)), + max_frame_bytes=max(1024, int(max_frame_bytes)), + ) + with self._lock: + self._mode = "manual" + self._desired_profile = profile + self._send_worker_command({"type": "set_profile", **profile.as_dict()}) + return self.snapshot() + + def apply_auto_profile(self, profile: VideoProfile) -> None: + with self._lock: + if self._mode != "auto": + return + if self._desired_profile.as_dict() == profile.as_dict(): + return + self._desired_profile = profile + self._send_worker_command({"type": "set_profile", **profile.as_dict()}) + + def _run(self) -> None: + while not self._stop_event.is_set(): + process = self._process + if process is None: + self._spawn_worker() + time.sleep(0.2) + continue + + exit_code = process.poll() + if exit_code is None: + time.sleep(0.5) + continue + + with self._lock: + self._last_exit_code = int(exit_code) + self._last_error = f"video worker exited with code {exit_code}" + self._restart_count += 1 + self._close_worker_handles() + with self._lock: + self._process = None + if self._stop_event.wait(self._restart_delay_s): + return + + def _spawn_worker(self) -> None: + binary_path = self._video_cfg["binary_path"] + if not os.path.exists(binary_path): + with self._lock: + self._last_error = f"video worker binary not found: {binary_path}" + self._stop_event.wait(self._restart_delay_s) + return + + command_read_fd, command_write_fd = os.pipe() + telemetry_read_fd, telemetry_write_fd = os.pipe() + + def _preexec() -> None: + os.dup2(command_read_fd, 3) + os.dup2(telemetry_write_fd, 4) + for fd in (command_read_fd, command_write_fd, telemetry_read_fd, telemetry_write_fd): + if fd not in (3, 4): + try: + os.close(fd) + except OSError: + pass + + env = dict(os.environ) + env.update( + { + "OMNI_VIDEO_SERVER_ADDR": self._transport_cfg["server_addr"], + "OMNI_VIDEO_RELAY_VIA": self._transport_cfg["relay_via"], + "OMNI_VIDEO_BIND_IP": self._transport_cfg["bind_ip"], + "OMNI_VIDEO_BIND_DEVICE": self._transport_cfg["bind_device"], + "OMNI_VIDEO_PEER_ID": self._video_cfg["peer_id"], + "OMNI_VIDEO_TARGET_PEER": self._video_cfg["target_peer"], + "OMNI_VIDEO_DEVICE": self._video_cfg["device"], + "OMNI_VIDEO_CAPTURE_WIDTH": str(self._video_cfg["capture_width"]), + "OMNI_VIDEO_CAPTURE_HEIGHT": str(self._video_cfg["capture_height"]), + "OMNI_VIDEO_OUTPUT_WIDTH": str(self._video_cfg["output_width"]), + "OMNI_VIDEO_OUTPUT_HEIGHT": str(self._video_cfg["output_height"]), + "OMNI_VIDEO_STATS_INTERVAL_MS": str(self._video_cfg["stats_interval_ms"]), + } + ) + with self._lock: + profile = self._desired_profile + env["OMNI_VIDEO_FPS"] = str(profile.fps) + env["OMNI_VIDEO_JPEG_QSCALE"] = str(profile.jpeg_quality_qscale) + env["OMNI_VIDEO_MAX_FRAME_BYTES"] = str(profile.max_frame_bytes) + + try: + process = subprocess.Popen( + [binary_path], + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=None, + env=env, + close_fds=True, + pass_fds=(command_read_fd, telemetry_write_fd), + preexec_fn=_preexec, + ) + except Exception as error: # pragma: no cover - runtime integration + for fd in (command_read_fd, command_write_fd, telemetry_read_fd, telemetry_write_fd): + try: + os.close(fd) + except OSError: + pass + with self._lock: + self._last_error = str(error) + self._stop_event.wait(self._restart_delay_s) + return + + os.close(command_read_fd) + os.close(telemetry_write_fd) + control_stream = os.fdopen(command_write_fd, "w", buffering=1, encoding="utf-8") + telemetry_stream = os.fdopen( + telemetry_read_fd, + "r", + buffering=1, + encoding="utf-8", + errors="replace", + ) + + with self._lock: + self._process = process + self._control_stream = control_stream + self._telemetry_stream = telemetry_stream + self._active_profile = profile + self._last_error = "" + + reader_thread = threading.Thread( + target=self._telemetry_loop, + args=(telemetry_stream, process), + name="omni-b-side-video-telemetry", + daemon=True, + ) + self._reader_thread = reader_thread + reader_thread.start() + self._send_worker_command({"type": "set_profile", **profile.as_dict()}) + + def _telemetry_loop(self, telemetry_stream, process: subprocess.Popen[Any]) -> None: + while not self._stop_event.is_set(): + line = telemetry_stream.readline() + if not line: + return + try: + payload = json.loads(line) + except json.JSONDecodeError: + continue + if not isinstance(payload, dict): + continue + payload_type = payload.get("type") + with self._lock: + if payload_type == "frame_stat": + self._frames_total += 1 + self._last_frame_bytes = int(payload.get("frame_bytes", 0) or 0) + self._last_encode_us = int(payload.get("encode_us", 0) or 0) + self._last_drop_reason = str(payload.get("drop_reason", "")) + self._last_frame_ts = int(payload.get("ts_unix_ms", 0) or 0) + if bool(payload.get("sent", False)): + self._frames_sent += 1 + else: + self._frames_dropped += 1 + if self._last_drop_reason == "frame_too_large": + self._oversized_drops += 1 + elif payload_type == "kcp_metrics": + merged = _default_kcp_metrics() + for key in merged: + if key in payload: + merged[key] = payload[key] + self._kcp_metrics = merged + else: + self._last_error = f"unknown worker telemetry type: {payload_type}" + if process.poll() is not None: + return + + def _send_worker_command(self, payload: dict[str, Any]) -> None: + with self._lock: + control_stream = self._control_stream + if control_stream is None: + return + try: + control_stream.write(json.dumps(payload) + "\n") + control_stream.flush() + except OSError as error: + with self._lock: + self._last_error = str(error) + + def _close_worker_handles(self) -> None: + with self._lock: + control_stream = self._control_stream + telemetry_stream = self._telemetry_stream + self._control_stream = None + self._telemetry_stream = None + if control_stream is not None: + try: + control_stream.close() + except OSError: + pass + if telemetry_stream is not None: + try: + telemetry_stream.close() + except OSError: + pass + + def _terminate_worker(self) -> None: + self._close_worker_handles() + with self._lock: + process = self._process + self._process = None + if process is None: + return + if process.poll() is None: + process.terminate() + try: + process.wait(timeout=2.0) + except subprocess.TimeoutExpired: + process.kill() + try: + process.wait(timeout=2.0) + except subprocess.TimeoutExpired: + pass + + +class PolicyEngine: + def __init__(self, policy_cfg: dict[str, Any]) -> None: + self._policy_cfg = policy_cfg + + def classify( + self, + *, + control_connected: bool, + consumer_connected: bool, + avg_srtt_ms: float, + retrans_delta: int, + video_oversized_recent: bool, + video_backlogged: bool, + ) -> tuple[str, VideoProfile]: + if not control_connected or not consumer_connected: + band = "red" + elif avg_srtt_ms >= self._policy_cfg["yellow_srtt_ms"] or retrans_delta >= self._policy_cfg["retrans_red_threshold"]: + band = "red" + elif avg_srtt_ms >= self._policy_cfg["green_srtt_ms"] or retrans_delta > 0: + band = "yellow" + else: + band = "green" + + if band == "green" and (video_oversized_recent or video_backlogged): + band = "yellow" + elif band == "yellow" and (video_oversized_recent or video_backlogged): + band = "red" + + profile_raw = self._policy_cfg[f"profile_{band}"] + return band, VideoProfile( + fps=int(profile_raw["fps"]), + jpeg_quality_qscale=int(profile_raw["jpeg_quality_qscale"]), + max_frame_bytes=int(profile_raw["max_frame_bytes"]), + ) + + +class TelemetrySampler: + def __init__( + self, + config: dict[str, Any], + control_manager: ControlRecvManager, + fanout: ControlFanout, + video_worker: VideoWorkerManager, + ) -> None: + self._config = config + self._control_manager = control_manager + self._fanout = fanout + self._video_worker = video_worker + self._policy_engine = PolicyEngine(config["policy"]) + self._interval_s = max(0.1, config["daemon"]["telemetry_interval_ms"] / 1000.0) + self._window_s = max(self._interval_s, config["policy"]["health_window_ms"] / 1000.0) + + self._lock = threading.Lock() + self._stop_event = threading.Event() + self._thread = threading.Thread(target=self._run, name="omni-b-side-telemetry", daemon=True) + self._started_at = utc_iso_now() + self._state = self._build_initial_state(config) + self._history: list[dict[str, Any]] = [] + self._last_totals: dict[str, float] | None = None + + def start(self) -> None: + self._thread.start() + + def stop(self) -> None: + self._stop_event.set() + if self._thread.is_alive(): + self._thread.join(timeout=2.0) + + def snapshot(self) -> dict[str, Any]: + with self._lock: + return copy.deepcopy(self._state) + + def _run(self) -> None: + while not self._stop_event.is_set(): + self._sample_once() + self._stop_event.wait(self._interval_s) + + def _sample_once(self) -> None: + now = time.time() + control = self._control_manager.snapshot() + fanout = self._fanout.snapshot() + video = self._video_worker.snapshot() + control_metrics = control["kcp_metrics"] + video_metrics = video["kcp_metrics"] + + self._history.append( + { + "ts": now, + "connected": bool(control["connected"]), + "srtt_ms": float(control_metrics.get("srtt_ms", 0.0) or 0.0), + "retrans_segs": int(control_metrics.get("retrans_segs", 0) or 0), + } + ) + self._history = [sample for sample in self._history if now - sample["ts"] <= self._window_s] + + srtt_values = [sample["srtt_ms"] for sample in self._history if sample["connected"]] + avg_srtt_ms = sum(srtt_values) / len(srtt_values) if srtt_values else 0.0 + jitter_ms = 0.0 + if len(srtt_values) >= 2: + deltas = [abs(srtt_values[index] - srtt_values[index - 1]) for index in range(1, len(srtt_values))] + jitter_ms = sum(deltas) / len(deltas) + retrans_delta = 0 + if len(self._history) >= 2: + retrans_delta = max(0, int(self._history[-1]["retrans_segs"] - self._history[0]["retrans_segs"])) + + last_frame_ts_ms = int(video["last_frame_ts"] or 0) + video_oversized_recent = bool( + video["last_drop_reason"] == "frame_too_large" + and last_frame_ts_ms > 0 + and now - (last_frame_ts_ms / 1000.0) <= self._window_s + ) + video_backlogged = bool( + int(video_metrics.get("ring_buffer_snd_queue", 0) or 0) > 128 + or int(video_metrics.get("ring_buffer_snd_buffer", 0) or 0) > 128 + ) + + health_band, recommended_profile = self._policy_engine.classify( + control_connected=bool(control["connected"]), + consumer_connected=bool(fanout["consumer_connected"]), + avg_srtt_ms=avg_srtt_ms, + retrans_delta=retrans_delta, + video_oversized_recent=video_oversized_recent, + video_backlogged=video_backlogged, + ) + self._video_worker.apply_auto_profile(recommended_profile) + + total_bytes_sent = int(control_metrics.get("bytes_sent", 0) or 0) + int(video_metrics.get("bytes_sent", 0) or 0) + total_bytes_received = int(control_metrics.get("bytes_received", 0) or 0) + int(video_metrics.get("bytes_received", 0) or 0) + total_out_segs = int(control_metrics.get("out_segs", 0) or 0) + int(video_metrics.get("out_segs", 0) or 0) + total_retrans = int(control_metrics.get("retrans_segs", 0) or 0) + int(video_metrics.get("retrans_segs", 0) or 0) + tx_kbps = 0 + rx_kbps = 0 + retrans_pct = 0.0 + if self._last_totals is not None: + dt = max(0.001, now - self._last_totals["ts"]) + sent_delta = max(0, total_bytes_sent - int(self._last_totals["bytes_sent"])) + recv_delta = max(0, total_bytes_received - int(self._last_totals["bytes_received"])) + out_seg_delta = max(0, total_out_segs - int(self._last_totals["out_segs"])) + retrans_total_delta = max(0, total_retrans - int(self._last_totals["retrans"])) + tx_kbps = int((sent_delta * 8.0) / dt / 1000.0) + rx_kbps = int((recv_delta * 8.0) / dt / 1000.0) + if out_seg_delta > 0: + retrans_pct = round((retrans_total_delta / out_seg_delta) * 100.0, 2) + self._last_totals = { + "ts": now, + "bytes_sent": float(total_bytes_sent), + "bytes_received": float(total_bytes_received), + "out_segs": float(total_out_segs), + "retrans": float(total_retrans), + } + + state = { + "network": { + "peer_status": "online" if control["connected"] else "offline", + "latency_ms": round(avg_srtt_ms or float(control_metrics.get("srtt_ms", 0) or 0.0), 1), + "jitter_ms": round(jitter_ms, 1), + "retrans_pct": round(retrans_pct, 2), + "packet_loss_pct": round(retrans_pct, 2), + "tx_kbps": tx_kbps, + "rx_kbps": rx_kbps, + "signal_dbm": None, + "transport": "OmniSocket / daemon", + "source_mode": "daemon-live", + "updated_at": utc_iso_now(), + }, + "control": { + "connected": bool(control["connected"]), + "consumer_connected": bool(fanout["consumer_connected"]), + "queue_depth": int(control["queue_depth"]), + "fanout_queue_depth": int(fanout["queue_depth"]), + "packets_received": int(control["packets_received"]), + "packets_enqueued": int(control["packets_enqueued"]), + "sent_to_consumer": int(fanout["sent_packets"]), + "dropped_no_consumer": int(fanout["dropped_no_consumer"]), + "dropped_queue_full": int(control["dropped_queue_full"]), + "ignored_non_binary": int(control["ignored_non_binary"]), + "ignored_bad_length": int(control["ignored_bad_length"]), + "last_error": control["last_error"], + "peer_id": control["peer_id"], + "server_addr": control["server_addr"], + "relay_via": control["relay_via"], + "stats": control["stats"], + "kcp_metrics": control_metrics, + }, + "video": { + "enabled": bool(video["enabled"]), + "running": bool(video["running"]), + "mode": str(video["mode"]), + "desired_profile": video["desired_profile"], + "active_profile": video["active_profile"], + "restart_count": int(video["restart_count"]), + "last_exit_code": video["last_exit_code"], + "last_error": video["last_error"], + "frames_total": int(video["frames_total"]), + "frames_sent": int(video["frames_sent"]), + "frames_dropped": int(video["frames_dropped"]), + "oversized_drops": int(video["oversized_drops"]), + "last_frame_bytes": int(video["last_frame_bytes"]), + "last_encode_us": int(video["last_encode_us"]), + "last_drop_reason": str(video["last_drop_reason"]), + "last_frame_ts": int(video["last_frame_ts"]), + "binary_path": video["binary_path"], + "kcp_metrics": video_metrics, + }, + "policy": { + "mode": video["mode"], + "health_band": health_band, + "recommended_video_profile": recommended_profile.as_dict(), + }, + "daemon": { + "started_at": self._started_at, + "version": VERSION, + "config_path": self._config["config_path"], + "socket_path": self._config["daemon"]["socket_path"], + "ctrl_socket_path": self._config["daemon"]["ctrl_socket_path"], + }, + } + + with self._lock: + self._state = state + + @staticmethod + def _build_initial_state(config: dict[str, Any]) -> dict[str, Any]: + return { + "network": { + "peer_status": "offline", + "latency_ms": 0.0, + "jitter_ms": 0.0, + "retrans_pct": 0.0, + "packet_loss_pct": 0.0, + "tx_kbps": 0, + "rx_kbps": 0, + "signal_dbm": None, + "transport": "OmniSocket / daemon", + "source_mode": "daemon-starting", + "updated_at": utc_iso_now(), + }, + "control": { + "connected": False, + "consumer_connected": False, + "queue_depth": 0, + "fanout_queue_depth": 0, + "packets_received": 0, + "packets_enqueued": 0, + "sent_to_consumer": 0, + "dropped_no_consumer": 0, + "dropped_queue_full": 0, + "ignored_non_binary": 0, + "ignored_bad_length": 0, + "last_error": "", + "peer_id": "", + "server_addr": "", + "relay_via": "", + "stats": _default_stats(), + "kcp_metrics": _default_kcp_metrics(), + }, + "video": { + "enabled": bool(config["video_sender"]["enabled"]), + "running": False, + "mode": config["policy"]["mode"], + "desired_profile": dict(config["video_sender"]["initial_profile"]), + "active_profile": dict(config["video_sender"]["initial_profile"]), + "restart_count": 0, + "last_exit_code": None, + "last_error": "", + "frames_total": 0, + "frames_sent": 0, + "frames_dropped": 0, + "oversized_drops": 0, + "last_frame_bytes": 0, + "last_encode_us": 0, + "last_drop_reason": "", + "last_frame_ts": 0, + "binary_path": config["video_sender"]["binary_path"], + "kcp_metrics": _default_kcp_metrics(), + }, + "policy": { + "mode": config["policy"]["mode"], + "health_band": "red", + "recommended_video_profile": dict(config["policy"]["profile_red"]), + }, + "daemon": { + "started_at": utc_iso_now(), + "version": VERSION, + "config_path": config["config_path"], + "socket_path": config["daemon"]["socket_path"], + "ctrl_socket_path": config["daemon"]["ctrl_socket_path"], + }, + } + + +class ThreadingUnixHTTPServer(socketserver.ThreadingMixIn, socketserver.UnixStreamServer): + daemon_threads = True + + +class OmniDaemonHTTPHandler(BaseHTTPRequestHandler): + server_version = "BsideOmniDaemonHTTP/1.0" + protocol_version = "HTTP/1.1" + + def do_GET(self) -> None: # pragma: no cover - exercised by runtime integration + app: BSideOmniDaemon = self.server.app # type: ignore[attr-defined] + if self.path == "/v1/health": + state = app.get_state() + band = state["policy"]["health_band"] + status = "ok" if band == "green" else "degraded" + if not state["control"]["connected"]: + status = "unavailable" + self._send_json( + HTTPStatus.OK, + { + "status": status, + "health_band": band, + "control_connected": state["control"]["connected"], + "consumer_connected": state["control"]["consumer_connected"], + "video_running": state["video"]["running"], + "updated_at": state["network"]["updated_at"], + }, + ) + return + if self.path == "/v1/state": + self._send_json(HTTPStatus.OK, app.get_state()) + return + self._send_json(HTTPStatus.NOT_FOUND, {"error": f"unknown path: {self.path}"}) + + def do_POST(self) -> None: # pragma: no cover - exercised by runtime integration + app: BSideOmniDaemon = self.server.app # type: ignore[attr-defined] + if self.path != "/v1/video/profile": + self._send_json(HTTPStatus.NOT_FOUND, {"error": f"unknown path: {self.path}"}) + return + try: + payload = self._read_json() + except ValueError as error: + self._send_json(HTTPStatus.BAD_REQUEST, {"error": str(error)}) + return + + try: + mode = str(payload.get("mode", "auto")).lower() + if mode == "auto": + state = app.set_video_profile(mode="auto") + else: + state = app.set_video_profile( + mode="manual", + fps=int(payload.get("fps", 0)), + jpeg_quality_qscale=int(payload.get("jpeg_quality_qscale", 0)), + max_frame_bytes=int(payload.get("max_frame_bytes", 0)), + ) + except (TypeError, ValueError) as error: + self._send_json(HTTPStatus.BAD_REQUEST, {"error": str(error)}) + return + + self._send_json(HTTPStatus.OK, state) + + def log_message(self, format: str, *args: Any) -> None: # noqa: A003 + return + + def _read_json(self) -> dict[str, Any]: + raw_length = self.headers.get("Content-Length") + if raw_length is None: + raise ValueError("missing Content-Length") + try: + length = int(raw_length) + except ValueError as error: + raise ValueError("invalid Content-Length") from error + raw = self.rfile.read(length) + try: + payload = json.loads(raw.decode("utf-8")) + except json.JSONDecodeError as error: + raise ValueError(f"invalid JSON body: {error}") from error + if not isinstance(payload, dict): + raise ValueError("request body must be a JSON object") + return payload + + def _send_json(self, status: HTTPStatus, payload: dict[str, Any]) -> None: + body = json.dumps(payload).encode("utf-8") + self.send_response(status.value) + self.send_header("Content-Type", "application/json; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.send_header("Cache-Control", "no-store") + self.send_header("Connection", "keep-alive") + self.end_headers() + self.wfile.write(body) + + +class BSideOmniDaemon: + def __init__(self, config_path: str | None = None) -> None: + self._config = _load_config(config_path) + self._control_manager = ControlRecvManager(self._config) + self._fanout = ControlFanout( + self._config["daemon"]["ctrl_socket_path"], + self._control_manager.packet_queue, + ) + self._video_worker = VideoWorkerManager(self._config) + self._telemetry = TelemetrySampler( + self._config, + self._control_manager, + self._fanout, + self._video_worker, + ) + self._server: ThreadingUnixHTTPServer | None = None + self._server_thread: threading.Thread | None = None + + @property + def socket_path(self) -> str: + return self._config["daemon"]["socket_path"] + + def start(self) -> None: + self._fanout.start() + self._control_manager.start() + self._video_worker.start() + self._telemetry.start() + self._server = self._build_server() + + def stop(self) -> None: + server_thread = None + if self._server is not None: + if self._server_thread is not None and self._server_thread.is_alive(): + self._server.shutdown() + self._server.server_close() + self._server = None + if self._server_thread is not None and self._server_thread.is_alive(): + server_thread = self._server_thread + self._server_thread = None + try: + os.unlink(self.socket_path) + except FileNotFoundError: + pass + if server_thread is not None and server_thread is not threading.current_thread(): + server_thread.join(timeout=2.0) + self._telemetry.stop() + self._video_worker.stop() + self._control_manager.stop() + self._fanout.stop() + + def serve_forever(self) -> None: + self.start() + assert self._server is not None + self._server_thread = threading.Thread( + target=self._server.serve_forever, + name="omni-b-side-http", + daemon=True, + ) + self._server_thread.start() + self._server_thread.join() + + def get_state(self) -> dict[str, Any]: + return self._telemetry.snapshot() + + def set_video_profile( + self, + *, + mode: str, + fps: int | None = None, + jpeg_quality_qscale: int | None = None, + max_frame_bytes: int | None = None, + ) -> dict[str, Any]: + return self._video_worker.set_profile( + mode=mode, + fps=fps, + jpeg_quality_qscale=jpeg_quality_qscale, + max_frame_bytes=max_frame_bytes, + ) + + def _build_server(self) -> ThreadingUnixHTTPServer: + socket_path = self.socket_path + socket_dir = os.path.dirname(socket_path) + if socket_dir: + os.makedirs(socket_dir, exist_ok=True) + try: + os.unlink(socket_path) + except FileNotFoundError: + pass + server = ThreadingUnixHTTPServer(socket_path, OmniDaemonHTTPHandler) + server.app = self # type: ignore[attr-defined] + return server + + +def main(argv: list[str] | None = None) -> None: + parser = argparse.ArgumentParser(description="Run the B-side OmniDaemon") + parser.add_argument("--config", dest="config_path", help="Path to daemon YAML config") + args = parser.parse_args(argv) + + app = BSideOmniDaemon(config_path=args.config_path) + + def _handle_signal(_signum: int, _frame: Any) -> None: + app.stop() + + signal.signal(signal.SIGINT, _handle_signal) + signal.signal(signal.SIGTERM, _handle_signal) + + try: + app.serve_forever() + except KeyboardInterrupt: + pass + finally: + app.stop() + + +if __name__ == "__main__": + main() diff --git a/python/setup.py b/python/setup.py index 040ddaa..234684d 100644 --- a/python/setup.py +++ b/python/setup.py @@ -35,10 +35,11 @@ COMMON_SOURCES = [ setup( name="omnisocket", version="0.1.0", - packages=["omnisocket", "omnisocket_a_side"], + packages=["omnisocket", "omnisocket_a_side", "omnisocket_b_side"], entry_points={ "console_scripts": [ "omnisocket-a-side-daemon=omnisocket_a_side.daemon:main", + "omnisocket-b-side-daemon=omnisocket_b_side.daemon:main", ] }, ext_modules=[ diff --git a/scripts/start_b_side.sh b/scripts/start_b_side.sh new file mode 100644 index 0000000..a03b246 --- /dev/null +++ b/scripts/start_b_side.sh @@ -0,0 +1,11 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +CONFIG_PATH="${1:-$ROOT/config/b_side_omnidaemon.yaml}" + +export OMNIBDAEMON_CONFIG="$CONFIG_PATH" +export PYTHONPATH="$ROOT/python${PYTHONPATH:+:$PYTHONPATH}" + +cd "$ROOT" +exec python3 -m omnisocket_b_side.daemon --config "$CONFIG_PATH"