Files
OmniSocketGo/src/video_pipeline.c

1266 lines
40 KiB
C

#include "video_pipeline.h"
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <linux/videodev2.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/select.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavutil/avutil.h>
#include <libavutil/imgutils.h>
#include <libavutil/opt.h>
#include <libswscale/swscale.h>
/* Default resolution requested from the camera via VIDIOC_S_FMT (pixels). */
#define VIDEO_CAPTURE_WIDTH_DEFAULT 1280
#define VIDEO_CAPTURE_HEIGHT_DEFAULT 720
/* Default resolution frames are scaled to before being re-encoded (pixels). */
#define VIDEO_OUTPUT_WIDTH_DEFAULT 640
#define VIDEO_OUTPUT_HEIGHT_DEFAULT 360
/* Number of memory-mapped capture buffers requested from the V4L2 driver. */
#define VIDEO_NUM_BUFFERS 4
/* Defaults for the camera node and the local/remote peer identifiers. */
#define VIDEO_DEFAULT_CAMERA_DEVICE "/dev/video0"
#define VIDEO_DEFAULT_PEER_ID "peer-b-video"
#define VIDEO_DEFAULT_TARGET_PEER "peer-a-video"
/* Send backlog (snd_queue + snd_buffer segments) at which frames start being dropped. */
#define VIDEO_SOFT_BACKPRESSURE_SEGMENTS_DEFAULT 64
/* Send backlog at which the hard-backpressure hold timer starts running. */
#define VIDEO_HARD_BACKPRESSURE_SEGMENTS_DEFAULT 192
/* How long hard backpressure must persist before the run loop bails out for a reconnect (ms). */
#define VIDEO_HARD_BACKPRESSURE_HOLD_MS_DEFAULT 1000
/* Longest period of continuous soft-dropping without a successful send before reconnecting (ms). */
#define VIDEO_DEFAULT_FRAME_STALL_RECONNECT_MS 3000
/* Transport window_pressure_pct values that trigger soft / hard backpressure regardless of backlog. */
#define VIDEO_SOFT_BACKPRESSURE_WINDOW_PRESSURE_PCT 90.0
#define VIDEO_HARD_BACKPRESSURE_WINDOW_PRESSURE_PCT 98.0
/* Minimum spacing between session-health polls inside the run loop (ms). */
#define VIDEO_SESSION_POLL_INTERVAL_MS 250
/* One memory-mapped V4L2 capture buffer as set up by init_mmap(). */
typedef struct video_buffer {
/* Address returned by mmap(); NULL (never mapped) or MAP_FAILED when unusable. */
void *start;
/* Mapping length in bytes, as reported by VIDIOC_QUERYBUF. */
size_t length;
} video_buffer_t;
/* Transport-side state for pushing encoded frames to a single target peer. */
typedef struct video_sender {
/* KCP client handle created in video_sender_init() and released in video_sender_close(). */
kcp_client_t *client;
/* Destination peer id, copied (and possibly truncated) from the pipeline config. */
char target_peer[OMNI_MAX_PEER_ID];
/* Reusable scratch buffer holding "encoded frame + trailing metadata" for one send. */
uint8_t *send_buffer;
/* Allocated size of send_buffer in bytes (0 while send_buffer is NULL). */
size_t send_buffer_cap;
} video_sender_t;
/*
 * Reports whether the caller asked the pipeline to stop.
 *
 * Returns 1 when a stop flag was supplied and is non-zero, 0 otherwise
 * (including when no flag was supplied at all).
 */
static int video_pipeline_stop_requested(volatile sig_atomic_t *stop_requested) {
    if (stop_requested == NULL) {
        return 0;
    }
    return (*stop_requested != 0) ? 1 : 0;
}
/*
 * Reads a boolean switch from the environment.
 *
 * Recognised spellings are matched exactly (case-sensitive):
 *   true  -> "1", "true", "TRUE", "yes", "on"
 *   false -> "0", "false", "FALSE", "no", "off"
 * An unset, empty or unrecognised value yields `fallback` unchanged.
 */
static int env_flag_or_default(const char *name, int fallback) {
    static const char *const truthy[] = {"1", "true", "TRUE", "yes", "on"};
    static const char *const falsy[] = {"0", "false", "FALSE", "no", "off"};
    const char *raw = getenv(name);
    size_t i;

    if (raw == NULL || *raw == '\0') {
        return fallback;
    }
    for (i = 0; i < sizeof(truthy) / sizeof(truthy[0]); i++) {
        if (strcmp(raw, truthy[i]) == 0) {
            return 1;
        }
    }
    for (i = 0; i < sizeof(falsy) / sizeof(falsy[0]); i++) {
        if (strcmp(raw, falsy[i]) == 0) {
            return 0;
        }
    }
    return fallback;
}
/*
 * Current CLOCK_MONOTONIC reading expressed in (fractional) milliseconds.
 * Only differences between two readings are meaningful; used for the
 * per-stage timing log.
 */
static double video_pipeline_now_ms(void) {
    struct timespec mono;

    clock_gettime(CLOCK_MONOTONIC, &mono);
    return mono.tv_sec * 1000.0 + mono.tv_nsec / 1000000.0;
}
/* Writes the two-line column header of the per-frame timing table to stderr. */
static void video_pipeline_print_timing_header(void) {
    fputs("Frame | Capture | Decode | Scale | Encode | Send | Total | Size | Marker\n", stderr);
    fputs("------|---------|--------|-------|--------|------|-------|------|--------\n", stderr);
}
/*
 * Writes a one-line note to stderr saying that `stage` failed for the given
 * (1-based) frame number, in place of a regular timing row.
 */
static void video_pipeline_print_timing_failure(int frame_number, const char *stage) {
    FILE *sink = stderr;

    fprintf(sink, "Frame %d: %s failed\n", frame_number, stage);
}
/*
 * Writes one row of the timing table to stderr.
 *
 * All *_ms arguments are stage durations in milliseconds. `encoded_pkt` may be
 * NULL, in which case size and marker are printed as 0. The size column is the
 * packet size in whole KiB (integer division); the marker column is the
 * second byte of the encoded packet when the packet has at least two bytes.
 */
static void video_pipeline_print_timing_row(
    int frame_number,
    double capture_ms,
    double decode_ms,
    double scale_ms,
    double encode_ms,
    double send_ms,
    double total_ms,
    const AVPacket *encoded_pkt
) {
    const int have_packet = encoded_pkt != NULL;
    const size_t packet_kib = have_packet ? (size_t) encoded_pkt->size / 1024 : 0;
    const unsigned int second_byte =
        (have_packet && encoded_pkt->size > 1) ? (unsigned int) encoded_pkt->data[1] : 0U;

    fprintf(
        stderr,
        "%5d | %7.1f | %6.1f | %5.1f | %6.1f | %4.1f | %5.1f | %4zu KB | 0x%02x\n",
        frame_number,
        capture_ms,
        decode_ms,
        scale_ms,
        encode_ms,
        send_ms,
        total_ms,
        packet_kib,
        second_byte
    );
}
/*
 * Returns the value of environment variable `name`, or `fallback` when the
 * variable is unset or empty. The returned pointer is either `fallback`
 * itself or the string handed out by getenv(); nothing is copied.
 */
static const char *env_or_default(const char *name, const char *fallback) {
    const char *raw = getenv(name);

    return (raw == NULL || raw[0] == '\0') ? fallback : raw;
}
/*
 * Looks up two environment variables in priority order and returns the first
 * one that is set to a non-empty string; `fallback` when neither qualifies.
 * The second variable is only consulted when the first is unset or empty.
 */
static const char *env_first_nonempty(const char *first, const char *second, const char *fallback) {
    const char *candidates[2];
    size_t i;

    candidates[0] = first;
    candidates[1] = second;
    for (i = 0; i < 2; i++) {
        const char *raw = getenv(candidates[i]);

        if (raw != NULL && raw[0] != '\0') {
            return raw;
        }
    }
    return fallback;
}
/*
 * Reads a strictly positive integer from environment variable `name`.
 *
 * Returns `fallback` when the variable is unset, empty, does not start with a
 * decimal number, is zero or negative, or does not fit in an `int`.
 * As before, text after the leading number is ignored ("250ms" -> 250).
 *
 * strtol() is used instead of atoi() because atoi() has undefined behaviour on
 * out-of-range input and in practice truncates silently (e.g. "4294967297"
 * turning into 1). The caller's errno is preserved.
 */
static int env_int_or_default(const char *name, int fallback) {
    const char *value = getenv(name);
    char *end = NULL;
    long parsed;
    int saved_errno;
    int out_of_range;

    if (value == NULL || value[0] == '\0') {
        return fallback;
    }

    saved_errno = errno;
    errno = 0;
    parsed = strtol(value, &end, 10);
    out_of_range = (errno == ERANGE);
    errno = saved_errno;

    if (end == value || out_of_range) {
        return fallback; /* no digits at all, or beyond the range of long */
    }
    if (parsed <= 0 || parsed > INT_MAX) {
        return fallback;
    }
    return (int) parsed;
}
/*
 * Stores `message` (or an empty string when NULL) as the pipeline's last
 * error, truncating to the size of stats->last_error. The write is done under
 * stats->mutex. A NULL `stats` is ignored.
 */
static void video_pipeline_set_error(video_pipeline_stats_t *stats, const char *message) {
    const char *text = (message != NULL) ? message : "";

    if (stats == NULL) {
        return;
    }

    pthread_mutex_lock(&stats->mutex);
    snprintf(stats->last_error, sizeof(stats->last_error), "%s", text);
    pthread_mutex_unlock(&stats->mutex);
}
/*
 * Records "<prefix>: <strerror(errno)> (errno=N)" as the pipeline's last error.
 *
 * errno is captured before anything else runs so later library calls cannot
 * clobber it. A NULL prefix becomes "video pipeline error"; an errno of 0 is
 * rendered as "unknown error".
 */
static void video_pipeline_set_errno_error(video_pipeline_stats_t *stats, const char *prefix) {
    const int captured_errno = errno;
    const char *label = (prefix != NULL) ? prefix : "video pipeline error";
    const char *detail = (captured_errno != 0) ? strerror(captured_errno) : "unknown error";
    char text[256];

    snprintf(text, sizeof(text), "%s: %s (errno=%d)", label, detail, captured_errno);
    video_pipeline_set_error(stats, text);
}
/*
 * Invokes the optional progress callback from the config, passing it the
 * configured context pointer. Does nothing when the config or the callback is
 * absent.
 */
static void video_pipeline_report_progress(const video_pipeline_config_t *config) {
    if (config != NULL && config->progress_callback != NULL) {
        config->progress_callback(config->progress_context);
    }
}
/*
 * Resets `config` to the built-in defaults.
 *
 * The structure is zeroed first, so any field not listed below ends up as
 * zero. String fields point at string literals / macros; nothing is allocated.
 * A NULL `config` is ignored.
 */
void video_pipeline_config_init(video_pipeline_config_t *config) {
    if (config == NULL) {
        return;
    }

    memset(config, 0, sizeof(*config));

    /* Devices, endpoints and identities. */
    config->camera_device = VIDEO_DEFAULT_CAMERA_DEVICE;
    config->server_addr = "";
    config->relay_via = "";
    config->bind_ip = "";
    config->bind_device = "";
    config->peer_id = VIDEO_DEFAULT_PEER_ID;
    config->target_peer = VIDEO_DEFAULT_TARGET_PEER;

    /* Capture and output geometry. */
    config->capture_width = VIDEO_CAPTURE_WIDTH_DEFAULT;
    config->capture_height = VIDEO_CAPTURE_HEIGHT_DEFAULT;
    config->output_width = VIDEO_OUTPUT_WIDTH_DEFAULT;
    config->output_height = VIDEO_OUTPUT_HEIGHT_DEFAULT;

    /* Run limits and diagnostics (0 max_frames means "no limit" in the run loop). */
    config->max_frames = 0;
    config->enable_timing_logs = 0;

    /* Backpressure and stall handling. */
    config->soft_backpressure_segments = VIDEO_SOFT_BACKPRESSURE_SEGMENTS_DEFAULT;
    config->hard_backpressure_segments = VIDEO_HARD_BACKPRESSURE_SEGMENTS_DEFAULT;
    config->hard_backpressure_hold_ms = VIDEO_HARD_BACKPRESSURE_HOLD_MS_DEFAULT;
    config->frame_stall_reconnect_ms = VIDEO_DEFAULT_FRAME_STALL_RECONNECT_MS;

    /* Transport statistics reporting. */
    config->stats_logger = NULL;
    config->stats_interval_ms = 1000;
}
/*
 * Overlays environment-variable overrides onto an already initialised config.
 *
 * Each setting keeps its current value unless the matching variable is set to
 * a usable value. Connection settings accept a video-specific variable first
 * and a shared OMNISOCKET_* variable second. String fields end up pointing at
 * the environment strings themselves (no copies are made).
 *
 * OMNI_VIDEO_MAX_FRAMES is parsed with atoi() whenever the variable exists,
 * so an empty or non-numeric value sets max_frames to 0.
 */
void video_pipeline_config_load_env(video_pipeline_config_t *config) {
    const char *max_frames_text;

    if (config == NULL) {
        return;
    }

    /* Device, endpoints and identities. */
    config->camera_device = env_or_default("OMNI_CAMERA_DEVICE", config->camera_device);
    config->server_addr =
        env_first_nonempty("OMNI_VIDEO_SERVER_ADDR", "OMNISOCKET_SERVER_ADDR", config->server_addr);
    config->relay_via =
        env_first_nonempty("OMNI_VIDEO_RELAY_VIA", "OMNISOCKET_RELAY_VIA", config->relay_via);
    config->bind_ip =
        env_first_nonempty("OMNI_VIDEO_BIND_IP", "OMNISOCKET_BIND_IP", config->bind_ip);
    config->bind_device =
        env_first_nonempty("OMNI_VIDEO_BIND_DEVICE", "OMNISOCKET_BIND_DEVICE", config->bind_device);
    config->peer_id = env_or_default("OMNI_VIDEO_PEER_ID", config->peer_id);
    config->target_peer = env_or_default("OMNI_VIDEO_TARGET_PEER", config->target_peer);

    /* Frame limit: applied whenever the variable is present at all. */
    max_frames_text = getenv("OMNI_VIDEO_MAX_FRAMES");
    if (max_frames_text != NULL) {
        config->max_frames = atoi(max_frames_text);
    }

    /* Diagnostics, backpressure thresholds and stats cadence. */
    config->enable_timing_logs =
        env_flag_or_default("OMNI_VIDEO_DEBUG_TIMING", config->enable_timing_logs);
    config->soft_backpressure_segments =
        env_int_or_default("OMNI_VIDEO_SOFT_BACKPRESSURE_SEGMENTS", config->soft_backpressure_segments);
    config->hard_backpressure_segments =
        env_int_or_default("OMNI_VIDEO_HARD_BACKPRESSURE_SEGMENTS", config->hard_backpressure_segments);
    config->hard_backpressure_hold_ms =
        env_int_or_default("OMNI_VIDEO_HARD_BACKPRESSURE_HOLD_MS", config->hard_backpressure_hold_ms);
    config->frame_stall_reconnect_ms =
        env_int_or_default("OMNI_VIDEO_FRAME_STALL_RECONNECT_MS", config->frame_stall_reconnect_ms);
    config->stats_interval_ms =
        env_int_or_default("BLITZ_KCP_STATS_INTERVAL_MS", config->stats_interval_ms);
}
/*
 * Zeroes `stats` and initialises its mutex with default attributes.
 *
 * Returns 0 on success. Returns -1 with errno = EINVAL for a NULL argument,
 * or -1 with errno set to the pthread_mutex_init() error code.
 */
int video_pipeline_stats_init(video_pipeline_stats_t *stats) {
    int mutex_rc;

    if (stats == NULL) {
        errno = EINVAL;
        return -1;
    }

    memset(stats, 0, sizeof(*stats));
    mutex_rc = pthread_mutex_init(&stats->mutex, NULL);
    if (mutex_rc == 0) {
        return 0;
    }

    /* pthread functions return the error code instead of setting errno. */
    errno = mutex_rc;
    return -1;
}
/* Destroys the mutex embedded in `stats`; a NULL argument is ignored. */
void video_pipeline_stats_destroy(video_pipeline_stats_t *stats) {
    if (stats != NULL) {
        pthread_mutex_destroy(&stats->mutex);
    }
}
/*
 * Copies a consistent view of `stats` into `out_stats`.
 *
 * `out_stats` is zeroed first and then filled field by field while holding
 * stats->mutex; its own mutex member is left zeroed and is not initialised.
 * Does nothing when either pointer is NULL.
 */
void video_pipeline_stats_snapshot(video_pipeline_stats_t *stats, video_pipeline_stats_t *out_stats) {
    if (stats == NULL || out_stats == NULL) {
        return;
    }

    memset(out_stats, 0, sizeof(*out_stats));

    pthread_mutex_lock(&stats->mutex);

    /* Counters. */
    out_stats->frames_sent = stats->frames_sent;
    out_stats->bytes_sent = stats->bytes_sent;
    out_stats->send_errors = stats->send_errors;
    out_stats->backpressure_drops = stats->backpressure_drops;
    out_stats->backlog_resets = stats->backlog_resets;

    /* Most recent observations. */
    out_stats->last_frame_bytes = stats->last_frame_bytes;
    out_stats->last_backlog_segments = stats->last_backlog_segments;
    out_stats->connected = stats->connected;
    out_stats->transport = stats->transport;

    /* Text fields. */
    snprintf(out_stats->last_error, sizeof(out_stats->last_error), "%s", stats->last_error);
    snprintf(
        out_stats->last_backlog_reason,
        sizeof(out_stats->last_backlog_reason),
        "%s",
        stats->last_backlog_reason
    );

    pthread_mutex_unlock(&stats->mutex);
}
/*
 * Opens the capture device node read/write in non-blocking mode.
 * Returns the file descriptor, or -1 with errno set by open().
 */
static int open_v4l2_device(const char *device) {
    const int open_flags = O_RDWR | O_NONBLOCK;

    return open(device, open_flags);
}
/*
 * Asks the driver for progressive MJPEG capture at width x height.
 *
 * Returns 0 on success, -1 with errno set on failure:
 *   EINVAL  - width or height is not positive
 *   ENOTSUP - the driver accepted VIDIOC_S_FMT but substituted a pixel format
 *             other than MJPEG
 *   other   - whatever ioctl(VIDIOC_S_FMT) reported
 *
 * VIDIOC_S_FMT is allowed to adjust the requested format rather than fail, so
 * the negotiated pixel format is checked afterwards; otherwise a camera
 * without MJPEG support would only show up later as endless decode failures.
 * An adjusted resolution is tolerated.
 */
static int init_v4l2_device(int fd, int width, int height) {
    struct v4l2_format fmt;

    if (width <= 0 || height <= 0) {
        errno = EINVAL;
        return -1;
    }

    memset(&fmt, 0, sizeof(fmt));
    fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
    fmt.fmt.pix.width = (unsigned int) width;
    fmt.fmt.pix.height = (unsigned int) height;
    fmt.fmt.pix.pixelformat = V4L2_PIX_FMT_MJPEG;
    fmt.fmt.pix.field = V4L2_FIELD_NONE;

    if (ioctl(fd, VIDIOC_S_FMT, &fmt) < 0) {
        return -1;
    }
    if (fmt.fmt.pix.pixelformat != V4L2_PIX_FMT_MJPEG) {
        errno = ENOTSUP;
        return -1;
    }
    return 0;
}
/*
 * Requests VIDEO_NUM_BUFFERS memory-mapped capture buffers from the driver
 * and maps each granted buffer into this process.
 *
 * On success returns 0; *buffers is a calloc()ed array and *num_buffers the
 * number of entries the driver actually granted (may differ from the request).
 *
 * On failure returns -1 with errno set. If the array was already allocated it
 * is left in *buffers / *num_buffers with unmapped entries holding NULL or
 * MAP_FAILED, so the caller can release it with
 * video_pipeline_cleanup_buffers().
 *
 * Compared with the previous version this rejects NULL out-parameters and a
 * driver that grants zero buffers (previously calloc(0, ...) could make that
 * look like success).
 */
static int init_mmap(int fd, video_buffer_t **buffers, int *num_buffers) {
    struct v4l2_requestbuffers req;
    unsigned int i;

    if (buffers == NULL || num_buffers == NULL) {
        errno = EINVAL;
        return -1;
    }

    memset(&req, 0, sizeof(req));
    req.count = VIDEO_NUM_BUFFERS;
    req.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
    req.memory = V4L2_MEMORY_MMAP;
    if (ioctl(fd, VIDIOC_REQBUFS, &req) < 0) {
        return -1;
    }
    if (req.count == 0) {
        /* Nothing to stream into; treat like an allocation failure. */
        errno = ENOMEM;
        return -1;
    }

    *num_buffers = (int) req.count;
    *buffers = (video_buffer_t *) calloc(req.count, sizeof(**buffers));
    if (*buffers == NULL) {
        errno = ENOMEM;
        return -1;
    }

    for (i = 0; i < req.count; i++) {
        struct v4l2_buffer buf;

        memset(&buf, 0, sizeof(buf));
        buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
        buf.memory = V4L2_MEMORY_MMAP;
        buf.index = i;
        if (ioctl(fd, VIDIOC_QUERYBUF, &buf) < 0) {
            return -1;
        }

        (*buffers)[i].length = buf.length;
        (*buffers)[i].start = mmap(NULL, buf.length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, buf.m.offset);
        if ((*buffers)[i].start == MAP_FAILED) {
            return -1;
        }
    }
    return 0;
}
/*
 * Creates and opens a single-threaded MJPEG decoder context for frames of
 * width x height, with the "+fast" flags2 option requested.
 *
 * Returns the opened context (release with avcodec_free_context()), or NULL
 * with errno set:
 *   ENOENT - this FFmpeg build has no MJPEG decoder
 *   ENOMEM - the codec context could not be allocated
 *   EINVAL - avcodec_open2() rejected the configuration
 *
 * errno is now set on the allocation-failure path too, because the caller
 * turns errno into the user-visible error message.
 */
static AVCodecContext *create_mjpeg_decoder(int width, int height) {
    const AVCodec *decoder = avcodec_find_decoder(AV_CODEC_ID_MJPEG);
    AVCodecContext *ctx;
    AVDictionary *opts = NULL;

    if (decoder == NULL) {
        errno = ENOENT;
        return NULL;
    }

    ctx = avcodec_alloc_context3(decoder);
    if (ctx == NULL) {
        errno = ENOMEM;
        return NULL;
    }

    ctx->width = width;
    ctx->height = height;
    ctx->pix_fmt = AV_PIX_FMT_YUVJ420P;
    ctx->color_range = AVCOL_RANGE_JPEG;
    ctx->thread_count = 1;

    /* Best effort: if the option cannot be stored the decoder simply opens without it. */
    av_dict_set(&opts, "flags2", "+fast", 0);
    if (avcodec_open2(ctx, decoder, &opts) < 0) {
        avcodec_free_context(&ctx);
        av_dict_free(&opts);
        errno = EINVAL;
        return NULL;
    }

    /* Entries left in the dictionary are options the codec did not consume. */
    av_dict_free(&opts);
    return ctx;
}
/*
 * Creates and opens an MJPEG encoder context producing width x height
 * YUVJ420P frames with a 1/30 time base and fixed-quality (qscale) rate
 * control.
 *
 * Returns the opened context (release with avcodec_free_context()), or NULL
 * with errno set:
 *   ENOENT - this FFmpeg build has no MJPEG encoder
 *   ENOMEM - the codec context could not be allocated
 *   EINVAL - avcodec_open2() rejected the configuration
 *
 * errno is now set on the allocation-failure path too, because the caller
 * turns errno into the user-visible error message.
 */
static AVCodecContext *create_mjpeg_encoder(int width, int height) {
    const AVCodec *encoder = avcodec_find_encoder(AV_CODEC_ID_MJPEG);
    AVCodecContext *ctx;
    AVDictionary *opts = NULL;

    if (encoder == NULL) {
        errno = ENOENT;
        return NULL;
    }

    ctx = avcodec_alloc_context3(encoder);
    if (ctx == NULL) {
        errno = ENOMEM;
        return NULL;
    }

    ctx->width = width;
    ctx->height = height;
    ctx->pix_fmt = AV_PIX_FMT_YUVJ420P;
    ctx->time_base = (AVRational){1, 30};

    /* Constant-quality mode: quantiser bounds plus a fixed global quality. */
    ctx->qmin = 8;
    ctx->qmax = 31;
    ctx->flags |= AV_CODEC_FLAG_QSCALE;
    ctx->global_quality = FF_QP2LAMBDA * 5;

    /* Best effort: if the option cannot be stored the encoder opens with its own default. */
    av_dict_set(&opts, "huffman", "default", 0);
    if (avcodec_open2(ctx, encoder, &opts) < 0) {
        avcodec_free_context(&ctx);
        av_dict_free(&opts);
        errno = EINVAL;
        return NULL;
    }

    /* Entries left in the dictionary are options the codec did not consume. */
    av_dict_free(&opts);
    return ctx;
}
/*
 * Decodes one complete MJPEG image held in data[0..size) into a new AVFrame.
 *
 * On success returns 0 and stores the frame in *frame; the caller releases it
 * with av_frame_free(). On failure returns -1, leaves *frame NULL and sets
 * errno (EINVAL for bad arguments or a decoder error, ENOMEM when a packet or
 * frame could not be allocated).
 *
 * `data` is only borrowed for the duration of the call; the packet wrapping it
 * does not take ownership.
 *
 * Both the packet and the frame are allocated before anything is submitted to
 * the decoder. Previously a frame-allocation failure happened after
 * avcodec_send_packet(), leaving an unread frame queued inside the decoder.
 * Empty or NULL input is now rejected up front as well.
 */
static int decode_mjpeg_frame(AVCodecContext *decoder, const uint8_t *data, int size, AVFrame **frame) {
    AVPacket *pkt;
    int ret;

    if (frame == NULL) {
        errno = EINVAL;
        return -1;
    }
    *frame = NULL;
    if (decoder == NULL || data == NULL || size <= 0) {
        errno = EINVAL;
        return -1;
    }

    pkt = av_packet_alloc();
    if (pkt == NULL) {
        errno = ENOMEM;
        return -1;
    }
    *frame = av_frame_alloc();
    if (*frame == NULL) {
        av_packet_free(&pkt);
        errno = ENOMEM;
        return -1;
    }

    /* Non-owning view of the capture buffer; the decoder API takes a non-const pointer. */
    pkt->data = (uint8_t *) data;
    pkt->size = size;

    ret = avcodec_send_packet(decoder, pkt);
    if (ret >= 0) {
        ret = avcodec_receive_frame(decoder, *frame);
    }
    av_packet_free(&pkt);

    if (ret < 0) {
        av_frame_free(frame); /* also resets *frame to NULL */
        errno = EINVAL;
        return -1;
    }
    return 0;
}
/*
 * Makes sure *sws_ctx can convert frames shaped like `src` into
 * output_width x output_height YUVJ420P using bilinear scaling.
 *
 * The three cached_* values remember the source geometry and pixel format the
 * current context was built for. When they still match `src`, the existing
 * context is reused; otherwise it is freed and rebuilt. Only the source side
 * takes part in the comparison.
 *
 * Returns 0 on success. On failure returns -1 with errno = EINVAL, and
 * *sws_ctx is NULL.
 */
static int ensure_scale_context(
    struct SwsContext **sws_ctx,
    int *cached_src_width,
    int *cached_src_height,
    int *cached_src_format,
    const AVFrame *src,
    int output_width,
    int output_height
) {
    const int reusable = *sws_ctx != NULL
        && *cached_src_width == src->width
        && *cached_src_height == src->height
        && *cached_src_format == src->format;

    if (reusable) {
        return 0;
    }

    /* Stale or missing context: drop it (freeing NULL is fine) and build a new one. */
    sws_freeContext(*sws_ctx);
    *sws_ctx = sws_getContext(
        src->width, src->height, src->format,
        output_width, output_height, AV_PIX_FMT_YUVJ420P,
        SWS_BILINEAR,
        NULL, NULL, NULL
    );
    if (*sws_ctx != NULL) {
        *cached_src_width = src->width;
        *cached_src_height = src->height;
        *cached_src_format = src->format;
        return 0;
    }

    errno = EINVAL;
    return -1;
}
/*
 * Scales `src` into a freshly allocated output_width x output_height YUVJ420P
 * frame using an already prepared scaler context.
 *
 * On success returns 0 and stores the new frame in *dst (caller frees with
 * av_frame_free()). On failure returns -1; errno is EINVAL for a missing
 * context or a scaler error and ENOMEM when the pixel buffers could not be
 * allocated. *dst is left untouched when `sws_ctx` is NULL and is NULL after
 * every other failure.
 */
static int scale_frame(AVFrame *src, AVFrame **dst, struct SwsContext *sws_ctx, int output_width, int output_height) {
    AVFrame *scaled;
    int rows_written;

    if (sws_ctx == NULL) {
        errno = EINVAL;
        return -1;
    }

    scaled = av_frame_alloc();
    *dst = scaled;
    if (scaled == NULL) {
        return -1;
    }

    scaled->width = output_width;
    scaled->height = output_height;
    scaled->format = AV_PIX_FMT_YUVJ420P;
    if (av_frame_get_buffer(scaled, 0) < 0) {
        av_frame_free(dst);
        errno = ENOMEM;
        return -1;
    }

    rows_written = sws_scale(
        sws_ctx,
        (const uint8_t *const *) src->data,
        src->linesize,
        0,
        src->height,
        scaled->data,
        scaled->linesize
    );
    if (rows_written < 0) {
        av_frame_free(dst);
        errno = EINVAL;
        return -1;
    }
    return 0;
}
/*
 * Grows sender->send_buffer so it can hold at least `min_capacity` bytes.
 *
 * The capacity doubles from its current value (or starts at `min_capacity`
 * for an empty buffer) so repeated small increases do not reallocate every
 * time. The buffer never shrinks.
 *
 * Returns 0 on success. Returns -1 with errno = EINVAL for a NULL sender, or
 * -1 when realloc() fails, in which case the existing buffer and capacity are
 * left intact.
 *
 * The doubling is guarded against size_t overflow: previously a request above
 * SIZE_MAX / 2 wrapped the candidate capacity to 0 and the loop never ended.
 */
static int video_sender_ensure_buffer_capacity(video_sender_t *sender, size_t min_capacity) {
    uint8_t *resized_buffer;
    size_t next_capacity;

    if (sender == NULL) {
        errno = EINVAL;
        return -1;
    }
    if (sender->send_buffer_cap >= min_capacity) {
        return 0;
    }

    next_capacity = sender->send_buffer_cap == 0 ? min_capacity : sender->send_buffer_cap;
    while (next_capacity < min_capacity) {
        if (next_capacity > SIZE_MAX / 2) {
            /* Doubling would overflow; ask for exactly what is needed instead. */
            next_capacity = min_capacity;
            break;
        }
        next_capacity *= 2;
    }

    /* Keep the old pointer until realloc() succeeds so a failure does not leak it. */
    resized_buffer = (uint8_t *) realloc(sender->send_buffer, next_capacity);
    if (resized_buffer == NULL) {
        return -1;
    }
    sender->send_buffer = resized_buffer;
    sender->send_buffer_cap = next_capacity;
    return 0;
}
/*
 * Encodes one frame and returns the resulting packet.
 *
 * On success returns 0 and stores a newly allocated packet in *pkt (caller
 * frees with av_packet_free()). On failure returns -1 and *pkt is NULL; errno
 * is EINVAL for a NULL `pkt` argument or an encoder error.
 */
static int encode_frame(AVCodecContext *encoder, AVFrame *frame, AVPacket **pkt) {
    int status;

    if (pkt == NULL) {
        errno = EINVAL;
        return -1;
    }

    *pkt = av_packet_alloc();
    if (*pkt == NULL) {
        return -1;
    }

    /* Submit the frame, then pull the packet; either step failing aborts the encode. */
    status = avcodec_send_frame(encoder, frame);
    if (status >= 0) {
        status = avcodec_receive_packet(encoder, *pkt);
    }
    if (status < 0) {
        av_packet_free(pkt);
        errno = EINVAL;
        return -1;
    }
    return 0;
}
/*
 * Current CLOCK_REALTIME reading in whole milliseconds since the Unix epoch.
 * Used to timestamp outgoing frame metadata.
 */
static int64_t get_realtime_ms(void) {
    struct timespec wall;
    int64_t seconds_part_ms;
    int64_t nanos_part_ms;

    clock_gettime(CLOCK_REALTIME, &wall);
    seconds_part_ms = (int64_t) wall.tv_sec * 1000;
    nanos_part_ms = wall.tv_nsec / 1000000;
    return seconds_part_ms + nanos_part_ms;
}
/*
 * Initialises `sender` and dials the KCP server described by `config` using
 * the video transport defaults.
 *
 * Returns 0 on success. Returns -1 with errno = EINVAL when an argument is
 * missing, the server address is empty or the target peer is NULL; returns -1
 * when the dial fails (errno as left by kcp_client_dial_with_options()).
 *
 * `sender` is zeroed before use, so it must not already own resources. The
 * target peer id is copied into the sender and silently truncated if it does
 * not fit.
 *
 * A NULL target_peer is now rejected; it used to be passed straight to a
 * "%s" conversion, which is undefined behaviour.
 */
static int video_sender_init(video_sender_t *sender, const video_pipeline_config_t *config) {
    kcp_conn_options_t options;

    if (
        sender == NULL || config == NULL
        || config->server_addr == NULL || config->server_addr[0] == '\0'
        || config->target_peer == NULL
    ) {
        errno = EINVAL;
        return -1;
    }

    memset(sender, 0, sizeof(*sender));
    snprintf(sender->target_peer, sizeof(sender->target_peer), "%s", config->target_peer);

    kcp_conn_options_set_video_defaults(&options);
    sender->client = kcp_client_dial_with_options(
        config->server_addr,
        config->relay_via,
        config->peer_id,
        config->bind_ip,
        config->bind_device,
        &options,
        NULL,
        NULL,
        config->stats_logger,
        config->stats_interval_ms
    );
    if (sender->client == NULL) {
        return -1;
    }
    return 0;
}
/*
 * Reads and discards up to eight pending inbound messages from the server.
 *
 * Unread server errors are drained so an offline receiver cannot back up the
 * reverse KCP stream.
 *
 * Returns 0 once the receive call reports status 1 (presumably "nothing
 * pending within the wait" - confirm against kcp_client_receive_timed) or
 * after eight messages have been discarded. Returns -1 with errno = EINVAL for
 * a missing sender/client, or -1 when the receive call reports any other
 * non-zero status.
 */
static int video_sender_drain_pending_messages(video_sender_t *sender) {
    int discarded;

    if (sender == NULL || sender->client == NULL) {
        errno = EINVAL;
        return -1;
    }

    for (discarded = 0; discarded < 8; discarded++) {
        message_t incoming;
        int status;

        protocol_message_init(&incoming);
        status = kcp_client_receive_timed(sender->client, &incoming, 1);
        /* The message content is never inspected; release it on every outcome. */
        protocol_message_clear(&incoming);

        if (status == 1) {
            return 0;
        }
        if (status != 0) {
            return -1;
        }
    }
    return 0;
}
/*
 * Sends one encoded frame to the target peer.
 *
 * Wire payload layout: the encoded packet bytes followed immediately by a raw
 * copy of *metadata. The payload is assembled in the sender's reusable
 * scratch buffer, which is grown on demand.
 *
 * After a successful send, pending inbound messages are drained.
 *
 * Returns 0 on success; -1 with errno = EINVAL for missing arguments; -1 when
 * the scratch buffer cannot be grown; otherwise the non-zero status of the
 * send call or of the drain step.
 */
static int video_sender_send_packet(
    video_sender_t *sender,
    const AVPacket *encoded_pkt,
    const video_pipeline_packet_metadata_t *metadata
) {
    uint8_t *wire;
    size_t wire_len;
    int send_status;

    if (sender == NULL || sender->client == NULL || encoded_pkt == NULL || metadata == NULL) {
        errno = EINVAL;
        return -1;
    }

    wire_len = (size_t) encoded_pkt->size + sizeof(*metadata);
    if (video_sender_ensure_buffer_capacity(sender, wire_len) != 0) {
        return -1;
    }

    /* Frame first, metadata trailer second. */
    wire = sender->send_buffer;
    memcpy(wire, encoded_pkt->data, (size_t) encoded_pkt->size);
    memcpy(wire + encoded_pkt->size, metadata, sizeof(*metadata));

    send_status = kcp_client_send_binary(sender->client, sender->target_peer, wire, wire_len);
    if (send_status != 0) {
        return send_status;
    }
    return video_sender_drain_pending_messages(sender);
}
/*
 * Releases everything owned by `sender`: closes and frees the KCP client (if
 * any) and frees the scratch buffer. The sender is left with NULL pointers
 * and zero capacity, so calling this again is harmless. NULL is ignored.
 */
static void video_sender_close(video_sender_t *sender) {
    if (sender == NULL) {
        return;
    }

    if (sender->client != NULL) {
        kcp_client_t *client = sender->client;

        sender->client = NULL;
        kcp_client_close(client);
        kcp_client_free(client);
    }

    free(sender->send_buffer);
    sender->send_buffer_cap = 0;
    sender->send_buffer = NULL;
}
/*
 * Total number of segments waiting on the send side of the transport:
 * snd_queue plus snd_buffer. Returns 0 when no stats are supplied.
 */
static uint32_t video_sender_backlog_segments(const kcp_runtime_stats_t *stats) {
    if (stats != NULL) {
        return stats->snd_queue + stats->snd_buffer;
    }
    return 0;
}
/*
 * Decides whether the current frame should be dropped to relieve the link.
 *
 * Returns 1 when the send backlog has reached the configured soft threshold
 * or the transport's window pressure is at or above
 * VIDEO_SOFT_BACKPRESSURE_WINDOW_PRESSURE_PCT; 0 otherwise, and 0 when either
 * argument is NULL.
 */
static int video_sender_soft_backpressure_active(const video_pipeline_config_t *config, const kcp_runtime_stats_t *transport) {
    uint32_t backlog;

    if (config == NULL || transport == NULL) {
        return 0;
    }

    backlog = video_sender_backlog_segments(transport);
    if (backlog >= (uint32_t) config->soft_backpressure_segments) {
        return 1;
    }
    return transport->window_pressure_pct >= VIDEO_SOFT_BACKPRESSURE_WINDOW_PRESSURE_PCT;
}
/*
 * Decides whether the link is congested badly enough to start (or keep
 * running) the hard-backpressure hold timer.
 *
 * Returns 1 when the send backlog has reached the configured hard threshold
 * or the transport's window pressure is at or above
 * VIDEO_HARD_BACKPRESSURE_WINDOW_PRESSURE_PCT; 0 otherwise, and 0 when either
 * argument is NULL.
 */
static int video_sender_hard_backpressure_active(const video_pipeline_config_t *config, const kcp_runtime_stats_t *transport) {
    uint32_t backlog;

    if (config == NULL || transport == NULL) {
        return 0;
    }

    backlog = video_sender_backlog_segments(transport);
    if (backlog >= (uint32_t) config->hard_backpressure_segments) {
        return 1;
    }
    return transport->window_pressure_pct >= VIDEO_HARD_BACKPRESSURE_WINDOW_PRESSURE_PCT;
}
/*
 * Records a backpressure event in the shared stats, under stats->mutex.
 *
 * increment_drop / increment_reset: non-zero bumps the matching counter.
 * transport: when given, its backlog and a full copy of the transport stats
 *            are stored; when NULL the stored backlog is reset to 0 and the
 *            stored transport stats are left as they were.
 * reason:    human-readable description (NULL is stored as an empty string,
 *            long text is truncated).
 *
 * A NULL `stats` is ignored.
 */
static void video_pipeline_note_backpressure(
    video_pipeline_stats_t *stats,
    const char *reason,
    const kcp_runtime_stats_t *transport,
    int increment_drop,
    int increment_reset
) {
    const char *reason_text = (reason != NULL) ? reason : "";

    if (stats == NULL) {
        return;
    }

    pthread_mutex_lock(&stats->mutex);

    if (increment_drop != 0) {
        stats->backpressure_drops += 1;
    }
    if (increment_reset != 0) {
        stats->backlog_resets += 1;
    }

    if (transport == NULL) {
        stats->last_backlog_segments = 0;
    } else {
        stats->last_backlog_segments = video_sender_backlog_segments(transport);
        stats->transport = *transport;
    }

    snprintf(stats->last_backlog_reason, sizeof(stats->last_backlog_reason), "%s", reason_text);

    pthread_mutex_unlock(&stats->mutex);
}
/*
 * Classifies a server error message.
 *
 * Returns 1 when the text contains any phrase that indicates the session is
 * no longer usable and must be re-established, 0 otherwise (including for a
 * NULL or empty message). Matching is a case-sensitive substring search.
 */
static int video_server_error_requires_reconnect(const char *message) {
    static const char *const fatal_phrases[] = {
        "not registered",
        "first message must be register",
        "peer replaced",
        "timed out waiting for server_register_ok",
        "failed to acknowledge server heartbeat",
    };
    size_t i;

    if (message == NULL || message[0] == '\0') {
        return 0;
    }
    for (i = 0; i < sizeof(fatal_phrases) / sizeof(fatal_phrases[0]); i++) {
        if (strstr(message, fatal_phrases[i]) != NULL) {
            return 1;
        }
    }
    return 0;
}
/*
 * Folds the latest client/transport snapshots into the shared stats, under
 * stats->mutex.
 *
 * transport:    when given, replaces the stored transport stats.
 * client_state: when given, `connected` becomes 1 only if the client is both
 *               connected and registered; a non-empty last_server_error
 *               overwrites the stored last_error (an empty one leaves it
 *               alone).
 *
 * A NULL `stats` is ignored.
 */
static void video_pipeline_update_connection_state(
    video_pipeline_stats_t *stats,
    const kcp_client_state_t *client_state,
    const kcp_runtime_stats_t *transport
) {
    if (stats == NULL) {
        return;
    }

    pthread_mutex_lock(&stats->mutex);

    if (transport != NULL) {
        stats->transport = *transport;
    }

    if (client_state != NULL) {
        const int server_reported_error = client_state->last_server_error[0] != '\0';

        stats->connected = client_state->connected != 0 && client_state->registered != 0;
        if (server_reported_error) {
            snprintf(stats->last_error, sizeof(stats->last_error), "%s", client_state->last_server_error);
        }
    }

    pthread_mutex_unlock(&stats->mutex);
}
/*
 * Takes fresh transport and client snapshots, publishes them to `stats`, and
 * decides whether the session has gone stale and needs a reconnect.
 *
 * Returns:
 *    0  session looks healthy; `reason` is an empty string
 *    1  session is stale; `reason` explains why (disconnected, unregistered,
 *       or a server error that requires reconnecting)
 *   -1  invalid arguments (errno = EINVAL); `reason` is not written
 *
 * `transport_stats` receives the transport snapshot in every non-error case so
 * the caller can reuse it for backpressure decisions.
 */
static int video_sender_check_session_stale(
    video_sender_t *sender,
    const video_pipeline_config_t *config,
    video_pipeline_stats_t *stats,
    kcp_runtime_stats_t *transport_stats,
    char *reason,
    size_t reason_len
) {
    kcp_client_state_t client_state;
    const int arguments_ok = sender != NULL && sender->client != NULL && config != NULL
        && stats != NULL && transport_stats != NULL && reason != NULL && reason_len != 0;

    if (!arguments_ok) {
        errno = EINVAL;
        return -1;
    }

    reason[0] = '\0';
    memset(&client_state, 0, sizeof(client_state));

    kcp_client_runtime_stats_snapshot(sender->client, transport_stats);
    kcp_client_state_snapshot(sender->client, &client_state);
    video_pipeline_update_connection_state(stats, &client_state, transport_stats);

    /* Checks are ordered from the most fundamental failure to the most specific. */
    if (!transport_stats->connected || !client_state.connected) {
        snprintf(reason, reason_len, "video session stale: transport disconnected");
        return 1;
    }
    if (!client_state.registered) {
        snprintf(reason, reason_len, "video session stale: server reported unregistered");
        return 1;
    }
    if (video_server_error_requires_reconnect(client_state.last_server_error)) {
        /* The precision keeps the message within the caller's 256-byte buffer. */
        snprintf(reason, reason_len, "video session stale: server error %.180s", client_state.last_server_error);
        return 1;
    }
    return 0;
}
/*
 * Unmaps every successfully mapped capture buffer and frees the array.
 *
 * Entries whose start is NULL (never mapped) or MAP_FAILED are skipped, so a
 * partially initialised array from init_mmap() is handled. A NULL array is
 * ignored.
 */
static void video_pipeline_cleanup_buffers(video_buffer_t *buffers, int num_buffers) {
    int index;

    if (buffers == NULL) {
        return;
    }

    for (index = 0; index < num_buffers; index++) {
        void *mapping = buffers[index].start;

        if (mapping == NULL || mapping == MAP_FAILED) {
            continue;
        }
        munmap(mapping, buffers[index].length);
    }
    free(buffers);
}
int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_stats_t *stats, volatile sig_atomic_t *stop_requested) {
video_pipeline_config_t defaults;
video_sender_t sender;
video_buffer_t *buffers = NULL;
AVCodecContext *decoder = NULL;
AVCodecContext *encoder = NULL;
struct SwsContext *sws_ctx = NULL;
enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
int num_buffers = 0;
int fd = -1;
int frame_index = 0;
int rc = -1;
int sws_src_width = 0;
int sws_src_height = 0;
int sws_src_format = -1;
uint32_t hard_backpressure_since_ms = 0;
uint32_t last_soft_drop_log_ms = 0;
uint32_t last_session_poll_ms = 0;
uint32_t last_successful_send_ms = 0;
uint64_t soft_drops_since_last_send = 0;
int have_sent_frame = 0;
const char *gpsd_host = env_or_default("OMNI_GPSD_HOST", "127.0.0.1");
int gps_buffer_started = 0;
memset(&sender, 0, sizeof(sender));
if (stats == NULL) {
errno = EINVAL;
return -1;
}
video_pipeline_config_init(&defaults);
if (config == NULL) {
config = &defaults;
}
#ifdef QUIET_FFMPEG_LOGS
av_log_set_level(AV_LOG_ERROR);
#endif
if (config->server_addr == NULL || config->server_addr[0] == '\0') {
errno = EINVAL;
video_pipeline_set_error(stats, "video server address is required");
return -1;
}
fd = open_v4l2_device(config->camera_device);
if (fd < 0) {
video_pipeline_set_errno_error(stats, "failed to open camera device");
goto cleanup;
}
if (init_v4l2_device(fd, config->capture_width, config->capture_height) < 0) {
video_pipeline_set_errno_error(stats, "failed to configure V4L2");
goto cleanup;
}
if (init_mmap(fd, &buffers, &num_buffers) < 0) {
video_pipeline_set_errno_error(stats, "failed to initialize V4L2 mmap");
goto cleanup;
}
decoder = create_mjpeg_decoder(config->capture_width, config->capture_height);
encoder = create_mjpeg_encoder(config->output_width, config->output_height);
if (decoder == NULL || encoder == NULL) {
video_pipeline_set_errno_error(stats, "failed to initialize codecs");
goto cleanup;
}
if (video_sender_init(&sender, config) < 0) {
video_pipeline_set_errno_error(stats, "failed to start video sender");
goto cleanup;
}
if (gps_buffer_init(gpsd_host) != 0) {
fprintf(stderr, "[video_pipeline] failed to start GPS buffer using %s:2947\n", gpsd_host);
} else {
gps_buffer_started = 1;
}
pthread_mutex_lock(&stats->mutex);
stats->connected = 1;
stats->last_error[0] = '\0';
pthread_mutex_unlock(&stats->mutex);
for (frame_index = 0; frame_index < num_buffers; frame_index++) {
struct v4l2_buffer buf;
memset(&buf, 0, sizeof(buf));
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
buf.index = (unsigned int) frame_index;
if (ioctl(fd, VIDIOC_QBUF, &buf) < 0) {
video_pipeline_set_errno_error(stats, "failed to queue V4L2 buffer");
goto cleanup;
}
}
if (ioctl(fd, VIDIOC_STREAMON, &type) < 0) {
video_pipeline_set_errno_error(stats, "failed to start V4L2 streaming");
goto cleanup;
}
if (config->enable_timing_logs) {
fprintf(stderr, "\nRunning video pipeline timing benchmark...\n");
video_pipeline_print_timing_header();
}
frame_index = 0;
while (!video_pipeline_stop_requested(stop_requested)) {
fd_set fds;
struct timeval timeout;
struct v4l2_buffer buf;
AVFrame *decoded_frame = NULL;
AVFrame *scaled_frame = NULL;
AVPacket *encoded_pkt = NULL;
kcp_runtime_stats_t transport_stats;
int select_rc;
double total_start_ms = 0.0;
double capture_start_ms = 0.0;
double capture_end_ms = 0.0;
double decode_start_ms = 0.0;
double decode_end_ms = 0.0;
double scale_start_ms = 0.0;
double scale_end_ms = 0.0;
double encode_start_ms = 0.0;
double encode_end_ms = 0.0;
double send_start_ms = 0.0;
double send_end_ms = 0.0;
video_pipeline_packet_metadata_t packet_metadata;
char reconnect_reason[256];
int frame_number = frame_index + 1;
memset(&transport_stats, 0, sizeof(transport_stats));
memset(&packet_metadata, 0, sizeof(packet_metadata));
reconnect_reason[0] = '\0';
video_pipeline_report_progress(config);
if (config->max_frames > 0 && frame_index >= config->max_frames) {
break;
}
if (config->enable_timing_logs) {
total_start_ms = video_pipeline_now_ms();
}
FD_ZERO(&fds);
FD_SET(fd, &fds);
timeout.tv_sec = 2;
timeout.tv_usec = 0;
select_rc = select(fd + 1, &fds, NULL, NULL, &timeout);
if (select_rc <= 0) {
if (select_rc == 0) {
errno = ETIMEDOUT;
}
video_pipeline_set_errno_error(stats, "failed waiting for camera frame");
goto cleanup;
}
if (config->enable_timing_logs) {
capture_start_ms = video_pipeline_now_ms();
}
memset(&buf, 0, sizeof(buf));
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
if (ioctl(fd, VIDIOC_DQBUF, &buf) < 0) {
video_pipeline_set_errno_error(stats, "failed to dequeue V4L2 buffer");
goto cleanup;
}
if (config->enable_timing_logs) {
capture_end_ms = video_pipeline_now_ms();
decode_start_ms = capture_end_ms;
}
if (decode_mjpeg_frame(decoder, (const uint8_t *) buffers[buf.index].start, (int) buf.bytesused, &decoded_frame) != 0) {
if (config->enable_timing_logs) {
video_pipeline_print_timing_failure(frame_number, "decode");
}
(void) ioctl(fd, VIDIOC_QBUF, &buf);
continue;
}
if (config->enable_timing_logs) {
decode_end_ms = video_pipeline_now_ms();
scale_start_ms = decode_end_ms;
}
if (
ensure_scale_context(
&sws_ctx,
&sws_src_width,
&sws_src_height,
&sws_src_format,
decoded_frame,
config->output_width,
config->output_height
) != 0
|| scale_frame(decoded_frame, &scaled_frame, sws_ctx, config->output_width, config->output_height) != 0
) {
if (config->enable_timing_logs) {
video_pipeline_print_timing_failure(frame_number, "scale");
}
av_frame_free(&decoded_frame);
(void) ioctl(fd, VIDIOC_QBUF, &buf);
continue;
}
if (config->enable_timing_logs) {
scale_end_ms = video_pipeline_now_ms();
encode_start_ms = scale_end_ms;
}
if (encode_frame(encoder, scaled_frame, &encoded_pkt) != 0) {
if (config->enable_timing_logs) {
video_pipeline_print_timing_failure(frame_number, "encode");
}
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
(void) ioctl(fd, VIDIOC_QBUF, &buf);
continue;
}
if (config->enable_timing_logs) {
encode_end_ms = video_pipeline_now_ms();
send_start_ms = encode_end_ms;
}
{
gps_video_sample_t gps_sample = get_latest_gps_for_video();
packet_metadata.timestamp_ms = (uint64_t) get_realtime_ms();
packet_metadata.latitude = gps_sample.latitude;
packet_metadata.longitude = gps_sample.longitude;
}
if (
last_session_poll_ms == 0
|| omni_now_millis32() - last_session_poll_ms >= VIDEO_SESSION_POLL_INTERVAL_MS
) {
if (video_sender_drain_pending_messages(&sender) != 0) {
video_pipeline_set_errno_error(stats, "failed to poll video session");
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
av_packet_free(&encoded_pkt);
(void) ioctl(fd, VIDIOC_QBUF, &buf);
rc = VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE;
goto cleanup;
}
if (
video_sender_check_session_stale(
&sender,
config,
stats,
&transport_stats,
reconnect_reason,
sizeof(reconnect_reason)
) != 0
) {
if (reconnect_reason[0] == '\0') {
snprintf(reconnect_reason, sizeof(reconnect_reason), "video session stale: poll failed");
}
video_pipeline_set_error(stats, reconnect_reason);
fprintf(stderr, "[video_pipeline] %s\n", reconnect_reason);
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
av_packet_free(&encoded_pkt);
(void) ioctl(fd, VIDIOC_QBUF, &buf);
rc = VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE;
goto cleanup;
}
last_session_poll_ms = omni_now_millis32();
} else {
kcp_client_runtime_stats_snapshot(sender.client, &transport_stats);
}
if (video_sender_hard_backpressure_active(config, &transport_stats)) {
uint32_t now_ms = omni_now_millis32();
if (hard_backpressure_since_ms == 0) {
hard_backpressure_since_ms = now_ms;
}
if (now_ms - hard_backpressure_since_ms >= (uint32_t) config->hard_backpressure_hold_ms) {
char reason[128];
uint32_t backlog_segments = video_sender_backlog_segments(&transport_stats);
snprintf(
reason,
sizeof(reason),
"hard_reset backlog=%u snd_queue=%u snd_buffer=%u window_pressure=%.1f%% hold_ms=%d",
backlog_segments,
transport_stats.snd_queue,
transport_stats.snd_buffer,
transport_stats.window_pressure_pct,
config->hard_backpressure_hold_ms
);
video_pipeline_note_backpressure(stats, reason, &transport_stats, 0, 1);
video_pipeline_set_error(stats, reason);
fprintf(
stderr,
"[video_pipeline] backlog hard reset: backlog=%u snd_queue=%u snd_buffer=%u window_pressure=%.1f%% hold_ms=%d\n",
backlog_segments,
transport_stats.snd_queue,
transport_stats.snd_buffer,
transport_stats.window_pressure_pct,
config->hard_backpressure_hold_ms
);
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
av_packet_free(&encoded_pkt);
(void) ioctl(fd, VIDIOC_QBUF, &buf);
rc = VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE;
goto cleanup;
}
} else {
hard_backpressure_since_ms = 0;
}
if (video_sender_soft_backpressure_active(config, &transport_stats)) {
uint32_t now_ms = omni_now_millis32();
uint32_t backlog_segments = video_sender_backlog_segments(&transport_stats);
char reason[128];
snprintf(
reason,
sizeof(reason),
"soft_drop backlog=%u snd_queue=%u snd_buffer=%u window_pressure=%.1f%% threshold=%d",
backlog_segments,
transport_stats.snd_queue,
transport_stats.snd_buffer,
transport_stats.window_pressure_pct,
config->soft_backpressure_segments
);
video_pipeline_note_backpressure(stats, reason, &transport_stats, 1, 0);
soft_drops_since_last_send += 1;
if (now_ms - last_soft_drop_log_ms >= 1000U) {
fprintf(
stderr,
"[video_pipeline] soft drop: backlog=%u snd_queue=%u snd_buffer=%u window_pressure=%.1f%% threshold=%d\n",
backlog_segments,
transport_stats.snd_queue,
transport_stats.snd_buffer,
transport_stats.window_pressure_pct,
config->soft_backpressure_segments
);
last_soft_drop_log_ms = now_ms;
}
if (
have_sent_frame
&& config->frame_stall_reconnect_ms > 0
&& now_ms - last_successful_send_ms >= (uint32_t) config->frame_stall_reconnect_ms
) {
char stall_reason[192];
snprintf(
stall_reason,
sizeof(stall_reason),
"video pipeline stalled: no frames sent for %u ms while soft dropping (%llu drops, backlog=%u, srtt=%d ms)",
now_ms - last_successful_send_ms,
(unsigned long long) soft_drops_since_last_send,
backlog_segments,
transport_stats.srtt_ms
);
video_pipeline_set_error(stats, stall_reason);
fprintf(stderr, "[video_pipeline] %s\n", stall_reason);
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
av_packet_free(&encoded_pkt);
(void) ioctl(fd, VIDIOC_QBUF, &buf);
rc = VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE;
goto cleanup;
}
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
av_packet_free(&encoded_pkt);
(void) ioctl(fd, VIDIOC_QBUF, &buf);
continue;
}
if (video_sender_send_packet(&sender, encoded_pkt, &packet_metadata) != 0) {
pthread_mutex_lock(&stats->mutex);
stats->send_errors += 1;
pthread_mutex_unlock(&stats->mutex);
if (config->enable_timing_logs) {
video_pipeline_print_timing_failure(frame_number, "send");
}
video_pipeline_set_errno_error(stats, "failed to send video packet");
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
av_packet_free(&encoded_pkt);
(void) ioctl(fd, VIDIOC_QBUF, &buf);
goto cleanup;
}
if (config->enable_timing_logs) {
send_end_ms = video_pipeline_now_ms();
}
pthread_mutex_lock(&stats->mutex);
stats->frames_sent += 1;
stats->bytes_sent += (uint64_t) encoded_pkt->size;
stats->last_frame_bytes = (uint64_t) encoded_pkt->size;
kcp_client_runtime_stats_snapshot(sender.client, &stats->transport);
pthread_mutex_unlock(&stats->mutex);
have_sent_frame = 1;
last_successful_send_ms = omni_now_millis32();
soft_drops_since_last_send = 0;
if (config->enable_timing_logs) {
video_pipeline_print_timing_row(
frame_number,
capture_end_ms - capture_start_ms,
decode_end_ms - decode_start_ms,
scale_end_ms - scale_start_ms,
encode_end_ms - encode_start_ms,
send_end_ms - send_start_ms,
send_end_ms - total_start_ms,
encoded_pkt
);
}
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
av_packet_free(&encoded_pkt);
if (ioctl(fd, VIDIOC_QBUF, &buf) < 0) {
video_pipeline_set_errno_error(stats, "failed to requeue V4L2 buffer");
goto cleanup;
}
frame_index += 1;
}
rc = 0;
cleanup:
pthread_mutex_lock(&stats->mutex);
stats->connected = 0;
pthread_mutex_unlock(&stats->mutex);
if (gps_buffer_started) {
gps_buffer_cleanup();
}
if (fd >= 0) {
(void) ioctl(fd, VIDIOC_STREAMOFF, &type);
}
video_sender_close(&sender);
if (encoder != NULL) {
avcodec_free_context(&encoder);
}
if (decoder != NULL) {
avcodec_free_context(&decoder);
}
sws_freeContext(sws_ctx);
video_pipeline_cleanup_buffers(buffers, num_buffers);
if (fd >= 0) {
close(fd);
}
return rc;
}