diff --git a/Makefile b/Makefile index 6c4257e..39cf485 100644 --- a/Makefile +++ b/Makefile @@ -50,20 +50,6 @@ CAMERA_VIDEO_SENDER_SRCS := \ third_party/cjson/cJSON.c \ third_party/kcp/ikcp.c -B_SIDE_VIDEO_SENDER := $(BIN_DIR)/b_side_video_sender -B_SIDE_VIDEO_SENDER_SRCS := \ - $(CMD_DIR)/b_side_video_sender.c \ - $(SRC_DIR)/omni_common.c \ - $(SRC_DIR)/protocol.c \ - $(SRC_DIR)/latencylog.c \ - $(SRC_DIR)/kcp_packet_debug.c \ - $(SRC_DIR)/kcp_session_stats.c \ - $(SRC_DIR)/linux_timestamping.c \ - $(SRC_DIR)/transport_kcp.c \ - $(SRC_DIR)/peer_kcp_client.c \ - third_party/cjson/cJSON.c \ - third_party/kcp/ikcp.c - all: $(TARGETS) $(BIN_DIR): @@ -95,11 +81,6 @@ $(CAMERA_VIDEO_SENDER): $(CAMERA_VIDEO_SENDER_SRCS) | $(BIN_DIR) camera_video_sender: $(CAMERA_VIDEO_SENDER) -$(B_SIDE_VIDEO_SENDER): $(B_SIDE_VIDEO_SENDER_SRCS) | $(BIN_DIR) - $(CC) $(CFLAGS) $(CPPFLAGS) $$(pkg-config --cflags libavformat libavcodec libavutil libswscale) -o $@ $^ $(LDFLAGS) $$(pkg-config --libs libavformat libavcodec libavutil libswscale) - -b_side_video_sender: $(B_SIDE_VIDEO_SENDER) - clean: rm -rf $(BIN_DIR) @@ -109,4 +90,4 @@ python-ext: python-install: cd python && $(PYTHON) -m pip install -e . -.PHONY: all clean python-ext python-install camera_video_sender b_side_video_sender +.PHONY: all clean python-ext python-install camera_video_sender diff --git a/cmd/b_side_video_sender.c b/cmd/b_side_video_sender.c deleted file mode 100644 index 518350d..0000000 --- a/cmd/b_side_video_sender.c +++ /dev/null @@ -1,860 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#include "cJSON.h" -#include "peer_kcp_client.h" - -#define WORKER_CONTROL_FD 3 -#define WORKER_TELEMETRY_FD 4 -#define NUM_BUFFERS 4 -#define CLEAR(x) memset(&(x), 0, sizeof(x)) - -#define VIDEO_SERVER_ADDR_ENV "OMNI_VIDEO_SERVER_ADDR" -#define VIDEO_RELAY_ADDR_ENV "OMNI_VIDEO_RELAY_VIA" -#define VIDEO_BIND_IP_ENV "OMNI_VIDEO_BIND_IP" -#define VIDEO_BIND_DEVICE_ENV "OMNI_VIDEO_BIND_DEVICE" -#define VIDEO_PEER_ID_ENV "OMNI_VIDEO_PEER_ID" -#define VIDEO_TARGET_PEER_ENV "OMNI_VIDEO_TARGET_PEER" -#define VIDEO_DEVICE_ENV "OMNI_VIDEO_DEVICE" -#define VIDEO_CAPTURE_WIDTH_ENV "OMNI_VIDEO_CAPTURE_WIDTH" -#define VIDEO_CAPTURE_HEIGHT_ENV "OMNI_VIDEO_CAPTURE_HEIGHT" -#define VIDEO_OUTPUT_WIDTH_ENV "OMNI_VIDEO_OUTPUT_WIDTH" -#define VIDEO_OUTPUT_HEIGHT_ENV "OMNI_VIDEO_OUTPUT_HEIGHT" -#define VIDEO_FPS_ENV "OMNI_VIDEO_FPS" -#define VIDEO_JPEG_QSCALE_ENV "OMNI_VIDEO_JPEG_QSCALE" -#define VIDEO_MAX_FRAME_BYTES_ENV "OMNI_VIDEO_MAX_FRAME_BYTES" -#define VIDEO_STATS_INTERVAL_ENV "OMNI_VIDEO_STATS_INTERVAL_MS" - -typedef struct { - void *start; - size_t length; -} Buffer; - -typedef struct { - char server_addr[OMNI_MAX_ADDR_TEXT]; - char relay_addr[OMNI_MAX_ADDR_TEXT]; - char bind_ip[OMNI_MAX_ADDR_TEXT]; - char bind_device[128]; - char peer_id[OMNI_MAX_PEER_ID]; - char target_peer[OMNI_MAX_PEER_ID]; - char video_device[256]; - int capture_width; - int capture_height; - int output_width; - int output_height; - int initial_fps; - int initial_qscale; - int initial_max_frame_bytes; - int stats_interval_ms; -} worker_config_t; - -typedef struct { - pthread_mutex_t lock; - pthread_mutex_t telemetry_lock; - FILE *telemetry_stream; - int target_fps; - int jpeg_quality_qscale; - int max_frame_bytes; - bool shutdown_requested; -} runtime_state_t; - -typedef struct { - kcp_client_t *client; - char target_peer[OMNI_MAX_PEER_ID]; -} video_sender_t; - -typedef struct { - FILE *control_stream; - runtime_state_t *runtime; -} control_thread_args_t; - -static volatile sig_atomic_t g_signal_stop = 0; - -static double monotonic_ms(void) { - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - return (double) ts.tv_sec * 1000.0 + (double) ts.tv_nsec / 1000000.0; -} - -static int64_t realtime_ms(void) { - struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); - return (int64_t) ts.tv_sec * 1000 + (int64_t) ts.tv_nsec / 1000000; -} - -static int env_as_int(const char *name, int fallback) { - const char *raw = getenv(name); - char *endptr = NULL; - long value; - - if (raw == NULL || raw[0] == '\0') { - return fallback; - } - errno = 0; - value = strtol(raw, &endptr, 10); - if (errno != 0 || endptr == raw || (endptr != NULL && *endptr != '\0')) { - return fallback; - } - return (int) value; -} - -static const char *env_or_default(const char *name, const char *fallback) { - const char *value = getenv(name); - if (value != NULL && value[0] != '\0') { - return value; - } - return fallback; -} - -static void handle_signal(int signum) { - (void) signum; - g_signal_stop = 1; -} - -static void runtime_set_shutdown(runtime_state_t *runtime, bool shutdown_requested) { - pthread_mutex_lock(&runtime->lock); - runtime->shutdown_requested = shutdown_requested; - pthread_mutex_unlock(&runtime->lock); -} - -static bool runtime_should_stop(runtime_state_t *runtime) { - bool shutdown_requested; - - pthread_mutex_lock(&runtime->lock); - shutdown_requested = runtime->shutdown_requested; - pthread_mutex_unlock(&runtime->lock); - return g_signal_stop != 0 || shutdown_requested; -} - -static void runtime_get_profile(runtime_state_t *runtime, int *fps, int *qscale, int *max_frame_bytes) { - pthread_mutex_lock(&runtime->lock); - *fps = runtime->target_fps; - *qscale = runtime->jpeg_quality_qscale; - *max_frame_bytes = runtime->max_frame_bytes; - pthread_mutex_unlock(&runtime->lock); -} - -static void runtime_set_profile(runtime_state_t *runtime, int fps, int qscale, int max_frame_bytes) { - pthread_mutex_lock(&runtime->lock); - if (fps > 0) { - runtime->target_fps = fps; - } - if (qscale > 0) { - runtime->jpeg_quality_qscale = qscale; - } - if (max_frame_bytes > 0) { - runtime->max_frame_bytes = max_frame_bytes; - } - pthread_mutex_unlock(&runtime->lock); -} - -static void telemetry_write_line(runtime_state_t *runtime, const char *line) { - pthread_mutex_lock(&runtime->telemetry_lock); - if (runtime->telemetry_stream != NULL) { - fputs(line, runtime->telemetry_stream); - fputc('\n', runtime->telemetry_stream); - fflush(runtime->telemetry_stream); - } - pthread_mutex_unlock(&runtime->telemetry_lock); -} - -static void telemetry_write_frame_stat( - runtime_state_t *runtime, - int frame_bytes, - int encode_us, - bool sent, - const char *drop_reason -) { - char line[512]; - int written; - - written = snprintf( - line, - sizeof(line), - "{\"type\":\"frame_stat\",\"frame_bytes\":%d,\"encode_us\":%d," - "\"sent\":%s,\"drop_reason\":\"%s\",\"ts_unix_ms\":%" PRId64 "}", - frame_bytes, - encode_us, - sent ? "true" : "false", - drop_reason != NULL ? drop_reason : "", - realtime_ms()); - if (written > 0 && written < (int) sizeof(line)) { - telemetry_write_line(runtime, line); - } -} - -static void telemetry_write_kcp_metrics(runtime_state_t *runtime, const kcp_conn_metrics_t *metrics) { - char line[1024]; - int written; - - written = snprintf( - line, - sizeof(line), - "{\"type\":\"kcp_metrics\",\"connected\":%d,\"srtt_ms\":%d,\"srttvar_ms\":%d," - "\"rto_ms\":%u,\"bytes_sent\":%" PRIu64 ",\"bytes_received\":%" PRIu64 "," - "\"out_pkts\":%" PRIu64 ",\"out_segs\":%" PRIu64 ",\"retrans_segs\":%" PRIu64 "," - "\"fast_retrans_segs\":%" PRIu64 ",\"early_retrans_segs\":%" PRIu64 "," - "\"lost_segs\":%" PRIu64 ",\"ring_buffer_snd_queue\":%" PRIu64 "," - "\"ring_buffer_snd_buffer\":%" PRIu64 ",\"ts_unix_ms\":%" PRId64 "}", - metrics->connected, - metrics->srtt_ms, - metrics->srttvar_ms, - metrics->rto_ms, - metrics->bytes_sent, - metrics->bytes_received, - metrics->out_pkts, - metrics->out_segs, - metrics->retrans_segs, - metrics->fast_retrans_segs, - metrics->early_retrans_segs, - metrics->lost_segs, - metrics->ring_buffer_snd_queue, - metrics->ring_buffer_snd_buffer, - realtime_ms()); - if (written > 0 && written < (int) sizeof(line)) { - telemetry_write_line(runtime, line); - } -} - -static int load_worker_config(worker_config_t *cfg) { - const char *server_addr = getenv(VIDEO_SERVER_ADDR_ENV); - - if (cfg == NULL) { - errno = EINVAL; - return -1; - } - if (server_addr == NULL || server_addr[0] == '\0') { - fprintf(stderr, "%s is required\n", VIDEO_SERVER_ADDR_ENV); - errno = EINVAL; - return -1; - } - - CLEAR(*cfg); - snprintf(cfg->server_addr, sizeof(cfg->server_addr), "%s", server_addr); - snprintf(cfg->relay_addr, sizeof(cfg->relay_addr), "%s", env_or_default(VIDEO_RELAY_ADDR_ENV, "")); - snprintf(cfg->bind_ip, sizeof(cfg->bind_ip), "%s", env_or_default(VIDEO_BIND_IP_ENV, "")); - snprintf(cfg->bind_device, sizeof(cfg->bind_device), "%s", env_or_default(VIDEO_BIND_DEVICE_ENV, "")); - snprintf(cfg->peer_id, sizeof(cfg->peer_id), "%s", env_or_default(VIDEO_PEER_ID_ENV, "peer-b-video")); - snprintf(cfg->target_peer, sizeof(cfg->target_peer), "%s", env_or_default(VIDEO_TARGET_PEER_ENV, "peer-a-video")); - snprintf(cfg->video_device, sizeof(cfg->video_device), "%s", env_or_default(VIDEO_DEVICE_ENV, "/dev/video0")); - - cfg->capture_width = env_as_int(VIDEO_CAPTURE_WIDTH_ENV, 1280); - cfg->capture_height = env_as_int(VIDEO_CAPTURE_HEIGHT_ENV, 720); - cfg->output_width = env_as_int(VIDEO_OUTPUT_WIDTH_ENV, 640); - cfg->output_height = env_as_int(VIDEO_OUTPUT_HEIGHT_ENV, 360); - cfg->initial_fps = env_as_int(VIDEO_FPS_ENV, 10); - cfg->initial_qscale = env_as_int(VIDEO_JPEG_QSCALE_ENV, 8); - cfg->initial_max_frame_bytes = env_as_int(VIDEO_MAX_FRAME_BYTES_ENV, 40960); - cfg->stats_interval_ms = env_as_int(VIDEO_STATS_INTERVAL_ENV, 100); - return 0; -} - -static int open_v4l2_device(const char *device) { - int fd = open(device, O_RDWR | O_NONBLOCK); - if (fd < 0) { - perror("open device"); - } - return fd; -} - -static int init_v4l2_device(int fd, const worker_config_t *cfg) { - struct v4l2_format fmt; - - CLEAR(fmt); - fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; - fmt.fmt.pix.width = cfg->capture_width; - fmt.fmt.pix.height = cfg->capture_height; - fmt.fmt.pix.pixelformat = V4L2_PIX_FMT_MJPEG; - fmt.fmt.pix.field = V4L2_FIELD_NONE; - - if (ioctl(fd, VIDIOC_S_FMT, &fmt) < 0) { - perror("VIDIOC_S_FMT"); - return -1; - } - return 0; -} - -static int init_mmap(int fd, Buffer **buffers, int *num_buffers) { - struct v4l2_requestbuffers req; - int index; - - CLEAR(req); - req.count = NUM_BUFFERS; - req.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; - req.memory = V4L2_MEMORY_MMAP; - - if (ioctl(fd, VIDIOC_REQBUFS, &req) < 0) { - perror("VIDIOC_REQBUFS"); - return -1; - } - - *num_buffers = (int) req.count; - *buffers = (Buffer *) calloc(req.count, sizeof(Buffer)); - if (*buffers == NULL) { - return -1; - } - - for (index = 0; index < (int) req.count; ++index) { - struct v4l2_buffer buf; - CLEAR(buf); - buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; - buf.memory = V4L2_MEMORY_MMAP; - buf.index = (unsigned int) index; - - if (ioctl(fd, VIDIOC_QUERYBUF, &buf) < 0) { - perror("VIDIOC_QUERYBUF"); - return -1; - } - - (*buffers)[index].length = buf.length; - (*buffers)[index].start = mmap(NULL, buf.length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, buf.m.offset); - if ((*buffers)[index].start == MAP_FAILED) { - perror("mmap"); - return -1; - } - } - - return 0; -} - -static int queue_all_buffers(int fd, int num_buffers) { - int index; - for (index = 0; index < num_buffers; ++index) { - struct v4l2_buffer buf; - CLEAR(buf); - buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; - buf.memory = V4L2_MEMORY_MMAP; - buf.index = (unsigned int) index; - if (ioctl(fd, VIDIOC_QBUF, &buf) < 0) { - perror("VIDIOC_QBUF"); - return -1; - } - } - return 0; -} - -static AVCodecContext *create_mjpeg_decoder(const worker_config_t *cfg) { - const AVCodec *decoder = avcodec_find_decoder(AV_CODEC_ID_MJPEG); - AVCodecContext *ctx; - - if (decoder == NULL) { - fprintf(stderr, "MJPEG decoder not found\n"); - return NULL; - } - ctx = avcodec_alloc_context3(decoder); - if (ctx == NULL) { - return NULL; - } - - ctx->width = cfg->capture_width; - ctx->height = cfg->capture_height; - ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; - ctx->color_range = AVCOL_RANGE_JPEG; - ctx->thread_count = 1; - if (avcodec_open2(ctx, decoder, NULL) < 0) { - avcodec_free_context(&ctx); - return NULL; - } - return ctx; -} - -static AVCodecContext *create_mjpeg_encoder(const worker_config_t *cfg) { - const AVCodec *encoder = avcodec_find_encoder(AV_CODEC_ID_MJPEG); - AVCodecContext *ctx; - - if (encoder == NULL) { - fprintf(stderr, "MJPEG encoder not found\n"); - return NULL; - } - ctx = avcodec_alloc_context3(encoder); - if (ctx == NULL) { - return NULL; - } - - ctx->width = cfg->output_width; - ctx->height = cfg->output_height; - ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; - ctx->time_base = (AVRational) {1, cfg->initial_fps > 0 ? cfg->initial_fps : 10}; - ctx->qmin = 1; - ctx->qmax = 31; - ctx->flags |= AV_CODEC_FLAG_QSCALE; - ctx->global_quality = FF_QP2LAMBDA * cfg->initial_qscale; - if (avcodec_open2(ctx, encoder, NULL) < 0) { - avcodec_free_context(&ctx); - return NULL; - } - return ctx; -} - -static int decode_mjpeg_frame(AVCodecContext *decoder, const uint8_t *data, int size, AVFrame **frame) { - AVPacket *pkt; - int ret; - - *frame = NULL; - pkt = av_packet_alloc(); - if (pkt == NULL) { - return -1; - } - - pkt->data = (uint8_t *) data; - pkt->size = size; - ret = avcodec_send_packet(decoder, pkt); - if (ret < 0) { - av_packet_free(&pkt); - return -1; - } - - *frame = av_frame_alloc(); - if (*frame == NULL) { - av_packet_free(&pkt); - return -1; - } - ret = avcodec_receive_frame(decoder, *frame); - av_packet_free(&pkt); - if (ret < 0) { - av_frame_free(frame); - return -1; - } - return 0; -} - -static struct SwsContext *create_scaler(AVFrame *src, const worker_config_t *cfg) { - return sws_getContext( - src->width, - src->height, - src->format, - cfg->output_width, - cfg->output_height, - AV_PIX_FMT_YUVJ420P, - SWS_BILINEAR, - NULL, - NULL, - NULL); -} - -static int scale_frame(struct SwsContext *sws_ctx, const worker_config_t *cfg, AVFrame *src, AVFrame **dst) { - int ret; - - if (sws_ctx == NULL) { - return -1; - } - - *dst = av_frame_alloc(); - if (*dst == NULL) { - return -1; - } - - (*dst)->width = cfg->output_width; - (*dst)->height = cfg->output_height; - (*dst)->format = AV_PIX_FMT_YUVJ420P; - if (av_frame_get_buffer(*dst, 0) < 0) { - av_frame_free(dst); - return -1; - } - - ret = sws_scale( - sws_ctx, - (const uint8_t *const *) src->data, - src->linesize, - 0, - src->height, - (*dst)->data, - (*dst)->linesize); - if (ret < 0) { - av_frame_free(dst); - return -1; - } - return 0; -} - -static int encode_frame(AVCodecContext *encoder, AVFrame *frame, int qscale, AVPacket **pkt) { - int ret; - - *pkt = av_packet_alloc(); - if (*pkt == NULL) { - return -1; - } - - encoder->global_quality = FF_QP2LAMBDA * qscale; - frame->quality = encoder->global_quality; - ret = avcodec_send_frame(encoder, frame); - if (ret < 0) { - av_packet_free(pkt); - return -1; - } - ret = avcodec_receive_packet(encoder, *pkt); - if (ret < 0) { - av_packet_free(pkt); - return -1; - } - return 0; -} - -static int video_sender_init(video_sender_t *sender, const worker_config_t *cfg) { - kcp_conn_options_t options; - - if (sender == NULL) { - errno = EINVAL; - return -1; - } - - CLEAR(*sender); - snprintf(sender->target_peer, sizeof(sender->target_peer), "%s", cfg->target_peer); - kcp_conn_options_set_video_defaults(&options); - sender->client = kcp_client_dial_with_options( - cfg->server_addr, - cfg->relay_addr, - cfg->peer_id, - cfg->bind_ip, - cfg->bind_device, - &options, - NULL, - NULL, - NULL, - cfg->stats_interval_ms); - if (sender->client == NULL) { - return -1; - } - - fprintf(stderr, "B-side video worker connected as %s -> %s\n", kcp_client_id(sender->client), sender->target_peer); - return 0; -} - -static int video_sender_send_packet(video_sender_t *sender, const AVPacket *encoded_pkt) { - if (sender == NULL || sender->client == NULL || encoded_pkt == NULL) { - errno = EINVAL; - return -1; - } - return kcp_client_send_binary(sender->client, sender->target_peer, encoded_pkt->data, (size_t) encoded_pkt->size); -} - -static void video_sender_close(video_sender_t *sender) { - if (sender == NULL || sender->client == NULL) { - return; - } - kcp_client_close(sender->client); - kcp_client_free(sender->client); - sender->client = NULL; -} - -static void *control_thread_main(void *opaque) { - control_thread_args_t *args = (control_thread_args_t *) opaque; - char line[512]; - - while (fgets(line, sizeof(line), args->control_stream) != NULL) { - cJSON *root = cJSON_Parse(line); - cJSON *type_item; - - if (root == NULL) { - continue; - } - type_item = cJSON_GetObjectItemCaseSensitive(root, "type"); - if (!cJSON_IsString(type_item) || type_item->valuestring == NULL) { - cJSON_Delete(root); - continue; - } - - if (strcmp(type_item->valuestring, "shutdown") == 0) { - runtime_set_shutdown(args->runtime, true); - cJSON_Delete(root); - return NULL; - } - if (strcmp(type_item->valuestring, "set_profile") == 0) { - cJSON *fps = cJSON_GetObjectItemCaseSensitive(root, "fps"); - cJSON *qscale = cJSON_GetObjectItemCaseSensitive(root, "jpeg_quality_qscale"); - cJSON *max_frame_bytes = cJSON_GetObjectItemCaseSensitive(root, "max_frame_bytes"); - runtime_set_profile( - args->runtime, - cJSON_IsNumber(fps) ? fps->valueint : -1, - cJSON_IsNumber(qscale) ? qscale->valueint : -1, - cJSON_IsNumber(max_frame_bytes) ? max_frame_bytes->valueint : -1); - } - cJSON_Delete(root); - } - - runtime_set_shutdown(args->runtime, true); - return NULL; -} - -int main(void) { - worker_config_t cfg; - runtime_state_t runtime; - video_sender_t sender; - control_thread_args_t control_args; - pthread_t control_thread; - FILE *control_stream = NULL; - FILE *telemetry_stream = NULL; - Buffer *buffers = NULL; - int num_buffers = 0; - int camera_fd = -1; - enum v4l2_buf_type stream_type = V4L2_BUF_TYPE_VIDEO_CAPTURE; - AVCodecContext *decoder = NULL; - AVCodecContext *encoder = NULL; - struct SwsContext *sws_ctx = NULL; - double next_deadline_ms = 0.0; - double next_metrics_ms = 0.0; - int exit_code = 1; - int index; - int sws_src_width = 0; - int sws_src_height = 0; - int sws_src_format = -1; - bool control_thread_started = false; - - av_log_set_level(AV_LOG_ERROR); - signal(SIGINT, handle_signal); - signal(SIGTERM, handle_signal); - CLEAR(sender); - - if (load_worker_config(&cfg) != 0) { - return 1; - } - - CLEAR(runtime); - pthread_mutex_init(&runtime.lock, NULL); - pthread_mutex_init(&runtime.telemetry_lock, NULL); - runtime.target_fps = cfg.initial_fps; - runtime.jpeg_quality_qscale = cfg.initial_qscale; - runtime.max_frame_bytes = cfg.initial_max_frame_bytes; - - control_stream = fdopen(WORKER_CONTROL_FD, "r"); - telemetry_stream = fdopen(WORKER_TELEMETRY_FD, "w"); - if (control_stream == NULL || telemetry_stream == NULL) { - perror("fdopen worker control/telemetry"); - goto cleanup; - } - setvbuf(telemetry_stream, NULL, _IOLBF, 0); - runtime.telemetry_stream = telemetry_stream; - - control_args.control_stream = control_stream; - control_args.runtime = &runtime; - if (pthread_create(&control_thread, NULL, control_thread_main, &control_args) != 0) { - perror("pthread_create"); - goto cleanup; - } - control_thread_started = true; - - camera_fd = open_v4l2_device(cfg.video_device); - if (camera_fd < 0) { - goto cleanup_join_thread; - } - if (init_v4l2_device(camera_fd, &cfg) != 0) { - goto cleanup_join_thread; - } - if (init_mmap(camera_fd, &buffers, &num_buffers) != 0) { - goto cleanup_join_thread; - } - if (queue_all_buffers(camera_fd, num_buffers) != 0) { - goto cleanup_join_thread; - } - if (ioctl(camera_fd, VIDIOC_STREAMON, &stream_type) < 0) { - perror("VIDIOC_STREAMON"); - goto cleanup_join_thread; - } - - decoder = create_mjpeg_decoder(&cfg); - encoder = create_mjpeg_encoder(&cfg); - if (decoder == NULL || encoder == NULL) { - fprintf(stderr, "failed to create codecs\n"); - goto cleanup_join_thread; - } - if (video_sender_init(&sender, &cfg) != 0) { - perror("video_sender_init"); - goto cleanup_join_thread; - } - - next_deadline_ms = monotonic_ms(); - next_metrics_ms = next_deadline_ms + 100.0; - - while (!runtime_should_stop(&runtime)) { - AVFrame *decoded_frame = NULL; - AVFrame *scaled_frame = NULL; - AVPacket *encoded_pkt = NULL; - struct v4l2_buffer buf; - fd_set fds; - struct timeval tv; - int select_result; - int fps; - int qscale; - int max_frame_bytes; - int encode_us = 0; - bool sent = false; - const char *drop_reason = ""; - double now_ms = monotonic_ms(); - double encode_start_ms; - double encode_end_ms; - - runtime_get_profile(&runtime, &fps, &qscale, &max_frame_bytes); - if (fps < 1) { - fps = 1; - } - if (next_deadline_ms > now_ms) { - usleep((useconds_t) ((next_deadline_ms - now_ms) * 1000.0)); - } - next_deadline_ms = monotonic_ms() + (1000.0 / (double) fps); - - FD_ZERO(&fds); - FD_SET(camera_fd, &fds); - tv.tv_sec = 2; - tv.tv_usec = 0; - select_result = select(camera_fd + 1, &fds, NULL, NULL, &tv); - if (select_result <= 0) { - if (select_result < 0 && errno == EINTR) { - continue; - } - fprintf(stderr, "select timeout/error on camera\n"); - continue; - } - - CLEAR(buf); - buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; - buf.memory = V4L2_MEMORY_MMAP; - if (ioctl(camera_fd, VIDIOC_DQBUF, &buf) < 0) { - if (errno == EAGAIN) { - continue; - } - perror("VIDIOC_DQBUF"); - break; - } - - if (decode_mjpeg_frame(decoder, (uint8_t *) buffers[buf.index].start, (int) buf.bytesused, &decoded_frame) != 0) { - drop_reason = "decode_failed"; - goto requeue_and_report; - } - - if ( - sws_ctx == NULL - || sws_src_width != decoded_frame->width - || sws_src_height != decoded_frame->height - || sws_src_format != decoded_frame->format - ) { - sws_freeContext(sws_ctx); - sws_ctx = create_scaler(decoded_frame, &cfg); - sws_src_width = decoded_frame->width; - sws_src_height = decoded_frame->height; - sws_src_format = decoded_frame->format; - } - if (scale_frame(sws_ctx, &cfg, decoded_frame, &scaled_frame) != 0) { - drop_reason = "scale_failed"; - goto requeue_and_report; - } - - encode_start_ms = monotonic_ms(); - if (encode_frame(encoder, scaled_frame, qscale, &encoded_pkt) != 0) { - drop_reason = "encode_failed"; - goto requeue_and_report; - } - encode_end_ms = monotonic_ms(); - encode_us = (int) ((encode_end_ms - encode_start_ms) * 1000.0); - - if (encoded_pkt->size > max_frame_bytes) { - drop_reason = "frame_too_large"; - goto requeue_and_report; - } - if (video_sender_send_packet(&sender, encoded_pkt) != 0) { - perror("video_sender_send_packet"); - drop_reason = "send_failed"; - goto requeue_and_report; - } - sent = true; - -requeue_and_report: - telemetry_write_frame_stat( - &runtime, - encoded_pkt != NULL ? encoded_pkt->size : 0, - encode_us, - sent, - drop_reason); - - if (ioctl(camera_fd, VIDIOC_QBUF, &buf) < 0) { - perror("VIDIOC_QBUF"); - av_frame_free(&decoded_frame); - av_frame_free(&scaled_frame); - if (encoded_pkt != NULL) { - av_packet_free(&encoded_pkt); - } - break; - } - - av_frame_free(&decoded_frame); - av_frame_free(&scaled_frame); - if (encoded_pkt != NULL) { - av_packet_free(&encoded_pkt); - } - - now_ms = monotonic_ms(); - if (sender.client != NULL && now_ms >= next_metrics_ms) { - kcp_conn_metrics_t metrics; - if (kcp_client_metrics_snapshot(sender.client, &metrics) == 0) { - telemetry_write_kcp_metrics(&runtime, &metrics); - } - next_metrics_ms = now_ms + 100.0; - } - } - - exit_code = 0; - -cleanup_join_thread: - runtime_set_shutdown(&runtime, true); - if (control_stream != NULL) { - fclose(control_stream); - control_stream = NULL; - } - if (control_thread_started) { - pthread_join(control_thread, NULL); - } - -cleanup: - if (control_stream != NULL) { - fclose(control_stream); - control_stream = NULL; - } - if (camera_fd >= 0) { - ioctl(camera_fd, VIDIOC_STREAMOFF, &stream_type); - } - if (buffers != NULL) { - for (index = 0; index < num_buffers; ++index) { - if (buffers[index].start != NULL && buffers[index].start != MAP_FAILED) { - munmap(buffers[index].start, buffers[index].length); - } - } - free(buffers); - } - video_sender_close(&sender); - if (encoder != NULL) { - avcodec_free_context(&encoder); - } - if (decoder != NULL) { - avcodec_free_context(&decoder); - } - sws_freeContext(sws_ctx); - if (camera_fd >= 0) { - close(camera_fd); - } - if (telemetry_stream != NULL) { - fclose(telemetry_stream); - telemetry_stream = NULL; - } - pthread_mutex_destroy(&runtime.lock); - pthread_mutex_destroy(&runtime.telemetry_lock); - return exit_code; -} diff --git a/config/b_side_omnidaemon.yaml b/config/b_side_omnidaemon.yaml deleted file mode 100644 index 633a8f6..0000000 --- a/config/b_side_omnidaemon.yaml +++ /dev/null @@ -1,58 +0,0 @@ -transport: - server_addr: "81.70.156.140:10909" - relay_via: "106.55.173.235:10909" - bind_ip: "" - bind_device: "" - -control_receiver: - peer_id: "peer-b-ctrl" - nodelay: 1 - interval_ms: 5 - resend: 2 - nc: 1 - sndwnd: 32 - rcvwnd: 32 - mtu: 1400 - stats_interval_ms: 100 - queue_capacity: 256 - -video_sender: - enabled: false - peer_id: "peer-b-video" - target_peer: "peer-a-video" - binary_path: "bin/b_side_video_sender" - device: "/dev/video0" - capture_width: 1280 - capture_height: 720 - output_width: 640 - output_height: 360 - fps: 10 - jpeg_quality_qscale: 8 - max_frame_bytes: 40960 - stats_interval_ms: 100 - -daemon: - socket_path: "/tmp/omnisocket-b-side.sock" - ctrl_socket_path: "/tmp/omnisocket-b-ctrl.sock" - reconnect_delay_ms: 2000 - telemetry_interval_ms: 100 - worker_restart_delay_ms: 2000 - -policy: - mode: "auto" - health_window_ms: 2000 - green_srtt_ms: 30 - yellow_srtt_ms: 55 - retrans_red_threshold: 8 - profile_green: - fps: 10 - jpeg_quality_qscale: 8 - max_frame_bytes: 40960 - profile_yellow: - fps: 7 - jpeg_quality_qscale: 12 - max_frame_bytes: 28672 - profile_red: - fps: 5 - jpeg_quality_qscale: 16 - max_frame_bytes: 20480 diff --git a/python/omnisocket_b_side/__init__.py b/python/omnisocket_b_side/__init__.py deleted file mode 100644 index 5d76152..0000000 --- a/python/omnisocket_b_side/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -from pathlib import Path - - -PACKAGE_ROOT = Path(__file__).resolve().parent -PYTHON_ROOT = PACKAGE_ROOT.parent -REPO_ROOT = PYTHON_ROOT.parent -DEFAULT_SOCKET_PATH = "/tmp/omnisocket-b-side.sock" -DEFAULT_CTRL_SOCKET_PATH = "/tmp/omnisocket-b-ctrl.sock" -DEFAULT_CONFIG_PATH = REPO_ROOT / "config" / "b_side_omnidaemon.yaml" -VERSION = "0.1.0" diff --git a/python/omnisocket_b_side/__main__.py b/python/omnisocket_b_side/__main__.py deleted file mode 100644 index c5a0ee7..0000000 --- a/python/omnisocket_b_side/__main__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .daemon import main - - -if __name__ == "__main__": - main() diff --git a/python/omnisocket_b_side/client.py b/python/omnisocket_b_side/client.py deleted file mode 100644 index 2020efa..0000000 --- a/python/omnisocket_b_side/client.py +++ /dev/null @@ -1,54 +0,0 @@ -"""Local AF_UNIX SOCK_SEQPACKET client for B-side control delivery.""" - -from __future__ import annotations - -import os -import socket - -from omnisocket_a_side.control_codec import CONTROL_PACKET_STRUCT - -from . import DEFAULT_CTRL_SOCKET_PATH - - -class BSideControlClient: - def __init__(self, socket_path: str | None = None) -> None: - self.socket_path = socket_path or os.getenv( - "OMNIBDAEMON_CTRL_SOCKET", - DEFAULT_CTRL_SOCKET_PATH, - ) - self._sock: socket.socket | None = None - - def connect(self) -> None: - if not hasattr(socket, "AF_UNIX"): - raise OSError("AF_UNIX sockets are not available on this platform") - if not hasattr(socket, "SOCK_SEQPACKET"): - raise OSError("SOCK_SEQPACKET is not available on this platform") - sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) - sock.connect(self.socket_path) - self._sock = sock - - def recv_control_packet(self, timeout_ms: int = 100) -> bytes | None: - if self._sock is None: - raise RuntimeError("B-side control client is not connected") - - self._sock.settimeout(max(0.001, timeout_ms / 1000.0)) - try: - payload = self._sock.recv(CONTROL_PACKET_STRUCT.size) - except socket.timeout: - return None - except BlockingIOError: - return None - - if payload == b"": - raise ConnectionResetError("daemon control socket closed") - if len(payload) != CONTROL_PACKET_STRUCT.size: - return None - return payload - - def close(self) -> None: - if self._sock is None: - return - try: - self._sock.close() - finally: - self._sock = None diff --git a/python/omnisocket_b_side/daemon.py b/python/omnisocket_b_side/daemon.py deleted file mode 100644 index 162786d..0000000 --- a/python/omnisocket_b_side/daemon.py +++ /dev/null @@ -1,1318 +0,0 @@ -"""B-side OmniDaemon that owns control receive and manages video send workers.""" - -from __future__ import annotations - -import argparse -import copy -from dataclasses import dataclass -from datetime import UTC, datetime -from http import HTTPStatus -from http.server import BaseHTTPRequestHandler -import json -import os -from pathlib import Path -import queue -import signal -import socket -import socketserver -import subprocess -import threading -import time -from typing import Any - -import yaml - -from omnisocket_a_side.control_codec import CONTROL_PACKET_STRUCT - -from . import ( - DEFAULT_CONFIG_PATH, - DEFAULT_CTRL_SOCKET_PATH, - DEFAULT_SOCKET_PATH, - REPO_ROOT, - VERSION, -) - - -def utc_iso_now() -> str: - return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") - - -def load_omnisocket_api(): - from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session - - return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session - - -def _merge_kcp_defaults(defaults: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]: - merged = dict(defaults) - for key in ("nodelay", "interval_ms", "resend", "nc", "sndwnd", "rcvwnd", "mtu"): - if key in override: - merged[key] = int(override[key]) - return merged - - -def _default_stats() -> dict[str, int]: - return { - "send_calls": 0, - "send_bytes": 0, - "send_errors": 0, - "recv_calls": 0, - "recv_bytes": 0, - "recv_timeouts": 0, - "recv_errors": 0, - "connected": 0, - } - - -def _default_kcp_metrics() -> dict[str, Any]: - return { - "connected": 0, - "has_conv": 0, - "conv": 0, - "local_addr": "", - "remote_addr": "", - "rto_ms": 0, - "srtt_ms": 0, - "srttvar_ms": 0, - "bytes_sent": 0, - "bytes_received": 0, - "in_pkts": 0, - "out_pkts": 0, - "in_segs": 0, - "out_segs": 0, - "retrans_segs": 0, - "fast_retrans_segs": 0, - "early_retrans_segs": 0, - "lost_segs": 0, - "repeat_segs": 0, - "in_errs": 0, - "kcp_in_errs": 0, - "ring_buffer_snd_queue": 0, - "ring_buffer_rcv_queue": 0, - "ring_buffer_snd_buffer": 0, - } - - -@dataclass(slots=True) -class VideoProfile: - fps: int - jpeg_quality_qscale: int - max_frame_bytes: int - - def as_dict(self) -> dict[str, int]: - return { - "fps": int(self.fps), - "jpeg_quality_qscale": int(self.jpeg_quality_qscale), - "max_frame_bytes": int(self.max_frame_bytes), - } - - -def _load_config(config_path: str | None) -> dict[str, Any]: - path = Path(os.getenv("OMNIBDAEMON_CONFIG", config_path or str(DEFAULT_CONFIG_PATH))) - raw: dict[str, Any] = {} - if path.exists(): - with path.open("r", encoding="utf-8") as file: - raw = yaml.safe_load(file) or {} - - control_defaults, _msg_type_binary, _session_cls = load_omnisocket_api() - transport = dict(raw.get("transport", {})) - control = dict(raw.get("control_receiver", {})) - video = dict(raw.get("video_sender", {})) - daemon_cfg = dict(raw.get("daemon", {})) - policy = dict(raw.get("policy", {})) - - def _profile(name: str, fps: int, qscale: int, max_frame_bytes: int) -> dict[str, int]: - section = dict(policy.get(name, {})) - return { - "fps": int(section.get("fps", fps)), - "jpeg_quality_qscale": int(section.get("jpeg_quality_qscale", qscale)), - "max_frame_bytes": int(section.get("max_frame_bytes", max_frame_bytes)), - } - - binary_path = str(video.get("binary_path", "bin/b_side_video_sender")) - if not os.path.isabs(binary_path): - binary_path = str((REPO_ROOT / binary_path).resolve()) - - return { - "config_path": str(path), - "transport": { - "server_addr": str(transport.get("server_addr", "")), - "relay_via": str(transport.get("relay_via", "")), - "bind_ip": str(transport.get("bind_ip", "")), - "bind_device": str(transport.get("bind_device", "")), - }, - "control_receiver": { - "peer_id": str(control.get("peer_id", "peer-b-ctrl")), - "stats_interval_ms": int(control.get("stats_interval_ms", 100)), - "queue_capacity": int(control.get("queue_capacity", 256)), - "kcp": _merge_kcp_defaults(control_defaults, control), - }, - "video_sender": { - "enabled": bool(video.get("enabled", False)), - "binary_path": binary_path, - "peer_id": str(video.get("peer_id", "peer-b-video")), - "target_peer": str(video.get("target_peer", "peer-a-video")), - "stats_interval_ms": int(video.get("stats_interval_ms", 100)), - "device": str(video.get("device", "/dev/video0")), - "capture_width": int(video.get("capture_width", 1280)), - "capture_height": int(video.get("capture_height", 720)), - "output_width": int(video.get("output_width", 640)), - "output_height": int(video.get("output_height", 360)), - "initial_profile": { - "fps": int(video.get("fps", 10)), - "jpeg_quality_qscale": int(video.get("jpeg_quality_qscale", 8)), - "max_frame_bytes": int(video.get("max_frame_bytes", 40960)), - }, - }, - "daemon": { - "socket_path": os.getenv( - "OMNIBDAEMON_SOCKET", - str(daemon_cfg.get("socket_path", DEFAULT_SOCKET_PATH)), - ), - "ctrl_socket_path": os.getenv( - "OMNIBDAEMON_CTRL_SOCKET", - str(daemon_cfg.get("ctrl_socket_path", DEFAULT_CTRL_SOCKET_PATH)), - ), - "reconnect_delay_ms": int(daemon_cfg.get("reconnect_delay_ms", 2000)), - "telemetry_interval_ms": int(daemon_cfg.get("telemetry_interval_ms", 100)), - "worker_restart_delay_ms": int(daemon_cfg.get("worker_restart_delay_ms", 2000)), - }, - "policy": { - "mode": str(policy.get("mode", "auto")).lower(), - "health_window_ms": int(policy.get("health_window_ms", 2000)), - "green_srtt_ms": int(policy.get("green_srtt_ms", 30)), - "yellow_srtt_ms": int(policy.get("yellow_srtt_ms", 55)), - "retrans_red_threshold": int(policy.get("retrans_red_threshold", 8)), - "profile_green": _profile("profile_green", 10, 8, 40960), - "profile_yellow": _profile("profile_yellow", 7, 12, 28672), - "profile_red": _profile("profile_red", 5, 16, 20480), - }, - } - - -class ControlRecvManager: - def __init__(self, config: dict[str, Any]) -> None: - control_defaults, msg_type_binary, session_cls = load_omnisocket_api() - transport = config["transport"] - control_cfg = config["control_receiver"] - daemon_cfg = config["daemon"] - - self._msg_type_binary = msg_type_binary - self._session_cls = session_cls - self._connect_kwargs = { - "server_addr": transport["server_addr"], - "relay_via": transport["relay_via"], - "bind_ip": transport["bind_ip"], - "bind_device": transport["bind_device"], - "peer_id": control_cfg["peer_id"], - "stats_interval_ms": control_cfg["stats_interval_ms"], - **_merge_kcp_defaults(control_defaults, control_cfg["kcp"]), - } - self._reconnect_delay_s = max(0.25, daemon_cfg["reconnect_delay_ms"] / 1000.0) - self._queue: queue.Queue[bytes] = queue.Queue(maxsize=control_cfg["queue_capacity"]) - - self._lock = threading.Lock() - self._stop_event = threading.Event() - self._thread = threading.Thread(target=self._run, name="omni-b-side-control", daemon=True) - self._session = None - self._connected = False - self._last_error = "" - self._last_connect_attempt = 0.0 - self._packets_received = 0 - self._packets_enqueued = 0 - self._ignored_non_binary = 0 - self._ignored_bad_length = 0 - self._dropped_queue_full = 0 - - @property - def packet_queue(self) -> queue.Queue[bytes]: - return self._queue - - def start(self) -> None: - self._thread.start() - - def stop(self) -> None: - self._stop_event.set() - if self._thread.is_alive(): - self._thread.join(timeout=2.0) - self._disconnect("control receiver stopped") - - def snapshot(self) -> dict[str, Any]: - with self._lock: - session = self._session - connected = self._connected and session is not None - last_error = self._last_error - packets_received = self._packets_received - packets_enqueued = self._packets_enqueued - ignored_non_binary = self._ignored_non_binary - ignored_bad_length = self._ignored_bad_length - dropped_queue_full = self._dropped_queue_full - - stats = _default_stats() - metrics = _default_kcp_metrics() - if session is not None: - try: - stats = dict(session.stats()) - metrics = dict(session.kcp_metrics()) - connected = bool(stats.get("connected") and metrics.get("connected")) - except Exception as error: # pragma: no cover - runtime integration - connected = False - last_error = str(error) - - return { - "connected": connected, - "peer_id": self._connect_kwargs["peer_id"], - "server_addr": self._connect_kwargs["server_addr"], - "relay_via": self._connect_kwargs["relay_via"], - "queue_depth": self._queue.qsize(), - "packets_received": packets_received, - "packets_enqueued": packets_enqueued, - "ignored_non_binary": ignored_non_binary, - "ignored_bad_length": ignored_bad_length, - "dropped_queue_full": dropped_queue_full, - "last_error": last_error, - "stats": stats, - "kcp_metrics": metrics, - } - - def _run(self) -> None: - while not self._stop_event.is_set(): - if not self._is_connected(): - now = time.monotonic() - if now - self._last_connect_attempt >= self._reconnect_delay_s: - self._last_connect_attempt = now - self._connect() - time.sleep(0.2) - continue - - with self._lock: - session = self._session - if session is None: - continue - - try: - item = session.recv(timeout_ms=200) - except Exception as error: # pragma: no cover - runtime integration - self._disconnect(str(error)) - continue - - if item is None: - continue - - _from_peer, msg_type, payload = item - with self._lock: - self._packets_received += 1 - - if msg_type != self._msg_type_binary: - with self._lock: - self._ignored_non_binary += 1 - continue - if len(payload) != CONTROL_PACKET_STRUCT.size: - with self._lock: - self._ignored_bad_length += 1 - continue - self._enqueue_packet(bytes(payload)) - - def _connect(self) -> None: - session = self._session_cls() - try: - session.connect(**self._connect_kwargs) - except Exception as error: # pragma: no cover - runtime integration - with self._lock: - self._connected = False - self._last_error = str(error) - try: - session.close() - except Exception: - pass - return - - with self._lock: - self._session = session - self._connected = True - self._last_error = "" - - def _disconnect(self, error_message: str) -> None: - with self._lock: - session = self._session - self._session = None - self._connected = False - if error_message: - self._last_error = error_message - if session is not None: - try: - session.close() - except Exception: - pass - - def _enqueue_packet(self, payload: bytes) -> None: - try: - self._queue.put_nowait(payload) - except queue.Full: - try: - self._queue.get_nowait() - except queue.Empty: - pass - with self._lock: - self._dropped_queue_full += 1 - try: - self._queue.put_nowait(payload) - except queue.Full: - with self._lock: - self._dropped_queue_full += 1 - return - with self._lock: - self._packets_enqueued += 1 - - def _is_connected(self) -> bool: - with self._lock: - return self._connected and self._session is not None - - -class ControlFanout: - def __init__(self, socket_path: str, packet_queue: queue.Queue[bytes]) -> None: - self._socket_path = socket_path - self._packet_queue = packet_queue - self._lock = threading.Lock() - self._stop_event = threading.Event() - self._accept_thread = threading.Thread(target=self._accept_loop, name="omni-b-side-ctrl-accept", daemon=True) - self._send_thread = threading.Thread(target=self._send_loop, name="omni-b-side-ctrl-send", daemon=True) - self._server_socket: socket.socket | None = None - self._consumer_socket: socket.socket | None = None - self._sent_packets = 0 - self._accepted_connections = 0 - self._dropped_no_consumer = 0 - self._dropped_send_errors = 0 - - def start(self) -> None: - socket_dir = os.path.dirname(self._socket_path) - if socket_dir: - os.makedirs(socket_dir, exist_ok=True) - try: - os.unlink(self._socket_path) - except FileNotFoundError: - pass - server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) - server_socket.bind(self._socket_path) - server_socket.listen(1) - server_socket.settimeout(0.5) - self._server_socket = server_socket - self._accept_thread.start() - self._send_thread.start() - - def stop(self) -> None: - self._stop_event.set() - with self._lock: - consumer = self._consumer_socket - self._consumer_socket = None - if consumer is not None: - try: - consumer.close() - except OSError: - pass - if self._server_socket is not None: - try: - self._server_socket.close() - except OSError: - pass - self._server_socket = None - if self._accept_thread.is_alive(): - self._accept_thread.join(timeout=2.0) - if self._send_thread.is_alive(): - self._send_thread.join(timeout=2.0) - try: - os.unlink(self._socket_path) - except FileNotFoundError: - pass - - def snapshot(self) -> dict[str, Any]: - with self._lock: - consumer_connected = self._consumer_socket is not None - sent_packets = self._sent_packets - accepted_connections = self._accepted_connections - dropped_no_consumer = self._dropped_no_consumer - dropped_send_errors = self._dropped_send_errors - return { - "consumer_connected": consumer_connected, - "queue_depth": self._packet_queue.qsize(), - "sent_packets": sent_packets, - "accepted_connections": accepted_connections, - "dropped_no_consumer": dropped_no_consumer, - "dropped_send_errors": dropped_send_errors, - "socket_path": self._socket_path, - } - - def _accept_loop(self) -> None: - while not self._stop_event.is_set(): - server_socket = self._server_socket - if server_socket is None: - return - try: - consumer, _addr = server_socket.accept() - except socket.timeout: - continue - except OSError: - if self._stop_event.is_set(): - return - time.sleep(0.2) - continue - - with self._lock: - old_consumer = self._consumer_socket - self._consumer_socket = consumer - self._accepted_connections += 1 - if old_consumer is not None: - try: - old_consumer.close() - except OSError: - pass - - def _send_loop(self) -> None: - while not self._stop_event.is_set(): - try: - payload = self._packet_queue.get(timeout=0.2) - except queue.Empty: - continue - with self._lock: - consumer = self._consumer_socket - if consumer is None: - with self._lock: - self._dropped_no_consumer += 1 - continue - - try: - consumer.sendall(payload) - except OSError: - with self._lock: - self._dropped_send_errors += 1 - if self._consumer_socket is consumer: - self._consumer_socket = None - try: - consumer.close() - except OSError: - pass - continue - - with self._lock: - self._sent_packets += 1 - - -class VideoWorkerManager: - def __init__(self, config: dict[str, Any]) -> None: - self._config = config - self._transport_cfg = config["transport"] - self._video_cfg = config["video_sender"] - self._daemon_cfg = config["daemon"] - self._policy_cfg = config["policy"] - self._enabled = bool(self._video_cfg["enabled"]) - self._restart_delay_s = max(0.5, self._daemon_cfg["worker_restart_delay_ms"] / 1000.0) - - initial = self._video_cfg["initial_profile"] - initial_profile = VideoProfile( - fps=int(initial["fps"]), - jpeg_quality_qscale=int(initial["jpeg_quality_qscale"]), - max_frame_bytes=int(initial["max_frame_bytes"]), - ) - - self._lock = threading.Lock() - self._stop_event = threading.Event() - self._thread = threading.Thread(target=self._run, name="omni-b-side-video-worker", daemon=True) - self._reader_thread: threading.Thread | None = None - self._process: subprocess.Popen[str] | None = None - self._control_stream = None - self._telemetry_stream = None - self._mode = self._policy_cfg["mode"] if self._policy_cfg["mode"] in {"auto", "manual"} else "auto" - self._desired_profile = initial_profile - self._active_profile = initial_profile - self._restart_count = 0 - self._last_exit_code: int | None = None - self._last_error = "" - self._frames_total = 0 - self._frames_sent = 0 - self._frames_dropped = 0 - self._oversized_drops = 0 - self._last_frame_bytes = 0 - self._last_encode_us = 0 - self._last_drop_reason = "" - self._last_frame_ts = 0 - self._kcp_metrics = _default_kcp_metrics() - - def start(self) -> None: - if not self._enabled: - return - self._thread.start() - - def stop(self) -> None: - self._stop_event.set() - self._send_worker_command({"type": "shutdown"}) - if self._thread.is_alive(): - self._thread.join(timeout=3.0) - self._terminate_worker() - - def snapshot(self) -> dict[str, Any]: - with self._lock: - process = self._process - running = process is not None and process.poll() is None - return { - "enabled": self._enabled, - "running": running, - "mode": self._mode, - "desired_profile": self._desired_profile.as_dict(), - "active_profile": self._active_profile.as_dict(), - "restart_count": self._restart_count, - "last_exit_code": self._last_exit_code, - "last_error": self._last_error, - "frames_total": self._frames_total, - "frames_sent": self._frames_sent, - "frames_dropped": self._frames_dropped, - "oversized_drops": self._oversized_drops, - "last_frame_bytes": self._last_frame_bytes, - "last_encode_us": self._last_encode_us, - "last_drop_reason": self._last_drop_reason, - "last_frame_ts": self._last_frame_ts, - "kcp_metrics": dict(self._kcp_metrics), - "binary_path": self._video_cfg["binary_path"], - } - - def set_profile( - self, - *, - mode: str, - fps: int | None = None, - jpeg_quality_qscale: int | None = None, - max_frame_bytes: int | None = None, - ) -> dict[str, Any]: - mode = mode.lower() - if mode not in {"auto", "manual"}: - raise ValueError("mode must be auto or manual") - - if mode == "auto": - with self._lock: - self._mode = "auto" - profile = self._desired_profile - self._send_worker_command({"type": "set_profile", **profile.as_dict()}) - return self.snapshot() - - if fps is None or jpeg_quality_qscale is None or max_frame_bytes is None: - raise ValueError("manual mode requires fps, jpeg_quality_qscale, and max_frame_bytes") - - profile = VideoProfile( - fps=max(1, int(fps)), - jpeg_quality_qscale=max(1, int(jpeg_quality_qscale)), - max_frame_bytes=max(1024, int(max_frame_bytes)), - ) - with self._lock: - self._mode = "manual" - self._desired_profile = profile - self._send_worker_command({"type": "set_profile", **profile.as_dict()}) - return self.snapshot() - - def apply_auto_profile(self, profile: VideoProfile) -> None: - with self._lock: - if self._mode != "auto": - return - if self._desired_profile.as_dict() == profile.as_dict(): - return - self._desired_profile = profile - self._send_worker_command({"type": "set_profile", **profile.as_dict()}) - - def _run(self) -> None: - while not self._stop_event.is_set(): - process = self._process - if process is None: - self._spawn_worker() - time.sleep(0.2) - continue - - exit_code = process.poll() - if exit_code is None: - time.sleep(0.5) - continue - - with self._lock: - self._last_exit_code = int(exit_code) - self._last_error = f"video worker exited with code {exit_code}" - self._restart_count += 1 - self._close_worker_handles() - with self._lock: - self._process = None - if self._stop_event.wait(self._restart_delay_s): - return - - def _spawn_worker(self) -> None: - binary_path = self._video_cfg["binary_path"] - if not os.path.exists(binary_path): - with self._lock: - self._last_error = f"video worker binary not found: {binary_path}" - self._stop_event.wait(self._restart_delay_s) - return - - command_read_fd, command_write_fd = os.pipe() - telemetry_read_fd, telemetry_write_fd = os.pipe() - - def _preexec() -> None: - os.dup2(command_read_fd, 3) - os.dup2(telemetry_write_fd, 4) - for fd in (command_read_fd, command_write_fd, telemetry_read_fd, telemetry_write_fd): - if fd not in (3, 4): - try: - os.close(fd) - except OSError: - pass - - env = dict(os.environ) - env.update( - { - "OMNI_VIDEO_SERVER_ADDR": self._transport_cfg["server_addr"], - "OMNI_VIDEO_RELAY_VIA": self._transport_cfg["relay_via"], - "OMNI_VIDEO_BIND_IP": self._transport_cfg["bind_ip"], - "OMNI_VIDEO_BIND_DEVICE": self._transport_cfg["bind_device"], - "OMNI_VIDEO_PEER_ID": self._video_cfg["peer_id"], - "OMNI_VIDEO_TARGET_PEER": self._video_cfg["target_peer"], - "OMNI_VIDEO_DEVICE": self._video_cfg["device"], - "OMNI_VIDEO_CAPTURE_WIDTH": str(self._video_cfg["capture_width"]), - "OMNI_VIDEO_CAPTURE_HEIGHT": str(self._video_cfg["capture_height"]), - "OMNI_VIDEO_OUTPUT_WIDTH": str(self._video_cfg["output_width"]), - "OMNI_VIDEO_OUTPUT_HEIGHT": str(self._video_cfg["output_height"]), - "OMNI_VIDEO_STATS_INTERVAL_MS": str(self._video_cfg["stats_interval_ms"]), - } - ) - with self._lock: - profile = self._desired_profile - env["OMNI_VIDEO_FPS"] = str(profile.fps) - env["OMNI_VIDEO_JPEG_QSCALE"] = str(profile.jpeg_quality_qscale) - env["OMNI_VIDEO_MAX_FRAME_BYTES"] = str(profile.max_frame_bytes) - - try: - process = subprocess.Popen( - [binary_path], - stdin=subprocess.DEVNULL, - stdout=subprocess.DEVNULL, - stderr=None, - env=env, - close_fds=True, - pass_fds=(command_read_fd, telemetry_write_fd), - preexec_fn=_preexec, - ) - except Exception as error: # pragma: no cover - runtime integration - for fd in (command_read_fd, command_write_fd, telemetry_read_fd, telemetry_write_fd): - try: - os.close(fd) - except OSError: - pass - with self._lock: - self._last_error = str(error) - self._stop_event.wait(self._restart_delay_s) - return - - os.close(command_read_fd) - os.close(telemetry_write_fd) - control_stream = os.fdopen(command_write_fd, "w", buffering=1, encoding="utf-8") - telemetry_stream = os.fdopen( - telemetry_read_fd, - "r", - buffering=1, - encoding="utf-8", - errors="replace", - ) - - with self._lock: - self._process = process - self._control_stream = control_stream - self._telemetry_stream = telemetry_stream - self._active_profile = profile - self._last_error = "" - - reader_thread = threading.Thread( - target=self._telemetry_loop, - args=(telemetry_stream, process), - name="omni-b-side-video-telemetry", - daemon=True, - ) - self._reader_thread = reader_thread - reader_thread.start() - self._send_worker_command({"type": "set_profile", **profile.as_dict()}) - - def _telemetry_loop(self, telemetry_stream, process: subprocess.Popen[Any]) -> None: - while not self._stop_event.is_set(): - line = telemetry_stream.readline() - if not line: - return - try: - payload = json.loads(line) - except json.JSONDecodeError: - continue - if not isinstance(payload, dict): - continue - payload_type = payload.get("type") - with self._lock: - if payload_type == "frame_stat": - self._frames_total += 1 - self._last_frame_bytes = int(payload.get("frame_bytes", 0) or 0) - self._last_encode_us = int(payload.get("encode_us", 0) or 0) - self._last_drop_reason = str(payload.get("drop_reason", "")) - self._last_frame_ts = int(payload.get("ts_unix_ms", 0) or 0) - if bool(payload.get("sent", False)): - self._frames_sent += 1 - else: - self._frames_dropped += 1 - if self._last_drop_reason == "frame_too_large": - self._oversized_drops += 1 - elif payload_type == "kcp_metrics": - merged = _default_kcp_metrics() - for key in merged: - if key in payload: - merged[key] = payload[key] - self._kcp_metrics = merged - else: - self._last_error = f"unknown worker telemetry type: {payload_type}" - if process.poll() is not None: - return - - def _send_worker_command(self, payload: dict[str, Any]) -> None: - with self._lock: - control_stream = self._control_stream - if control_stream is None: - return - try: - control_stream.write(json.dumps(payload) + "\n") - control_stream.flush() - except OSError as error: - with self._lock: - self._last_error = str(error) - - def _close_worker_handles(self) -> None: - with self._lock: - control_stream = self._control_stream - telemetry_stream = self._telemetry_stream - self._control_stream = None - self._telemetry_stream = None - if control_stream is not None: - try: - control_stream.close() - except OSError: - pass - if telemetry_stream is not None: - try: - telemetry_stream.close() - except OSError: - pass - - def _terminate_worker(self) -> None: - self._close_worker_handles() - with self._lock: - process = self._process - self._process = None - if process is None: - return - if process.poll() is None: - process.terminate() - try: - process.wait(timeout=2.0) - except subprocess.TimeoutExpired: - process.kill() - try: - process.wait(timeout=2.0) - except subprocess.TimeoutExpired: - pass - - -class PolicyEngine: - def __init__(self, policy_cfg: dict[str, Any]) -> None: - self._policy_cfg = policy_cfg - - def classify( - self, - *, - control_connected: bool, - consumer_connected: bool, - avg_srtt_ms: float, - retrans_delta: int, - video_oversized_recent: bool, - video_backlogged: bool, - ) -> tuple[str, VideoProfile]: - if not control_connected or not consumer_connected: - band = "red" - elif avg_srtt_ms >= self._policy_cfg["yellow_srtt_ms"] or retrans_delta >= self._policy_cfg["retrans_red_threshold"]: - band = "red" - elif avg_srtt_ms >= self._policy_cfg["green_srtt_ms"] or retrans_delta > 0: - band = "yellow" - else: - band = "green" - - if band == "green" and (video_oversized_recent or video_backlogged): - band = "yellow" - elif band == "yellow" and (video_oversized_recent or video_backlogged): - band = "red" - - profile_raw = self._policy_cfg[f"profile_{band}"] - return band, VideoProfile( - fps=int(profile_raw["fps"]), - jpeg_quality_qscale=int(profile_raw["jpeg_quality_qscale"]), - max_frame_bytes=int(profile_raw["max_frame_bytes"]), - ) - - -class TelemetrySampler: - def __init__( - self, - config: dict[str, Any], - control_manager: ControlRecvManager, - fanout: ControlFanout, - video_worker: VideoWorkerManager, - ) -> None: - self._config = config - self._control_manager = control_manager - self._fanout = fanout - self._video_worker = video_worker - self._policy_engine = PolicyEngine(config["policy"]) - self._interval_s = max(0.1, config["daemon"]["telemetry_interval_ms"] / 1000.0) - self._window_s = max(self._interval_s, config["policy"]["health_window_ms"] / 1000.0) - - self._lock = threading.Lock() - self._stop_event = threading.Event() - self._thread = threading.Thread(target=self._run, name="omni-b-side-telemetry", daemon=True) - self._started_at = utc_iso_now() - self._state = self._build_initial_state(config) - self._history: list[dict[str, Any]] = [] - self._last_totals: dict[str, float] | None = None - - def start(self) -> None: - self._thread.start() - - def stop(self) -> None: - self._stop_event.set() - if self._thread.is_alive(): - self._thread.join(timeout=2.0) - - def snapshot(self) -> dict[str, Any]: - with self._lock: - return copy.deepcopy(self._state) - - def _run(self) -> None: - while not self._stop_event.is_set(): - self._sample_once() - self._stop_event.wait(self._interval_s) - - def _sample_once(self) -> None: - now = time.time() - control = self._control_manager.snapshot() - fanout = self._fanout.snapshot() - video = self._video_worker.snapshot() - control_metrics = control["kcp_metrics"] - video_metrics = video["kcp_metrics"] - - self._history.append( - { - "ts": now, - "connected": bool(control["connected"]), - "srtt_ms": float(control_metrics.get("srtt_ms", 0.0) or 0.0), - "retrans_segs": int(control_metrics.get("retrans_segs", 0) or 0), - } - ) - self._history = [sample for sample in self._history if now - sample["ts"] <= self._window_s] - - srtt_values = [sample["srtt_ms"] for sample in self._history if sample["connected"]] - avg_srtt_ms = sum(srtt_values) / len(srtt_values) if srtt_values else 0.0 - jitter_ms = 0.0 - if len(srtt_values) >= 2: - deltas = [abs(srtt_values[index] - srtt_values[index - 1]) for index in range(1, len(srtt_values))] - jitter_ms = sum(deltas) / len(deltas) - retrans_delta = 0 - if len(self._history) >= 2: - retrans_delta = max(0, int(self._history[-1]["retrans_segs"] - self._history[0]["retrans_segs"])) - - last_frame_ts_ms = int(video["last_frame_ts"] or 0) - video_oversized_recent = bool( - video["last_drop_reason"] == "frame_too_large" - and last_frame_ts_ms > 0 - and now - (last_frame_ts_ms / 1000.0) <= self._window_s - ) - video_backlogged = bool( - int(video_metrics.get("ring_buffer_snd_queue", 0) or 0) > 128 - or int(video_metrics.get("ring_buffer_snd_buffer", 0) or 0) > 128 - ) - - health_band, recommended_profile = self._policy_engine.classify( - control_connected=bool(control["connected"]), - consumer_connected=bool(fanout["consumer_connected"]), - avg_srtt_ms=avg_srtt_ms, - retrans_delta=retrans_delta, - video_oversized_recent=video_oversized_recent, - video_backlogged=video_backlogged, - ) - self._video_worker.apply_auto_profile(recommended_profile) - - total_bytes_sent = int(control_metrics.get("bytes_sent", 0) or 0) + int(video_metrics.get("bytes_sent", 0) or 0) - total_bytes_received = int(control_metrics.get("bytes_received", 0) or 0) + int(video_metrics.get("bytes_received", 0) or 0) - total_out_segs = int(control_metrics.get("out_segs", 0) or 0) + int(video_metrics.get("out_segs", 0) or 0) - total_retrans = int(control_metrics.get("retrans_segs", 0) or 0) + int(video_metrics.get("retrans_segs", 0) or 0) - tx_kbps = 0 - rx_kbps = 0 - retrans_pct = 0.0 - if self._last_totals is not None: - dt = max(0.001, now - self._last_totals["ts"]) - sent_delta = max(0, total_bytes_sent - int(self._last_totals["bytes_sent"])) - recv_delta = max(0, total_bytes_received - int(self._last_totals["bytes_received"])) - out_seg_delta = max(0, total_out_segs - int(self._last_totals["out_segs"])) - retrans_total_delta = max(0, total_retrans - int(self._last_totals["retrans"])) - tx_kbps = int((sent_delta * 8.0) / dt / 1000.0) - rx_kbps = int((recv_delta * 8.0) / dt / 1000.0) - if out_seg_delta > 0: - retrans_pct = round((retrans_total_delta / out_seg_delta) * 100.0, 2) - self._last_totals = { - "ts": now, - "bytes_sent": float(total_bytes_sent), - "bytes_received": float(total_bytes_received), - "out_segs": float(total_out_segs), - "retrans": float(total_retrans), - } - - state = { - "network": { - "peer_status": "online" if control["connected"] else "offline", - "latency_ms": round(avg_srtt_ms or float(control_metrics.get("srtt_ms", 0) or 0.0), 1), - "jitter_ms": round(jitter_ms, 1), - "retrans_pct": round(retrans_pct, 2), - "packet_loss_pct": round(retrans_pct, 2), - "tx_kbps": tx_kbps, - "rx_kbps": rx_kbps, - "signal_dbm": None, - "transport": "OmniSocket / daemon", - "source_mode": "daemon-live", - "updated_at": utc_iso_now(), - }, - "control": { - "connected": bool(control["connected"]), - "consumer_connected": bool(fanout["consumer_connected"]), - "queue_depth": int(control["queue_depth"]), - "fanout_queue_depth": int(fanout["queue_depth"]), - "packets_received": int(control["packets_received"]), - "packets_enqueued": int(control["packets_enqueued"]), - "sent_to_consumer": int(fanout["sent_packets"]), - "dropped_no_consumer": int(fanout["dropped_no_consumer"]), - "dropped_queue_full": int(control["dropped_queue_full"]), - "ignored_non_binary": int(control["ignored_non_binary"]), - "ignored_bad_length": int(control["ignored_bad_length"]), - "last_error": control["last_error"], - "peer_id": control["peer_id"], - "server_addr": control["server_addr"], - "relay_via": control["relay_via"], - "stats": control["stats"], - "kcp_metrics": control_metrics, - }, - "video": { - "enabled": bool(video["enabled"]), - "running": bool(video["running"]), - "mode": str(video["mode"]), - "desired_profile": video["desired_profile"], - "active_profile": video["active_profile"], - "restart_count": int(video["restart_count"]), - "last_exit_code": video["last_exit_code"], - "last_error": video["last_error"], - "frames_total": int(video["frames_total"]), - "frames_sent": int(video["frames_sent"]), - "frames_dropped": int(video["frames_dropped"]), - "oversized_drops": int(video["oversized_drops"]), - "last_frame_bytes": int(video["last_frame_bytes"]), - "last_encode_us": int(video["last_encode_us"]), - "last_drop_reason": str(video["last_drop_reason"]), - "last_frame_ts": int(video["last_frame_ts"]), - "binary_path": video["binary_path"], - "kcp_metrics": video_metrics, - }, - "policy": { - "mode": video["mode"], - "health_band": health_band, - "recommended_video_profile": recommended_profile.as_dict(), - }, - "daemon": { - "started_at": self._started_at, - "version": VERSION, - "config_path": self._config["config_path"], - "socket_path": self._config["daemon"]["socket_path"], - "ctrl_socket_path": self._config["daemon"]["ctrl_socket_path"], - }, - } - - with self._lock: - self._state = state - - @staticmethod - def _build_initial_state(config: dict[str, Any]) -> dict[str, Any]: - return { - "network": { - "peer_status": "offline", - "latency_ms": 0.0, - "jitter_ms": 0.0, - "retrans_pct": 0.0, - "packet_loss_pct": 0.0, - "tx_kbps": 0, - "rx_kbps": 0, - "signal_dbm": None, - "transport": "OmniSocket / daemon", - "source_mode": "daemon-starting", - "updated_at": utc_iso_now(), - }, - "control": { - "connected": False, - "consumer_connected": False, - "queue_depth": 0, - "fanout_queue_depth": 0, - "packets_received": 0, - "packets_enqueued": 0, - "sent_to_consumer": 0, - "dropped_no_consumer": 0, - "dropped_queue_full": 0, - "ignored_non_binary": 0, - "ignored_bad_length": 0, - "last_error": "", - "peer_id": "", - "server_addr": "", - "relay_via": "", - "stats": _default_stats(), - "kcp_metrics": _default_kcp_metrics(), - }, - "video": { - "enabled": bool(config["video_sender"]["enabled"]), - "running": False, - "mode": config["policy"]["mode"], - "desired_profile": dict(config["video_sender"]["initial_profile"]), - "active_profile": dict(config["video_sender"]["initial_profile"]), - "restart_count": 0, - "last_exit_code": None, - "last_error": "", - "frames_total": 0, - "frames_sent": 0, - "frames_dropped": 0, - "oversized_drops": 0, - "last_frame_bytes": 0, - "last_encode_us": 0, - "last_drop_reason": "", - "last_frame_ts": 0, - "binary_path": config["video_sender"]["binary_path"], - "kcp_metrics": _default_kcp_metrics(), - }, - "policy": { - "mode": config["policy"]["mode"], - "health_band": "red", - "recommended_video_profile": dict(config["policy"]["profile_red"]), - }, - "daemon": { - "started_at": utc_iso_now(), - "version": VERSION, - "config_path": config["config_path"], - "socket_path": config["daemon"]["socket_path"], - "ctrl_socket_path": config["daemon"]["ctrl_socket_path"], - }, - } - - -class ThreadingUnixHTTPServer(socketserver.ThreadingMixIn, socketserver.UnixStreamServer): - daemon_threads = True - - -class OmniDaemonHTTPHandler(BaseHTTPRequestHandler): - server_version = "BsideOmniDaemonHTTP/1.0" - protocol_version = "HTTP/1.1" - - def do_GET(self) -> None: # pragma: no cover - exercised by runtime integration - app: BSideOmniDaemon = self.server.app # type: ignore[attr-defined] - if self.path == "/v1/health": - state = app.get_state() - band = state["policy"]["health_band"] - status = "ok" if band == "green" else "degraded" - if not state["control"]["connected"]: - status = "unavailable" - self._send_json( - HTTPStatus.OK, - { - "status": status, - "health_band": band, - "control_connected": state["control"]["connected"], - "consumer_connected": state["control"]["consumer_connected"], - "video_running": state["video"]["running"], - "updated_at": state["network"]["updated_at"], - }, - ) - return - if self.path == "/v1/state": - self._send_json(HTTPStatus.OK, app.get_state()) - return - self._send_json(HTTPStatus.NOT_FOUND, {"error": f"unknown path: {self.path}"}) - - def do_POST(self) -> None: # pragma: no cover - exercised by runtime integration - app: BSideOmniDaemon = self.server.app # type: ignore[attr-defined] - if self.path != "/v1/video/profile": - self._send_json(HTTPStatus.NOT_FOUND, {"error": f"unknown path: {self.path}"}) - return - try: - payload = self._read_json() - except ValueError as error: - self._send_json(HTTPStatus.BAD_REQUEST, {"error": str(error)}) - return - - try: - mode = str(payload.get("mode", "auto")).lower() - if mode == "auto": - state = app.set_video_profile(mode="auto") - else: - state = app.set_video_profile( - mode="manual", - fps=int(payload.get("fps", 0)), - jpeg_quality_qscale=int(payload.get("jpeg_quality_qscale", 0)), - max_frame_bytes=int(payload.get("max_frame_bytes", 0)), - ) - except (TypeError, ValueError) as error: - self._send_json(HTTPStatus.BAD_REQUEST, {"error": str(error)}) - return - - self._send_json(HTTPStatus.OK, state) - - def log_message(self, format: str, *args: Any) -> None: # noqa: A003 - return - - def _read_json(self) -> dict[str, Any]: - raw_length = self.headers.get("Content-Length") - if raw_length is None: - raise ValueError("missing Content-Length") - try: - length = int(raw_length) - except ValueError as error: - raise ValueError("invalid Content-Length") from error - raw = self.rfile.read(length) - try: - payload = json.loads(raw.decode("utf-8")) - except json.JSONDecodeError as error: - raise ValueError(f"invalid JSON body: {error}") from error - if not isinstance(payload, dict): - raise ValueError("request body must be a JSON object") - return payload - - def _send_json(self, status: HTTPStatus, payload: dict[str, Any]) -> None: - body = json.dumps(payload).encode("utf-8") - self.send_response(status.value) - self.send_header("Content-Type", "application/json; charset=utf-8") - self.send_header("Content-Length", str(len(body))) - self.send_header("Cache-Control", "no-store") - self.send_header("Connection", "keep-alive") - self.end_headers() - self.wfile.write(body) - - -class BSideOmniDaemon: - def __init__(self, config_path: str | None = None) -> None: - self._config = _load_config(config_path) - self._control_manager = ControlRecvManager(self._config) - self._fanout = ControlFanout( - self._config["daemon"]["ctrl_socket_path"], - self._control_manager.packet_queue, - ) - self._video_worker = VideoWorkerManager(self._config) - self._telemetry = TelemetrySampler( - self._config, - self._control_manager, - self._fanout, - self._video_worker, - ) - self._server: ThreadingUnixHTTPServer | None = None - self._server_thread: threading.Thread | None = None - - @property - def socket_path(self) -> str: - return self._config["daemon"]["socket_path"] - - def start(self) -> None: - self._fanout.start() - self._control_manager.start() - self._video_worker.start() - self._telemetry.start() - self._server = self._build_server() - - def stop(self) -> None: - server_thread = None - if self._server is not None: - if self._server_thread is not None and self._server_thread.is_alive(): - self._server.shutdown() - self._server.server_close() - self._server = None - if self._server_thread is not None and self._server_thread.is_alive(): - server_thread = self._server_thread - self._server_thread = None - try: - os.unlink(self.socket_path) - except FileNotFoundError: - pass - if server_thread is not None and server_thread is not threading.current_thread(): - server_thread.join(timeout=2.0) - self._telemetry.stop() - self._video_worker.stop() - self._control_manager.stop() - self._fanout.stop() - - def serve_forever(self) -> None: - self.start() - assert self._server is not None - self._server_thread = threading.Thread( - target=self._server.serve_forever, - name="omni-b-side-http", - daemon=True, - ) - self._server_thread.start() - self._server_thread.join() - - def get_state(self) -> dict[str, Any]: - return self._telemetry.snapshot() - - def set_video_profile( - self, - *, - mode: str, - fps: int | None = None, - jpeg_quality_qscale: int | None = None, - max_frame_bytes: int | None = None, - ) -> dict[str, Any]: - return self._video_worker.set_profile( - mode=mode, - fps=fps, - jpeg_quality_qscale=jpeg_quality_qscale, - max_frame_bytes=max_frame_bytes, - ) - - def _build_server(self) -> ThreadingUnixHTTPServer: - socket_path = self.socket_path - socket_dir = os.path.dirname(socket_path) - if socket_dir: - os.makedirs(socket_dir, exist_ok=True) - try: - os.unlink(socket_path) - except FileNotFoundError: - pass - server = ThreadingUnixHTTPServer(socket_path, OmniDaemonHTTPHandler) - server.app = self # type: ignore[attr-defined] - return server - - -def main(argv: list[str] | None = None) -> None: - parser = argparse.ArgumentParser(description="Run the B-side OmniDaemon") - parser.add_argument("--config", dest="config_path", help="Path to daemon YAML config") - args = parser.parse_args(argv) - - app = BSideOmniDaemon(config_path=args.config_path) - - def _handle_signal(_signum: int, _frame: Any) -> None: - app.stop() - - signal.signal(signal.SIGINT, _handle_signal) - signal.signal(signal.SIGTERM, _handle_signal) - - try: - app.serve_forever() - except KeyboardInterrupt: - pass - finally: - app.stop() - - -if __name__ == "__main__": - main() diff --git a/python/setup.py b/python/setup.py index 234684d..040ddaa 100644 --- a/python/setup.py +++ b/python/setup.py @@ -35,11 +35,10 @@ COMMON_SOURCES = [ setup( name="omnisocket", version="0.1.0", - packages=["omnisocket", "omnisocket_a_side", "omnisocket_b_side"], + packages=["omnisocket", "omnisocket_a_side"], entry_points={ "console_scripts": [ "omnisocket-a-side-daemon=omnisocket_a_side.daemon:main", - "omnisocket-b-side-daemon=omnisocket_b_side.daemon:main", ] }, ext_modules=[ diff --git a/scripts/start_b_side.sh b/scripts/start_b_side.sh deleted file mode 100644 index a03b246..0000000 --- a/scripts/start_b_side.sh +++ /dev/null @@ -1,11 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" -CONFIG_PATH="${1:-$ROOT/config/b_side_omnidaemon.yaml}" - -export OMNIBDAEMON_CONFIG="$CONFIG_PATH" -export PYTHONPATH="$ROOT/python${PYTHONPATH:+:$PYTHONPATH}" - -cd "$ROOT" -exec python3 -m omnisocket_b_side.daemon --config "$CONFIG_PATH"