#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "cJSON.h" #include "peer_kcp_client.h" #define WORKER_CONTROL_FD 3 #define WORKER_TELEMETRY_FD 4 #define NUM_BUFFERS 4 #define CLEAR(x) memset(&(x), 0, sizeof(x)) #define VIDEO_SERVER_ADDR_ENV "OMNI_VIDEO_SERVER_ADDR" #define VIDEO_RELAY_ADDR_ENV "OMNI_VIDEO_RELAY_VIA" #define VIDEO_BIND_IP_ENV "OMNI_VIDEO_BIND_IP" #define VIDEO_BIND_DEVICE_ENV "OMNI_VIDEO_BIND_DEVICE" #define VIDEO_PEER_ID_ENV "OMNI_VIDEO_PEER_ID" #define VIDEO_TARGET_PEER_ENV "OMNI_VIDEO_TARGET_PEER" #define VIDEO_DEVICE_ENV "OMNI_VIDEO_DEVICE" #define VIDEO_CAPTURE_WIDTH_ENV "OMNI_VIDEO_CAPTURE_WIDTH" #define VIDEO_CAPTURE_HEIGHT_ENV "OMNI_VIDEO_CAPTURE_HEIGHT" #define VIDEO_OUTPUT_WIDTH_ENV "OMNI_VIDEO_OUTPUT_WIDTH" #define VIDEO_OUTPUT_HEIGHT_ENV "OMNI_VIDEO_OUTPUT_HEIGHT" #define VIDEO_FPS_ENV "OMNI_VIDEO_FPS" #define VIDEO_JPEG_QSCALE_ENV "OMNI_VIDEO_JPEG_QSCALE" #define VIDEO_MAX_FRAME_BYTES_ENV "OMNI_VIDEO_MAX_FRAME_BYTES" #define VIDEO_STATS_INTERVAL_ENV "OMNI_VIDEO_STATS_INTERVAL_MS" typedef struct { void *start; size_t length; } Buffer; typedef struct { char server_addr[OMNI_MAX_ADDR_TEXT]; char relay_addr[OMNI_MAX_ADDR_TEXT]; char bind_ip[OMNI_MAX_ADDR_TEXT]; char bind_device[128]; char peer_id[OMNI_MAX_PEER_ID]; char target_peer[OMNI_MAX_PEER_ID]; char video_device[256]; int capture_width; int capture_height; int output_width; int output_height; int initial_fps; int initial_qscale; int initial_max_frame_bytes; int stats_interval_ms; } worker_config_t; typedef struct { pthread_mutex_t lock; pthread_mutex_t telemetry_lock; FILE *telemetry_stream; int target_fps; int jpeg_quality_qscale; int max_frame_bytes; bool shutdown_requested; } runtime_state_t; typedef struct { kcp_client_t *client; char target_peer[OMNI_MAX_PEER_ID]; } video_sender_t; typedef struct { FILE *control_stream; runtime_state_t *runtime; } control_thread_args_t; static volatile sig_atomic_t g_signal_stop = 0; static double monotonic_ms(void) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); return (double) ts.tv_sec * 1000.0 + (double) ts.tv_nsec / 1000000.0; } static int64_t realtime_ms(void) { struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); return (int64_t) ts.tv_sec * 1000 + (int64_t) ts.tv_nsec / 1000000; } static int env_as_int(const char *name, int fallback) { const char *raw = getenv(name); char *endptr = NULL; long value; if (raw == NULL || raw[0] == '\0') { return fallback; } errno = 0; value = strtol(raw, &endptr, 10); if (errno != 0 || endptr == raw || (endptr != NULL && *endptr != '\0')) { return fallback; } return (int) value; } static const char *env_or_default(const char *name, const char *fallback) { const char *value = getenv(name); if (value != NULL && value[0] != '\0') { return value; } return fallback; } static void handle_signal(int signum) { (void) signum; g_signal_stop = 1; } static void runtime_set_shutdown(runtime_state_t *runtime, bool shutdown_requested) { pthread_mutex_lock(&runtime->lock); runtime->shutdown_requested = shutdown_requested; pthread_mutex_unlock(&runtime->lock); } static bool runtime_should_stop(runtime_state_t *runtime) { bool shutdown_requested; pthread_mutex_lock(&runtime->lock); shutdown_requested = runtime->shutdown_requested; pthread_mutex_unlock(&runtime->lock); return g_signal_stop != 0 || shutdown_requested; } static void runtime_get_profile(runtime_state_t *runtime, int *fps, int *qscale, int *max_frame_bytes) { pthread_mutex_lock(&runtime->lock); *fps = runtime->target_fps; *qscale = runtime->jpeg_quality_qscale; *max_frame_bytes = runtime->max_frame_bytes; pthread_mutex_unlock(&runtime->lock); } static void runtime_set_profile(runtime_state_t *runtime, int fps, int qscale, int max_frame_bytes) { pthread_mutex_lock(&runtime->lock); if (fps > 0) { runtime->target_fps = fps; } if (qscale > 0) { runtime->jpeg_quality_qscale = qscale; } if (max_frame_bytes > 0) { runtime->max_frame_bytes = max_frame_bytes; } pthread_mutex_unlock(&runtime->lock); } static void telemetry_write_line(runtime_state_t *runtime, const char *line) { pthread_mutex_lock(&runtime->telemetry_lock); if (runtime->telemetry_stream != NULL) { fputs(line, runtime->telemetry_stream); fputc('\n', runtime->telemetry_stream); fflush(runtime->telemetry_stream); } pthread_mutex_unlock(&runtime->telemetry_lock); } static void telemetry_write_frame_stat( runtime_state_t *runtime, int frame_bytes, int encode_us, bool sent, const char *drop_reason ) { char line[512]; int written; written = snprintf( line, sizeof(line), "{\"type\":\"frame_stat\",\"frame_bytes\":%d,\"encode_us\":%d," "\"sent\":%s,\"drop_reason\":\"%s\",\"ts_unix_ms\":%" PRId64 "}", frame_bytes, encode_us, sent ? "true" : "false", drop_reason != NULL ? drop_reason : "", realtime_ms()); if (written > 0 && written < (int) sizeof(line)) { telemetry_write_line(runtime, line); } } static void telemetry_write_kcp_metrics(runtime_state_t *runtime, const kcp_conn_metrics_t *metrics) { char line[1024]; int written; written = snprintf( line, sizeof(line), "{\"type\":\"kcp_metrics\",\"connected\":%d,\"srtt_ms\":%d,\"srttvar_ms\":%d," "\"rto_ms\":%u,\"bytes_sent\":%" PRIu64 ",\"bytes_received\":%" PRIu64 "," "\"out_pkts\":%" PRIu64 ",\"out_segs\":%" PRIu64 ",\"retrans_segs\":%" PRIu64 "," "\"fast_retrans_segs\":%" PRIu64 ",\"early_retrans_segs\":%" PRIu64 "," "\"lost_segs\":%" PRIu64 ",\"ring_buffer_snd_queue\":%" PRIu64 "," "\"ring_buffer_snd_buffer\":%" PRIu64 ",\"ts_unix_ms\":%" PRId64 "}", metrics->connected, metrics->srtt_ms, metrics->srttvar_ms, metrics->rto_ms, metrics->bytes_sent, metrics->bytes_received, metrics->out_pkts, metrics->out_segs, metrics->retrans_segs, metrics->fast_retrans_segs, metrics->early_retrans_segs, metrics->lost_segs, metrics->ring_buffer_snd_queue, metrics->ring_buffer_snd_buffer, realtime_ms()); if (written > 0 && written < (int) sizeof(line)) { telemetry_write_line(runtime, line); } } static int load_worker_config(worker_config_t *cfg) { const char *server_addr = getenv(VIDEO_SERVER_ADDR_ENV); if (cfg == NULL) { errno = EINVAL; return -1; } if (server_addr == NULL || server_addr[0] == '\0') { fprintf(stderr, "%s is required\n", VIDEO_SERVER_ADDR_ENV); errno = EINVAL; return -1; } CLEAR(*cfg); snprintf(cfg->server_addr, sizeof(cfg->server_addr), "%s", server_addr); snprintf(cfg->relay_addr, sizeof(cfg->relay_addr), "%s", env_or_default(VIDEO_RELAY_ADDR_ENV, "")); snprintf(cfg->bind_ip, sizeof(cfg->bind_ip), "%s", env_or_default(VIDEO_BIND_IP_ENV, "")); snprintf(cfg->bind_device, sizeof(cfg->bind_device), "%s", env_or_default(VIDEO_BIND_DEVICE_ENV, "")); snprintf(cfg->peer_id, sizeof(cfg->peer_id), "%s", env_or_default(VIDEO_PEER_ID_ENV, "peer-b-video")); snprintf(cfg->target_peer, sizeof(cfg->target_peer), "%s", env_or_default(VIDEO_TARGET_PEER_ENV, "peer-a-video")); snprintf(cfg->video_device, sizeof(cfg->video_device), "%s", env_or_default(VIDEO_DEVICE_ENV, "/dev/video0")); cfg->capture_width = env_as_int(VIDEO_CAPTURE_WIDTH_ENV, 1280); cfg->capture_height = env_as_int(VIDEO_CAPTURE_HEIGHT_ENV, 720); cfg->output_width = env_as_int(VIDEO_OUTPUT_WIDTH_ENV, 640); cfg->output_height = env_as_int(VIDEO_OUTPUT_HEIGHT_ENV, 360); cfg->initial_fps = env_as_int(VIDEO_FPS_ENV, 10); cfg->initial_qscale = env_as_int(VIDEO_JPEG_QSCALE_ENV, 8); cfg->initial_max_frame_bytes = env_as_int(VIDEO_MAX_FRAME_BYTES_ENV, 40960); cfg->stats_interval_ms = env_as_int(VIDEO_STATS_INTERVAL_ENV, 100); return 0; } static int open_v4l2_device(const char *device) { int fd = open(device, O_RDWR | O_NONBLOCK); if (fd < 0) { perror("open device"); } return fd; } static int init_v4l2_device(int fd, const worker_config_t *cfg) { struct v4l2_format fmt; CLEAR(fmt); fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; fmt.fmt.pix.width = cfg->capture_width; fmt.fmt.pix.height = cfg->capture_height; fmt.fmt.pix.pixelformat = V4L2_PIX_FMT_MJPEG; fmt.fmt.pix.field = V4L2_FIELD_NONE; if (ioctl(fd, VIDIOC_S_FMT, &fmt) < 0) { perror("VIDIOC_S_FMT"); return -1; } return 0; } static int init_mmap(int fd, Buffer **buffers, int *num_buffers) { struct v4l2_requestbuffers req; int index; CLEAR(req); req.count = NUM_BUFFERS; req.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; req.memory = V4L2_MEMORY_MMAP; if (ioctl(fd, VIDIOC_REQBUFS, &req) < 0) { perror("VIDIOC_REQBUFS"); return -1; } *num_buffers = (int) req.count; *buffers = (Buffer *) calloc(req.count, sizeof(Buffer)); if (*buffers == NULL) { return -1; } for (index = 0; index < (int) req.count; ++index) { struct v4l2_buffer buf; CLEAR(buf); buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; buf.memory = V4L2_MEMORY_MMAP; buf.index = (unsigned int) index; if (ioctl(fd, VIDIOC_QUERYBUF, &buf) < 0) { perror("VIDIOC_QUERYBUF"); return -1; } (*buffers)[index].length = buf.length; (*buffers)[index].start = mmap(NULL, buf.length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, buf.m.offset); if ((*buffers)[index].start == MAP_FAILED) { perror("mmap"); return -1; } } return 0; } static int queue_all_buffers(int fd, int num_buffers) { int index; for (index = 0; index < num_buffers; ++index) { struct v4l2_buffer buf; CLEAR(buf); buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; buf.memory = V4L2_MEMORY_MMAP; buf.index = (unsigned int) index; if (ioctl(fd, VIDIOC_QBUF, &buf) < 0) { perror("VIDIOC_QBUF"); return -1; } } return 0; } static AVCodecContext *create_mjpeg_decoder(const worker_config_t *cfg) { const AVCodec *decoder = avcodec_find_decoder(AV_CODEC_ID_MJPEG); AVCodecContext *ctx; if (decoder == NULL) { fprintf(stderr, "MJPEG decoder not found\n"); return NULL; } ctx = avcodec_alloc_context3(decoder); if (ctx == NULL) { return NULL; } ctx->width = cfg->capture_width; ctx->height = cfg->capture_height; ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; ctx->color_range = AVCOL_RANGE_JPEG; ctx->thread_count = 1; if (avcodec_open2(ctx, decoder, NULL) < 0) { avcodec_free_context(&ctx); return NULL; } return ctx; } static AVCodecContext *create_mjpeg_encoder(const worker_config_t *cfg) { const AVCodec *encoder = avcodec_find_encoder(AV_CODEC_ID_MJPEG); AVCodecContext *ctx; if (encoder == NULL) { fprintf(stderr, "MJPEG encoder not found\n"); return NULL; } ctx = avcodec_alloc_context3(encoder); if (ctx == NULL) { return NULL; } ctx->width = cfg->output_width; ctx->height = cfg->output_height; ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; ctx->time_base = (AVRational) {1, cfg->initial_fps > 0 ? cfg->initial_fps : 10}; ctx->qmin = 1; ctx->qmax = 31; ctx->flags |= AV_CODEC_FLAG_QSCALE; ctx->global_quality = FF_QP2LAMBDA * cfg->initial_qscale; if (avcodec_open2(ctx, encoder, NULL) < 0) { avcodec_free_context(&ctx); return NULL; } return ctx; } static int decode_mjpeg_frame(AVCodecContext *decoder, const uint8_t *data, int size, AVFrame **frame) { AVPacket *pkt; int ret; *frame = NULL; pkt = av_packet_alloc(); if (pkt == NULL) { return -1; } pkt->data = (uint8_t *) data; pkt->size = size; ret = avcodec_send_packet(decoder, pkt); if (ret < 0) { av_packet_free(&pkt); return -1; } *frame = av_frame_alloc(); if (*frame == NULL) { av_packet_free(&pkt); return -1; } ret = avcodec_receive_frame(decoder, *frame); av_packet_free(&pkt); if (ret < 0) { av_frame_free(frame); return -1; } return 0; } static struct SwsContext *create_scaler(AVFrame *src, const worker_config_t *cfg) { return sws_getContext( src->width, src->height, src->format, cfg->output_width, cfg->output_height, AV_PIX_FMT_YUVJ420P, SWS_BILINEAR, NULL, NULL, NULL); } static int scale_frame(struct SwsContext *sws_ctx, const worker_config_t *cfg, AVFrame *src, AVFrame **dst) { int ret; if (sws_ctx == NULL) { return -1; } *dst = av_frame_alloc(); if (*dst == NULL) { return -1; } (*dst)->width = cfg->output_width; (*dst)->height = cfg->output_height; (*dst)->format = AV_PIX_FMT_YUVJ420P; if (av_frame_get_buffer(*dst, 0) < 0) { av_frame_free(dst); return -1; } ret = sws_scale( sws_ctx, (const uint8_t *const *) src->data, src->linesize, 0, src->height, (*dst)->data, (*dst)->linesize); if (ret < 0) { av_frame_free(dst); return -1; } return 0; } static int encode_frame(AVCodecContext *encoder, AVFrame *frame, int qscale, AVPacket **pkt) { int ret; *pkt = av_packet_alloc(); if (*pkt == NULL) { return -1; } encoder->global_quality = FF_QP2LAMBDA * qscale; frame->quality = encoder->global_quality; ret = avcodec_send_frame(encoder, frame); if (ret < 0) { av_packet_free(pkt); return -1; } ret = avcodec_receive_packet(encoder, *pkt); if (ret < 0) { av_packet_free(pkt); return -1; } return 0; } static int video_sender_init(video_sender_t *sender, const worker_config_t *cfg) { kcp_conn_options_t options; if (sender == NULL) { errno = EINVAL; return -1; } CLEAR(*sender); snprintf(sender->target_peer, sizeof(sender->target_peer), "%s", cfg->target_peer); kcp_conn_options_set_video_defaults(&options); sender->client = kcp_client_dial_with_options( cfg->server_addr, cfg->relay_addr, cfg->peer_id, cfg->bind_ip, cfg->bind_device, &options, NULL, NULL, NULL, cfg->stats_interval_ms); if (sender->client == NULL) { return -1; } fprintf(stderr, "B-side video worker connected as %s -> %s\n", kcp_client_id(sender->client), sender->target_peer); return 0; } static int video_sender_send_packet(video_sender_t *sender, const AVPacket *encoded_pkt) { if (sender == NULL || sender->client == NULL || encoded_pkt == NULL) { errno = EINVAL; return -1; } return kcp_client_send_binary(sender->client, sender->target_peer, encoded_pkt->data, (size_t) encoded_pkt->size); } static void video_sender_close(video_sender_t *sender) { if (sender == NULL || sender->client == NULL) { return; } kcp_client_close(sender->client); kcp_client_free(sender->client); sender->client = NULL; } static void *control_thread_main(void *opaque) { control_thread_args_t *args = (control_thread_args_t *) opaque; char line[512]; while (fgets(line, sizeof(line), args->control_stream) != NULL) { cJSON *root = cJSON_Parse(line); cJSON *type_item; if (root == NULL) { continue; } type_item = cJSON_GetObjectItemCaseSensitive(root, "type"); if (!cJSON_IsString(type_item) || type_item->valuestring == NULL) { cJSON_Delete(root); continue; } if (strcmp(type_item->valuestring, "shutdown") == 0) { runtime_set_shutdown(args->runtime, true); cJSON_Delete(root); return NULL; } if (strcmp(type_item->valuestring, "set_profile") == 0) { cJSON *fps = cJSON_GetObjectItemCaseSensitive(root, "fps"); cJSON *qscale = cJSON_GetObjectItemCaseSensitive(root, "jpeg_quality_qscale"); cJSON *max_frame_bytes = cJSON_GetObjectItemCaseSensitive(root, "max_frame_bytes"); runtime_set_profile( args->runtime, cJSON_IsNumber(fps) ? fps->valueint : -1, cJSON_IsNumber(qscale) ? qscale->valueint : -1, cJSON_IsNumber(max_frame_bytes) ? max_frame_bytes->valueint : -1); } cJSON_Delete(root); } runtime_set_shutdown(args->runtime, true); return NULL; } int main(void) { worker_config_t cfg; runtime_state_t runtime; video_sender_t sender; control_thread_args_t control_args; pthread_t control_thread; FILE *control_stream = NULL; FILE *telemetry_stream = NULL; Buffer *buffers = NULL; int num_buffers = 0; int camera_fd = -1; enum v4l2_buf_type stream_type = V4L2_BUF_TYPE_VIDEO_CAPTURE; AVCodecContext *decoder = NULL; AVCodecContext *encoder = NULL; struct SwsContext *sws_ctx = NULL; double next_deadline_ms = 0.0; double next_metrics_ms = 0.0; int exit_code = 1; int index; int sws_src_width = 0; int sws_src_height = 0; int sws_src_format = -1; bool control_thread_started = false; av_log_set_level(AV_LOG_ERROR); signal(SIGINT, handle_signal); signal(SIGTERM, handle_signal); CLEAR(sender); if (load_worker_config(&cfg) != 0) { return 1; } CLEAR(runtime); pthread_mutex_init(&runtime.lock, NULL); pthread_mutex_init(&runtime.telemetry_lock, NULL); runtime.target_fps = cfg.initial_fps; runtime.jpeg_quality_qscale = cfg.initial_qscale; runtime.max_frame_bytes = cfg.initial_max_frame_bytes; control_stream = fdopen(WORKER_CONTROL_FD, "r"); telemetry_stream = fdopen(WORKER_TELEMETRY_FD, "w"); if (control_stream == NULL || telemetry_stream == NULL) { perror("fdopen worker control/telemetry"); goto cleanup; } setvbuf(telemetry_stream, NULL, _IOLBF, 0); runtime.telemetry_stream = telemetry_stream; control_args.control_stream = control_stream; control_args.runtime = &runtime; if (pthread_create(&control_thread, NULL, control_thread_main, &control_args) != 0) { perror("pthread_create"); goto cleanup; } control_thread_started = true; camera_fd = open_v4l2_device(cfg.video_device); if (camera_fd < 0) { goto cleanup_join_thread; } if (init_v4l2_device(camera_fd, &cfg) != 0) { goto cleanup_join_thread; } if (init_mmap(camera_fd, &buffers, &num_buffers) != 0) { goto cleanup_join_thread; } if (queue_all_buffers(camera_fd, num_buffers) != 0) { goto cleanup_join_thread; } if (ioctl(camera_fd, VIDIOC_STREAMON, &stream_type) < 0) { perror("VIDIOC_STREAMON"); goto cleanup_join_thread; } decoder = create_mjpeg_decoder(&cfg); encoder = create_mjpeg_encoder(&cfg); if (decoder == NULL || encoder == NULL) { fprintf(stderr, "failed to create codecs\n"); goto cleanup_join_thread; } if (video_sender_init(&sender, &cfg) != 0) { perror("video_sender_init"); goto cleanup_join_thread; } next_deadline_ms = monotonic_ms(); next_metrics_ms = next_deadline_ms + 100.0; while (!runtime_should_stop(&runtime)) { AVFrame *decoded_frame = NULL; AVFrame *scaled_frame = NULL; AVPacket *encoded_pkt = NULL; struct v4l2_buffer buf; fd_set fds; struct timeval tv; int select_result; int fps; int qscale; int max_frame_bytes; int encode_us = 0; bool sent = false; const char *drop_reason = ""; double now_ms = monotonic_ms(); double encode_start_ms; double encode_end_ms; runtime_get_profile(&runtime, &fps, &qscale, &max_frame_bytes); if (fps < 1) { fps = 1; } if (next_deadline_ms > now_ms) { usleep((useconds_t) ((next_deadline_ms - now_ms) * 1000.0)); } next_deadline_ms = monotonic_ms() + (1000.0 / (double) fps); FD_ZERO(&fds); FD_SET(camera_fd, &fds); tv.tv_sec = 2; tv.tv_usec = 0; select_result = select(camera_fd + 1, &fds, NULL, NULL, &tv); if (select_result <= 0) { if (select_result < 0 && errno == EINTR) { continue; } fprintf(stderr, "select timeout/error on camera\n"); continue; } CLEAR(buf); buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; buf.memory = V4L2_MEMORY_MMAP; if (ioctl(camera_fd, VIDIOC_DQBUF, &buf) < 0) { if (errno == EAGAIN) { continue; } perror("VIDIOC_DQBUF"); break; } if (decode_mjpeg_frame(decoder, (uint8_t *) buffers[buf.index].start, (int) buf.bytesused, &decoded_frame) != 0) { drop_reason = "decode_failed"; goto requeue_and_report; } if ( sws_ctx == NULL || sws_src_width != decoded_frame->width || sws_src_height != decoded_frame->height || sws_src_format != decoded_frame->format ) { sws_freeContext(sws_ctx); sws_ctx = create_scaler(decoded_frame, &cfg); sws_src_width = decoded_frame->width; sws_src_height = decoded_frame->height; sws_src_format = decoded_frame->format; } if (scale_frame(sws_ctx, &cfg, decoded_frame, &scaled_frame) != 0) { drop_reason = "scale_failed"; goto requeue_and_report; } encode_start_ms = monotonic_ms(); if (encode_frame(encoder, scaled_frame, qscale, &encoded_pkt) != 0) { drop_reason = "encode_failed"; goto requeue_and_report; } encode_end_ms = monotonic_ms(); encode_us = (int) ((encode_end_ms - encode_start_ms) * 1000.0); if (encoded_pkt->size > max_frame_bytes) { drop_reason = "frame_too_large"; goto requeue_and_report; } if (video_sender_send_packet(&sender, encoded_pkt) != 0) { perror("video_sender_send_packet"); drop_reason = "send_failed"; goto requeue_and_report; } sent = true; requeue_and_report: telemetry_write_frame_stat( &runtime, encoded_pkt != NULL ? encoded_pkt->size : 0, encode_us, sent, drop_reason); if (ioctl(camera_fd, VIDIOC_QBUF, &buf) < 0) { perror("VIDIOC_QBUF"); av_frame_free(&decoded_frame); av_frame_free(&scaled_frame); if (encoded_pkt != NULL) { av_packet_free(&encoded_pkt); } break; } av_frame_free(&decoded_frame); av_frame_free(&scaled_frame); if (encoded_pkt != NULL) { av_packet_free(&encoded_pkt); } now_ms = monotonic_ms(); if (sender.client != NULL && now_ms >= next_metrics_ms) { kcp_conn_metrics_t metrics; if (kcp_client_metrics_snapshot(sender.client, &metrics) == 0) { telemetry_write_kcp_metrics(&runtime, &metrics); } next_metrics_ms = now_ms + 100.0; } } exit_code = 0; cleanup_join_thread: runtime_set_shutdown(&runtime, true); if (control_stream != NULL) { fclose(control_stream); control_stream = NULL; } if (control_thread_started) { pthread_join(control_thread, NULL); } cleanup: if (control_stream != NULL) { fclose(control_stream); control_stream = NULL; } if (camera_fd >= 0) { ioctl(camera_fd, VIDIOC_STREAMOFF, &stream_type); } if (buffers != NULL) { for (index = 0; index < num_buffers; ++index) { if (buffers[index].start != NULL && buffers[index].start != MAP_FAILED) { munmap(buffers[index].start, buffers[index].length); } } free(buffers); } video_sender_close(&sender); if (encoder != NULL) { avcodec_free_context(&encoder); } if (decoder != NULL) { avcodec_free_context(&decoder); } sws_freeContext(sws_ctx); if (camera_fd >= 0) { close(camera_fd); } if (telemetry_stream != NULL) { fclose(telemetry_stream); telemetry_stream = NULL; } pthread_mutex_destroy(&runtime.lock); pthread_mutex_destroy(&runtime.telemetry_lock); return exit_code; }