#include "video_pipeline.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define VIDEO_CAPTURE_WIDTH_DEFAULT 1280 #define VIDEO_CAPTURE_HEIGHT_DEFAULT 720 #define VIDEO_OUTPUT_WIDTH_DEFAULT 640 #define VIDEO_OUTPUT_HEIGHT_DEFAULT 360 #define VIDEO_NUM_BUFFERS 4 #define VIDEO_DEFAULT_CAMERA_DEVICE "/dev/video0" #define VIDEO_DEFAULT_PEER_ID "peer-b-video" #define VIDEO_DEFAULT_TARGET_PEER "peer-a-video" #define VIDEO_SOFT_BACKPRESSURE_SEGMENTS_DEFAULT 64 #define VIDEO_HARD_BACKPRESSURE_SEGMENTS_DEFAULT 192 #define VIDEO_HARD_BACKPRESSURE_HOLD_MS_DEFAULT 1000 #define VIDEO_SOFT_BACKPRESSURE_WINDOW_PRESSURE_PCT 90.0 #define VIDEO_HARD_BACKPRESSURE_WINDOW_PRESSURE_PCT 98.0 typedef struct video_buffer { void *start; size_t length; } video_buffer_t; typedef struct video_sender { kcp_client_t *client; char target_peer[OMNI_MAX_PEER_ID]; uint8_t *send_buffer; size_t send_buffer_cap; } video_sender_t; static int video_pipeline_stop_requested(volatile sig_atomic_t *stop_requested) { return stop_requested != NULL && *stop_requested != 0; } static int env_flag_or_default(const char *name, int fallback) { const char *value = getenv(name); if (value == NULL || value[0] == '\0') { return fallback; } if ( strcmp(value, "1") == 0 || strcmp(value, "true") == 0 || strcmp(value, "TRUE") == 0 || strcmp(value, "yes") == 0 || strcmp(value, "on") == 0 ) { return 1; } if ( strcmp(value, "0") == 0 || strcmp(value, "false") == 0 || strcmp(value, "FALSE") == 0 || strcmp(value, "no") == 0 || strcmp(value, "off") == 0 ) { return 0; } return fallback; } static double video_pipeline_now_ms(void) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); return ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0; } static void video_pipeline_print_timing_header(void) { fprintf(stderr, "Frame | Capture | Decode | Scale | Encode | Send | Total | Size | Marker\n"); fprintf(stderr, "------|---------|--------|-------|--------|------|-------|------|--------\n"); } static void video_pipeline_print_timing_failure(int frame_number, const char *stage) { fprintf(stderr, "Frame %d: %s failed\n", frame_number, stage); } static void video_pipeline_print_timing_row( int frame_number, double capture_ms, double decode_ms, double scale_ms, double encode_ms, double send_ms, double total_ms, const AVPacket *encoded_pkt ) { size_t size_kb = 0; unsigned int marker = 0; if (encoded_pkt != NULL) { size_kb = (size_t) encoded_pkt->size / 1024; if (encoded_pkt->size > 1) { marker = encoded_pkt->data[1]; } } fprintf( stderr, "%5d | %7.1f | %6.1f | %5.1f | %6.1f | %4.1f | %5.1f | %4zu KB | 0x%02x\n", frame_number, capture_ms, decode_ms, scale_ms, encode_ms, send_ms, total_ms, size_kb, marker ); } static const char *env_or_default(const char *name, const char *fallback) { const char *value = getenv(name); if (value != NULL && value[0] != '\0') { return value; } return fallback; } static const char *env_first_nonempty(const char *first, const char *second, const char *fallback) { const char *value = getenv(first); if (value != NULL && value[0] != '\0') { return value; } value = getenv(second); if (value != NULL && value[0] != '\0') { return value; } return fallback; } static int env_int_or_default(const char *name, int fallback) { const char *value = getenv(name); int parsed; if (value == NULL || value[0] == '\0') { return fallback; } parsed = atoi(value); if (parsed <= 0) { return fallback; } return parsed; } static void video_pipeline_set_error(video_pipeline_stats_t *stats, const char *message) { if (stats == NULL) { return; } pthread_mutex_lock(&stats->mutex); snprintf(stats->last_error, sizeof(stats->last_error), "%s", message == NULL ? "" : message); pthread_mutex_unlock(&stats->mutex); } static void video_pipeline_set_errno_error(video_pipeline_stats_t *stats, const char *prefix) { char buffer[256]; int saved_errno = errno; snprintf( buffer, sizeof(buffer), "%s: %s (errno=%d)", prefix == NULL ? "video pipeline error" : prefix, saved_errno != 0 ? strerror(saved_errno) : "unknown error", saved_errno ); video_pipeline_set_error(stats, buffer); } static void video_pipeline_report_progress(const video_pipeline_config_t *config) { if (config == NULL || config->progress_callback == NULL) { return; } config->progress_callback(config->progress_context); } void video_pipeline_config_init(video_pipeline_config_t *config) { if (config == NULL) { return; } memset(config, 0, sizeof(*config)); config->camera_device = VIDEO_DEFAULT_CAMERA_DEVICE; config->server_addr = ""; config->relay_via = ""; config->bind_ip = ""; config->bind_device = ""; config->peer_id = VIDEO_DEFAULT_PEER_ID; config->target_peer = VIDEO_DEFAULT_TARGET_PEER; config->capture_width = VIDEO_CAPTURE_WIDTH_DEFAULT; config->capture_height = VIDEO_CAPTURE_HEIGHT_DEFAULT; config->output_width = VIDEO_OUTPUT_WIDTH_DEFAULT; config->output_height = VIDEO_OUTPUT_HEIGHT_DEFAULT; config->max_frames = 0; config->enable_timing_logs = 0; config->soft_backpressure_segments = VIDEO_SOFT_BACKPRESSURE_SEGMENTS_DEFAULT; config->hard_backpressure_segments = VIDEO_HARD_BACKPRESSURE_SEGMENTS_DEFAULT; config->hard_backpressure_hold_ms = VIDEO_HARD_BACKPRESSURE_HOLD_MS_DEFAULT; } void video_pipeline_config_load_env(video_pipeline_config_t *config) { if (config == NULL) { return; } config->camera_device = env_or_default("OMNI_CAMERA_DEVICE", config->camera_device); config->server_addr = env_first_nonempty("OMNI_VIDEO_SERVER_ADDR", "OMNISOCKET_SERVER_ADDR", config->server_addr); config->relay_via = env_first_nonempty("OMNI_VIDEO_RELAY_VIA", "OMNISOCKET_RELAY_VIA", config->relay_via); config->bind_ip = env_first_nonempty("OMNI_VIDEO_BIND_IP", "OMNISOCKET_BIND_IP", config->bind_ip); config->bind_device = env_first_nonempty("OMNI_VIDEO_BIND_DEVICE", "OMNISOCKET_BIND_DEVICE", config->bind_device); config->peer_id = env_or_default("OMNI_VIDEO_PEER_ID", config->peer_id); config->target_peer = env_or_default("OMNI_VIDEO_TARGET_PEER", config->target_peer); if (getenv("OMNI_VIDEO_MAX_FRAMES") != NULL) { config->max_frames = atoi(getenv("OMNI_VIDEO_MAX_FRAMES")); } config->enable_timing_logs = env_flag_or_default("OMNI_VIDEO_DEBUG_TIMING", config->enable_timing_logs); config->soft_backpressure_segments = env_int_or_default("OMNI_VIDEO_SOFT_BACKPRESSURE_SEGMENTS", config->soft_backpressure_segments); config->hard_backpressure_segments = env_int_or_default("OMNI_VIDEO_HARD_BACKPRESSURE_SEGMENTS", config->hard_backpressure_segments); config->hard_backpressure_hold_ms = env_int_or_default("OMNI_VIDEO_HARD_BACKPRESSURE_HOLD_MS", config->hard_backpressure_hold_ms); } int video_pipeline_stats_init(video_pipeline_stats_t *stats) { int rc; if (stats == NULL) { errno = EINVAL; return -1; } memset(stats, 0, sizeof(*stats)); rc = pthread_mutex_init(&stats->mutex, NULL); if (rc != 0) { errno = rc; return -1; } return 0; } void video_pipeline_stats_destroy(video_pipeline_stats_t *stats) { if (stats == NULL) { return; } pthread_mutex_destroy(&stats->mutex); } void video_pipeline_stats_snapshot(video_pipeline_stats_t *stats, video_pipeline_stats_t *out_stats) { if (stats == NULL || out_stats == NULL) { return; } memset(out_stats, 0, sizeof(*out_stats)); pthread_mutex_lock(&stats->mutex); out_stats->frames_sent = stats->frames_sent; out_stats->bytes_sent = stats->bytes_sent; out_stats->send_errors = stats->send_errors; out_stats->backpressure_drops = stats->backpressure_drops; out_stats->backlog_resets = stats->backlog_resets; out_stats->last_frame_bytes = stats->last_frame_bytes; out_stats->last_backlog_segments = stats->last_backlog_segments; out_stats->connected = stats->connected; snprintf(out_stats->last_error, sizeof(out_stats->last_error), "%s", stats->last_error); snprintf(out_stats->last_backlog_reason, sizeof(out_stats->last_backlog_reason), "%s", stats->last_backlog_reason); out_stats->transport = stats->transport; pthread_mutex_unlock(&stats->mutex); } static int open_v4l2_device(const char *device) { return open(device, O_RDWR | O_NONBLOCK); } static int init_v4l2_device(int fd, int width, int height) { struct v4l2_format fmt; memset(&fmt, 0, sizeof(fmt)); fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; fmt.fmt.pix.width = width; fmt.fmt.pix.height = height; fmt.fmt.pix.pixelformat = V4L2_PIX_FMT_MJPEG; fmt.fmt.pix.field = V4L2_FIELD_NONE; return ioctl(fd, VIDIOC_S_FMT, &fmt); } static int init_mmap(int fd, video_buffer_t **buffers, int *num_buffers) { struct v4l2_requestbuffers req; int i; memset(&req, 0, sizeof(req)); req.count = VIDEO_NUM_BUFFERS; req.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; req.memory = V4L2_MEMORY_MMAP; if (ioctl(fd, VIDIOC_REQBUFS, &req) < 0) { return -1; } *num_buffers = (int) req.count; *buffers = (video_buffer_t *) calloc(req.count, sizeof(video_buffer_t)); if (*buffers == NULL) { return -1; } for (i = 0; i < (int) req.count; i++) { struct v4l2_buffer buf; memset(&buf, 0, sizeof(buf)); buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; buf.memory = V4L2_MEMORY_MMAP; buf.index = (unsigned int) i; if (ioctl(fd, VIDIOC_QUERYBUF, &buf) < 0) { return -1; } (*buffers)[i].length = buf.length; (*buffers)[i].start = mmap(NULL, buf.length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, buf.m.offset); if ((*buffers)[i].start == MAP_FAILED) { return -1; } } return 0; } static AVCodecContext *create_mjpeg_decoder(int width, int height) { const AVCodec *decoder = avcodec_find_decoder(AV_CODEC_ID_MJPEG); AVCodecContext *ctx; AVDictionary *opts = NULL; if (decoder == NULL) { errno = ENOENT; return NULL; } ctx = avcodec_alloc_context3(decoder); if (ctx == NULL) { return NULL; } ctx->width = width; ctx->height = height; ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; ctx->color_range = AVCOL_RANGE_JPEG; ctx->thread_count = 1; av_dict_set(&opts, "flags2", "+fast", 0); if (avcodec_open2(ctx, decoder, &opts) < 0) { avcodec_free_context(&ctx); av_dict_free(&opts); errno = EINVAL; return NULL; } av_dict_free(&opts); return ctx; } static AVCodecContext *create_mjpeg_encoder(int width, int height) { const AVCodec *encoder = avcodec_find_encoder(AV_CODEC_ID_MJPEG); AVCodecContext *ctx; AVDictionary *opts = NULL; if (encoder == NULL) { errno = ENOENT; return NULL; } ctx = avcodec_alloc_context3(encoder); if (ctx == NULL) { return NULL; } ctx->width = width; ctx->height = height; ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; ctx->time_base = (AVRational){1, 30}; ctx->qmin = 8; ctx->qmax = 31; ctx->flags |= AV_CODEC_FLAG_QSCALE; ctx->global_quality = FF_QP2LAMBDA * 5; av_dict_set(&opts, "huffman", "default", 0); if (avcodec_open2(ctx, encoder, &opts) < 0) { avcodec_free_context(&ctx); av_dict_free(&opts); errno = EINVAL; return NULL; } av_dict_free(&opts); return ctx; } static int decode_mjpeg_frame(AVCodecContext *decoder, const uint8_t *data, int size, AVFrame **frame) { AVPacket *pkt; int ret; if (frame == NULL) { errno = EINVAL; return -1; } *frame = NULL; pkt = av_packet_alloc(); if (pkt == NULL) { return -1; } pkt->data = (uint8_t *) data; pkt->size = size; ret = avcodec_send_packet(decoder, pkt); if (ret < 0) { av_packet_free(&pkt); errno = EINVAL; return -1; } *frame = av_frame_alloc(); if (*frame == NULL) { av_packet_free(&pkt); return -1; } ret = avcodec_receive_frame(decoder, *frame); av_packet_free(&pkt); if (ret < 0) { av_frame_free(frame); errno = EINVAL; return -1; } return 0; } static int ensure_scale_context( struct SwsContext **sws_ctx, int *cached_src_width, int *cached_src_height, int *cached_src_format, const AVFrame *src, int output_width, int output_height ) { if ( *sws_ctx != NULL && *cached_src_width == src->width && *cached_src_height == src->height && *cached_src_format == src->format ) { return 0; } sws_freeContext(*sws_ctx); *sws_ctx = sws_getContext( src->width, src->height, src->format, output_width, output_height, AV_PIX_FMT_YUVJ420P, SWS_BILINEAR, NULL, NULL, NULL ); if (*sws_ctx == NULL) { errno = EINVAL; return -1; } *cached_src_width = src->width; *cached_src_height = src->height; *cached_src_format = src->format; return 0; } static int scale_frame(AVFrame *src, AVFrame **dst, struct SwsContext *sws_ctx, int output_width, int output_height) { int ret; if (sws_ctx == NULL) { errno = EINVAL; return -1; } *dst = av_frame_alloc(); if (*dst == NULL) { return -1; } (*dst)->width = output_width; (*dst)->height = output_height; (*dst)->format = AV_PIX_FMT_YUVJ420P; if (av_frame_get_buffer(*dst, 0) < 0) { av_frame_free(dst); errno = ENOMEM; return -1; } ret = sws_scale( sws_ctx, (const uint8_t *const *) src->data, src->linesize, 0, src->height, (*dst)->data, (*dst)->linesize ); if (ret < 0) { av_frame_free(dst); errno = EINVAL; return -1; } return 0; } static int video_sender_ensure_buffer_capacity(video_sender_t *sender, size_t min_capacity) { uint8_t *resized_buffer; size_t next_capacity; if (sender == NULL) { errno = EINVAL; return -1; } if (sender->send_buffer_cap >= min_capacity) { return 0; } next_capacity = sender->send_buffer_cap == 0 ? min_capacity : sender->send_buffer_cap; while (next_capacity < min_capacity) { next_capacity *= 2; } resized_buffer = (uint8_t *) realloc(sender->send_buffer, next_capacity); if (resized_buffer == NULL) { return -1; } sender->send_buffer = resized_buffer; sender->send_buffer_cap = next_capacity; return 0; } static int encode_frame(AVCodecContext *encoder, AVFrame *frame, AVPacket **pkt) { int ret; if (pkt == NULL) { errno = EINVAL; return -1; } *pkt = av_packet_alloc(); if (*pkt == NULL) { return -1; } ret = avcodec_send_frame(encoder, frame); if (ret < 0) { av_packet_free(pkt); errno = EINVAL; return -1; } ret = avcodec_receive_packet(encoder, *pkt); if (ret < 0) { av_packet_free(pkt); errno = EINVAL; return -1; } return 0; } static int64_t get_realtime_ms(void) { struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); return (int64_t) ts.tv_sec * 1000 + ts.tv_nsec / 1000000; } static int video_sender_init(video_sender_t *sender, const video_pipeline_config_t *config) { kcp_conn_options_t options; if (sender == NULL || config == NULL || config->server_addr == NULL || config->server_addr[0] == '\0') { errno = EINVAL; return -1; } memset(sender, 0, sizeof(*sender)); snprintf(sender->target_peer, sizeof(sender->target_peer), "%s", config->target_peer); kcp_conn_options_set_video_defaults(&options); sender->client = kcp_client_dial_with_options( config->server_addr, config->relay_via, config->peer_id, config->bind_ip, config->bind_device, &options, NULL, NULL, NULL, KCP_DEFAULT_STATS_INTERVAL_MS ); if (sender->client == NULL) { return -1; } return 0; } static int video_sender_drain_pending_messages(video_sender_t *sender) { if (sender == NULL || sender->client == NULL) { errno = EINVAL; return -1; } for (;;) { message_t msg; int rc; protocol_message_init(&msg); rc = kcp_client_receive_timed(sender->client, &msg, 1); if (rc == 1) { protocol_message_clear(&msg); return 0; } if (rc != 0) { protocol_message_clear(&msg); return -1; } // Drain unread server errors so an offline receiver cannot back up the reverse KCP stream. protocol_message_clear(&msg); } } static int video_sender_send_packet( video_sender_t *sender, const AVPacket *encoded_pkt, const video_pipeline_packet_metadata_t *metadata ) { uint8_t *payload; size_t payload_len; int rc; if (sender == NULL || sender->client == NULL || encoded_pkt == NULL || metadata == NULL) { errno = EINVAL; return -1; } payload_len = (size_t) encoded_pkt->size + sizeof(*metadata); if (video_sender_ensure_buffer_capacity(sender, payload_len) != 0) { return -1; } payload = sender->send_buffer; memcpy(payload, encoded_pkt->data, (size_t) encoded_pkt->size); memcpy(payload + encoded_pkt->size, metadata, sizeof(*metadata)); rc = kcp_client_send_binary(sender->client, sender->target_peer, payload, payload_len); if (rc != 0) { return rc; } rc = video_sender_drain_pending_messages(sender); return rc; } static void video_sender_close(video_sender_t *sender) { if (sender == NULL) { return; } if (sender->client != NULL) { kcp_client_close(sender->client); kcp_client_free(sender->client); sender->client = NULL; } free(sender->send_buffer); sender->send_buffer = NULL; sender->send_buffer_cap = 0; } static uint32_t video_sender_backlog_segments(const kcp_runtime_stats_t *stats) { if (stats == NULL) { return 0; } return stats->snd_queue + stats->snd_buffer; } static int video_sender_soft_backpressure_active(const video_pipeline_config_t *config, const kcp_runtime_stats_t *transport) { if (config == NULL || transport == NULL) { return 0; } return video_sender_backlog_segments(transport) >= (uint32_t) config->soft_backpressure_segments || transport->window_pressure_pct >= VIDEO_SOFT_BACKPRESSURE_WINDOW_PRESSURE_PCT; } static int video_sender_hard_backpressure_active(const video_pipeline_config_t *config, const kcp_runtime_stats_t *transport) { if (config == NULL || transport == NULL) { return 0; } return video_sender_backlog_segments(transport) >= (uint32_t) config->hard_backpressure_segments || transport->window_pressure_pct >= VIDEO_HARD_BACKPRESSURE_WINDOW_PRESSURE_PCT; } static void video_pipeline_note_backpressure( video_pipeline_stats_t *stats, const char *reason, const kcp_runtime_stats_t *transport, int increment_drop, int increment_reset ) { if (stats == NULL) { return; } pthread_mutex_lock(&stats->mutex); if (increment_drop) { stats->backpressure_drops += 1; } if (increment_reset) { stats->backlog_resets += 1; } if (transport != NULL) { stats->last_backlog_segments = video_sender_backlog_segments(transport); stats->transport = *transport; } else { stats->last_backlog_segments = 0; } snprintf(stats->last_backlog_reason, sizeof(stats->last_backlog_reason), "%s", reason == NULL ? "" : reason); pthread_mutex_unlock(&stats->mutex); } static void video_pipeline_cleanup_buffers(video_buffer_t *buffers, int num_buffers) { int i; if (buffers == NULL) { return; } for (i = 0; i < num_buffers; i++) { if (buffers[i].start != NULL && buffers[i].start != MAP_FAILED) { munmap(buffers[i].start, buffers[i].length); } } free(buffers); } int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_stats_t *stats, volatile sig_atomic_t *stop_requested) { video_pipeline_config_t defaults; video_sender_t sender; video_buffer_t *buffers = NULL; AVCodecContext *decoder = NULL; AVCodecContext *encoder = NULL; struct SwsContext *sws_ctx = NULL; enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE; int num_buffers = 0; int fd = -1; int frame_index = 0; int rc = -1; int sws_src_width = 0; int sws_src_height = 0; int sws_src_format = -1; uint32_t hard_backpressure_since_ms = 0; uint32_t last_soft_drop_log_ms = 0; const char *gpsd_host = env_or_default("OMNI_GPSD_HOST", "127.0.0.1"); int gps_buffer_started = 0; memset(&sender, 0, sizeof(sender)); if (stats == NULL) { errno = EINVAL; return -1; } video_pipeline_config_init(&defaults); if (config == NULL) { config = &defaults; } #ifdef QUIET_FFMPEG_LOGS av_log_set_level(AV_LOG_ERROR); #endif if (config->server_addr == NULL || config->server_addr[0] == '\0') { errno = EINVAL; video_pipeline_set_error(stats, "video server address is required"); return -1; } fd = open_v4l2_device(config->camera_device); if (fd < 0) { video_pipeline_set_errno_error(stats, "failed to open camera device"); goto cleanup; } if (init_v4l2_device(fd, config->capture_width, config->capture_height) < 0) { video_pipeline_set_errno_error(stats, "failed to configure V4L2"); goto cleanup; } if (init_mmap(fd, &buffers, &num_buffers) < 0) { video_pipeline_set_errno_error(stats, "failed to initialize V4L2 mmap"); goto cleanup; } decoder = create_mjpeg_decoder(config->capture_width, config->capture_height); encoder = create_mjpeg_encoder(config->output_width, config->output_height); if (decoder == NULL || encoder == NULL) { video_pipeline_set_errno_error(stats, "failed to initialize codecs"); goto cleanup; } if (video_sender_init(&sender, config) < 0) { video_pipeline_set_errno_error(stats, "failed to start video sender"); goto cleanup; } if (gps_buffer_init(gpsd_host) != 0) { fprintf(stderr, "[video_pipeline] failed to start GPS buffer using %s:2947\n", gpsd_host); } else { gps_buffer_started = 1; } pthread_mutex_lock(&stats->mutex); stats->connected = 1; stats->last_error[0] = '\0'; pthread_mutex_unlock(&stats->mutex); for (frame_index = 0; frame_index < num_buffers; frame_index++) { struct v4l2_buffer buf; memset(&buf, 0, sizeof(buf)); buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; buf.memory = V4L2_MEMORY_MMAP; buf.index = (unsigned int) frame_index; if (ioctl(fd, VIDIOC_QBUF, &buf) < 0) { video_pipeline_set_errno_error(stats, "failed to queue V4L2 buffer"); goto cleanup; } } if (ioctl(fd, VIDIOC_STREAMON, &type) < 0) { video_pipeline_set_errno_error(stats, "failed to start V4L2 streaming"); goto cleanup; } if (config->enable_timing_logs) { fprintf(stderr, "\nRunning video pipeline timing benchmark...\n"); video_pipeline_print_timing_header(); } frame_index = 0; while (!video_pipeline_stop_requested(stop_requested)) { fd_set fds; struct timeval timeout; struct v4l2_buffer buf; AVFrame *decoded_frame = NULL; AVFrame *scaled_frame = NULL; AVPacket *encoded_pkt = NULL; kcp_runtime_stats_t transport_stats; int select_rc; double total_start_ms = 0.0; double capture_start_ms = 0.0; double capture_end_ms = 0.0; double decode_start_ms = 0.0; double decode_end_ms = 0.0; double scale_start_ms = 0.0; double scale_end_ms = 0.0; double encode_start_ms = 0.0; double encode_end_ms = 0.0; double send_start_ms = 0.0; double send_end_ms = 0.0; video_pipeline_packet_metadata_t packet_metadata; int frame_number = frame_index + 1; memset(&transport_stats, 0, sizeof(transport_stats)); memset(&packet_metadata, 0, sizeof(packet_metadata)); video_pipeline_report_progress(config); if (config->max_frames > 0 && frame_index >= config->max_frames) { break; } if (config->enable_timing_logs) { total_start_ms = video_pipeline_now_ms(); } FD_ZERO(&fds); FD_SET(fd, &fds); timeout.tv_sec = 2; timeout.tv_usec = 0; select_rc = select(fd + 1, &fds, NULL, NULL, &timeout); if (select_rc <= 0) { if (select_rc == 0) { errno = ETIMEDOUT; } video_pipeline_set_errno_error(stats, "failed waiting for camera frame"); goto cleanup; } if (config->enable_timing_logs) { capture_start_ms = video_pipeline_now_ms(); } memset(&buf, 0, sizeof(buf)); buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; buf.memory = V4L2_MEMORY_MMAP; if (ioctl(fd, VIDIOC_DQBUF, &buf) < 0) { video_pipeline_set_errno_error(stats, "failed to dequeue V4L2 buffer"); goto cleanup; } if (config->enable_timing_logs) { capture_end_ms = video_pipeline_now_ms(); decode_start_ms = capture_end_ms; } if (decode_mjpeg_frame(decoder, (const uint8_t *) buffers[buf.index].start, (int) buf.bytesused, &decoded_frame) != 0) { if (config->enable_timing_logs) { video_pipeline_print_timing_failure(frame_number, "decode"); } (void) ioctl(fd, VIDIOC_QBUF, &buf); continue; } if (config->enable_timing_logs) { decode_end_ms = video_pipeline_now_ms(); scale_start_ms = decode_end_ms; } if ( ensure_scale_context( &sws_ctx, &sws_src_width, &sws_src_height, &sws_src_format, decoded_frame, config->output_width, config->output_height ) != 0 || scale_frame(decoded_frame, &scaled_frame, sws_ctx, config->output_width, config->output_height) != 0 ) { if (config->enable_timing_logs) { video_pipeline_print_timing_failure(frame_number, "scale"); } av_frame_free(&decoded_frame); (void) ioctl(fd, VIDIOC_QBUF, &buf); continue; } if (config->enable_timing_logs) { scale_end_ms = video_pipeline_now_ms(); encode_start_ms = scale_end_ms; } if (encode_frame(encoder, scaled_frame, &encoded_pkt) != 0) { if (config->enable_timing_logs) { video_pipeline_print_timing_failure(frame_number, "encode"); } av_frame_free(&decoded_frame); av_frame_free(&scaled_frame); (void) ioctl(fd, VIDIOC_QBUF, &buf); continue; } if (config->enable_timing_logs) { encode_end_ms = video_pipeline_now_ms(); send_start_ms = encode_end_ms; } { gps_video_sample_t gps_sample = get_latest_gps_for_video(); packet_metadata.timestamp_ms = (uint64_t) get_realtime_ms(); packet_metadata.latitude = gps_sample.latitude; packet_metadata.longitude = gps_sample.longitude; } kcp_client_runtime_stats_snapshot(sender.client, &transport_stats); if (video_sender_hard_backpressure_active(config, &transport_stats)) { uint32_t now_ms = omni_now_millis32(); if (hard_backpressure_since_ms == 0) { hard_backpressure_since_ms = now_ms; } if (now_ms - hard_backpressure_since_ms >= (uint32_t) config->hard_backpressure_hold_ms) { char reason[128]; uint32_t backlog_segments = video_sender_backlog_segments(&transport_stats); snprintf( reason, sizeof(reason), "hard_reset backlog=%u snd_queue=%u snd_buffer=%u window_pressure=%.1f%% hold_ms=%d", backlog_segments, transport_stats.snd_queue, transport_stats.snd_buffer, transport_stats.window_pressure_pct, config->hard_backpressure_hold_ms ); video_pipeline_note_backpressure(stats, reason, &transport_stats, 0, 1); video_pipeline_set_error(stats, reason); fprintf( stderr, "[video_pipeline] backlog hard reset: backlog=%u snd_queue=%u snd_buffer=%u window_pressure=%.1f%% hold_ms=%d\n", backlog_segments, transport_stats.snd_queue, transport_stats.snd_buffer, transport_stats.window_pressure_pct, config->hard_backpressure_hold_ms ); av_frame_free(&decoded_frame); av_frame_free(&scaled_frame); av_packet_free(&encoded_pkt); (void) ioctl(fd, VIDIOC_QBUF, &buf); rc = VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE; goto cleanup; } } else { hard_backpressure_since_ms = 0; } if (video_sender_soft_backpressure_active(config, &transport_stats)) { uint32_t now_ms = omni_now_millis32(); uint32_t backlog_segments = video_sender_backlog_segments(&transport_stats); char reason[128]; snprintf( reason, sizeof(reason), "soft_drop backlog=%u snd_queue=%u snd_buffer=%u window_pressure=%.1f%% threshold=%d", backlog_segments, transport_stats.snd_queue, transport_stats.snd_buffer, transport_stats.window_pressure_pct, config->soft_backpressure_segments ); video_pipeline_note_backpressure(stats, reason, &transport_stats, 1, 0); if (now_ms - last_soft_drop_log_ms >= 1000U) { fprintf( stderr, "[video_pipeline] soft drop: backlog=%u snd_queue=%u snd_buffer=%u window_pressure=%.1f%% threshold=%d\n", backlog_segments, transport_stats.snd_queue, transport_stats.snd_buffer, transport_stats.window_pressure_pct, config->soft_backpressure_segments ); last_soft_drop_log_ms = now_ms; } av_frame_free(&decoded_frame); av_frame_free(&scaled_frame); av_packet_free(&encoded_pkt); (void) ioctl(fd, VIDIOC_QBUF, &buf); continue; } if (video_sender_send_packet(&sender, encoded_pkt, &packet_metadata) != 0) { pthread_mutex_lock(&stats->mutex); stats->send_errors += 1; pthread_mutex_unlock(&stats->mutex); if (config->enable_timing_logs) { video_pipeline_print_timing_failure(frame_number, "send"); } video_pipeline_set_errno_error(stats, "failed to send video packet"); av_frame_free(&decoded_frame); av_frame_free(&scaled_frame); av_packet_free(&encoded_pkt); (void) ioctl(fd, VIDIOC_QBUF, &buf); goto cleanup; } if (config->enable_timing_logs) { send_end_ms = video_pipeline_now_ms(); } pthread_mutex_lock(&stats->mutex); stats->frames_sent += 1; stats->bytes_sent += (uint64_t) encoded_pkt->size; stats->last_frame_bytes = (uint64_t) encoded_pkt->size; kcp_client_runtime_stats_snapshot(sender.client, &stats->transport); pthread_mutex_unlock(&stats->mutex); if (config->enable_timing_logs) { video_pipeline_print_timing_row( frame_number, capture_end_ms - capture_start_ms, decode_end_ms - decode_start_ms, scale_end_ms - scale_start_ms, encode_end_ms - encode_start_ms, send_end_ms - send_start_ms, send_end_ms - total_start_ms, encoded_pkt ); } av_frame_free(&decoded_frame); av_frame_free(&scaled_frame); av_packet_free(&encoded_pkt); if (ioctl(fd, VIDIOC_QBUF, &buf) < 0) { video_pipeline_set_errno_error(stats, "failed to requeue V4L2 buffer"); goto cleanup; } frame_index += 1; } rc = 0; cleanup: pthread_mutex_lock(&stats->mutex); stats->connected = 0; pthread_mutex_unlock(&stats->mutex); if (gps_buffer_started) { gps_buffer_cleanup(); } if (fd >= 0) { (void) ioctl(fd, VIDIOC_STREAMOFF, &type); } video_sender_close(&sender); if (encoder != NULL) { avcodec_free_context(&encoder); } if (decoder != NULL) { avcodec_free_context(&decoder); } sws_freeContext(sws_ctx); video_pipeline_cleanup_buffers(buffers, num_buffers); if (fd >= 0) { close(fd); } return rc; }