diff --git a/cmd/b_side_omnid.c b/cmd/b_side_omnid.c index cd39799..c340c0f 100644 --- a/cmd/b_side_omnid.c +++ b/cmd/b_side_omnid.c @@ -16,6 +16,7 @@ #define CONTROL_DEFAULT_PEER_ID "peer-b-ctrl" #define CONTROL_DEFAULT_EXPECTED_SENDER "peer-a-ctrl" #define CONTROL_DEFAULT_UNIX_SOCKET "/tmp/omnisocket-b-side-cmd.sock" +#define CONTROL_DEFAULT_SERVER_IDLE_RECONNECT_MS 3000 typedef struct unix_dgram_client { int fd; @@ -31,9 +32,11 @@ typedef struct control_bridge_stats { uint64_t invalid_packets; uint64_t unix_send_errors; uint64_t reconnect_count; + uint32_t server_idle_ms; int ever_connected; int registered; char last_error[256]; + char last_reconnect_reason[256]; kcp_runtime_stats_t transport; } control_bridge_stats_t; @@ -48,6 +51,7 @@ typedef struct daemon_state { const char *control_peer_id; const char *control_expected_sender; const char *control_unix_socket; + int control_server_idle_reconnect_ms; unix_dgram_client_t unix_client; control_bridge_stats_t control_stats; } daemon_state_t; @@ -91,6 +95,20 @@ static const char *env_first_nonempty(const char *first, const char *second, con return fallback; } +static int env_int_or_default(const char *name, int fallback) { + const char *value = getenv(name); + int parsed; + + if (value == NULL || value[0] == '\0') { + return fallback; + } + parsed = atoi(value); + if (parsed <= 0) { + return fallback; + } + return parsed; +} + static int control_bridge_stats_init(control_bridge_stats_t *stats) { int rc; if (stats == NULL) { @@ -124,6 +142,15 @@ static void control_bridge_set_error(control_bridge_stats_t *stats, const char * pthread_mutex_unlock(&stats->mutex); } +static void control_bridge_set_reconnect_reason(control_bridge_stats_t *stats, const char *message) { + if (stats == NULL) { + return; + } + pthread_mutex_lock(&stats->mutex); + snprintf(stats->last_reconnect_reason, sizeof(stats->last_reconnect_reason), "%s", message == NULL ? "" : message); + pthread_mutex_unlock(&stats->mutex); +} + static void control_bridge_set_errno_error(control_bridge_stats_t *stats, const char *prefix) { char buffer[256]; int saved_errno = errno; @@ -149,12 +176,39 @@ static void control_bridge_stats_snapshot(control_bridge_stats_t *stats, control out_stats->invalid_packets = stats->invalid_packets; out_stats->unix_send_errors = stats->unix_send_errors; out_stats->reconnect_count = stats->reconnect_count; + out_stats->server_idle_ms = stats->server_idle_ms; out_stats->registered = stats->registered; snprintf(out_stats->last_error, sizeof(out_stats->last_error), "%s", stats->last_error); + snprintf(out_stats->last_reconnect_reason, sizeof(out_stats->last_reconnect_reason), "%s", stats->last_reconnect_reason); out_stats->transport = stats->transport; pthread_mutex_unlock(&stats->mutex); } +static int control_server_error_requires_reconnect(const char *message) { + if (message == NULL || message[0] == '\0') { + return 0; + } + return strstr(message, "not registered") != NULL + || strstr(message, "first message must be register") != NULL + || strstr(message, "peer replaced") != NULL + || strstr(message, "timed out waiting for server_register_ok") != NULL; +} + +static void control_message_body_to_cstr(const message_t *msg, char *buffer, size_t buffer_len) { + size_t copy_len; + + if (buffer == NULL || buffer_len == 0) { + return; + } + buffer[0] = '\0'; + if (msg == NULL || msg->body == NULL || msg->body_len == 0) { + return; + } + copy_len = msg->body_len < (buffer_len - 1U) ? msg->body_len : (buffer_len - 1U); + memcpy(buffer, msg->body, copy_len); + buffer[copy_len] = '\0'; +} + static int unix_dgram_client_init(unix_dgram_client_t *client, const char *dest_path) { struct sockaddr_un bind_addr; pid_t pid; @@ -241,9 +295,14 @@ static void *video_thread_main(void *arg) { daemon_state_t *state = (daemon_state_t *) arg; while (!*state->stop_requested) { - if (video_pipeline_run(&state->video_config, &state->video_stats, state->stop_requested) == 0) { + int video_rc = video_pipeline_run(&state->video_config, &state->video_stats, state->stop_requested); + + if (video_rc == 0) { break; } + if (video_rc == VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE) { + continue; + } if (!*state->stop_requested) { sleep(1); } @@ -257,6 +316,7 @@ static void *control_thread_main(void *arg) { while (!*state->stop_requested) { kcp_conn_options_t options; kcp_client_t *client = NULL; + int reconnect_immediately = 0; kcp_conn_options_set_control_defaults(&options); client = kcp_client_dial_with_options( @@ -289,7 +349,10 @@ static void *control_thread_main(void *arg) { state->control_stats.ever_connected = 1; } state->control_stats.registered = client_state.registered; + state->control_stats.server_idle_ms = client_state.server_idle_ms; + state->control_stats.last_reconnect_reason[0] = '\0'; snprintf(state->control_stats.last_error, sizeof(state->control_stats.last_error), "%s", client_state.last_server_error); + kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport); pthread_mutex_unlock(&state->control_stats.mutex); } @@ -301,20 +364,63 @@ static void *control_thread_main(void *arg) { protocol_message_init(&msg); rc = kcp_client_receive_timed(client, &msg, 100); if (rc == 1) { + char reconnect_reason[256]; + protocol_message_clear(&msg); memset(&client_state, 0, sizeof(client_state)); kcp_client_state_snapshot(client, &client_state); pthread_mutex_lock(&state->control_stats.mutex); state->control_stats.registered = client_state.registered; + state->control_stats.server_idle_ms = client_state.server_idle_ms; snprintf(state->control_stats.last_error, sizeof(state->control_stats.last_error), "%s", client_state.last_server_error); + kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport); pthread_mutex_unlock(&state->control_stats.mutex); + if (!client_state.registered) { + snprintf(reconnect_reason, sizeof(reconnect_reason), "control session stale: server reported unregistered"); + } else if ( + state->control_server_idle_reconnect_ms > 0 + && client_state.server_idle_ms >= (uint32_t) state->control_server_idle_reconnect_ms + ) { + snprintf( + reconnect_reason, + sizeof(reconnect_reason), + "control session stale: server idle timeout (%u ms >= %d ms)", + client_state.server_idle_ms, + state->control_server_idle_reconnect_ms + ); + } else if (control_server_error_requires_reconnect(client_state.last_server_error)) { + snprintf( + reconnect_reason, + sizeof(reconnect_reason), + "control session stale: server error %.180s", + client_state.last_server_error + ); + } else { + reconnect_reason[0] = '\0'; + } + if (reconnect_reason[0] != '\0') { + control_bridge_set_error(&state->control_stats, reconnect_reason); + control_bridge_set_reconnect_reason(&state->control_stats, reconnect_reason); + fprintf(stderr, "[b_side_omnid] %s\n", reconnect_reason); + reconnect_immediately = 1; + break; + } continue; } if (rc != 0) { memset(&client_state, 0, sizeof(client_state)); kcp_client_state_snapshot(client, &client_state); + pthread_mutex_lock(&state->control_stats.mutex); + state->control_stats.registered = client_state.registered; + state->control_stats.server_idle_ms = client_state.server_idle_ms; + kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport); + pthread_mutex_unlock(&state->control_stats.mutex); if (client_state.last_server_error[0] != '\0') { control_bridge_set_error(&state->control_stats, client_state.last_server_error); + if (control_server_error_requires_reconnect(client_state.last_server_error)) { + control_bridge_set_reconnect_reason(&state->control_stats, client_state.last_server_error); + reconnect_immediately = 1; + } } else { control_bridge_set_errno_error(&state->control_stats, "control receive loop stopped"); } @@ -323,7 +429,20 @@ static void *control_thread_main(void *arg) { } if (msg.type == MSG_TYPE_ERROR && strcmp(msg.from, SERVER_PEER_ID) == 0) { - control_bridge_set_error(&state->control_stats, (const char *) msg.body); + char server_error[256]; + + control_message_body_to_cstr(&msg, server_error, sizeof(server_error)); + control_bridge_set_error(&state->control_stats, server_error); + if (control_server_error_requires_reconnect(server_error)) { + char reconnect_reason[256]; + + snprintf(reconnect_reason, sizeof(reconnect_reason), "control session stale: server error %.180s", server_error); + control_bridge_set_reconnect_reason(&state->control_stats, reconnect_reason); + fprintf(stderr, "[b_side_omnid] %s\n", reconnect_reason); + reconnect_immediately = 1; + protocol_message_clear(&msg); + break; + } protocol_message_clear(&msg); continue; } @@ -351,8 +470,12 @@ static void *control_thread_main(void *arg) { recovered = unix_dgram_client_send(&state->unix_client, msg.body, msg.body_len) == 0; } if (recovered) { + memset(&client_state, 0, sizeof(client_state)); + kcp_client_state_snapshot(client, &client_state); pthread_mutex_lock(&state->control_stats.mutex); state->control_stats.packets_forwarded += 1; + state->control_stats.registered = client_state.registered; + state->control_stats.server_idle_ms = client_state.server_idle_ms; kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport); pthread_mutex_unlock(&state->control_stats.mutex); protocol_message_clear(&msg); @@ -367,8 +490,12 @@ static void *control_thread_main(void *arg) { continue; } + memset(&client_state, 0, sizeof(client_state)); + kcp_client_state_snapshot(client, &client_state); pthread_mutex_lock(&state->control_stats.mutex); state->control_stats.packets_forwarded += 1; + state->control_stats.registered = client_state.registered; + state->control_stats.server_idle_ms = client_state.server_idle_ms; kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport); pthread_mutex_unlock(&state->control_stats.mutex); protocol_message_clear(&msg); @@ -376,10 +503,11 @@ static void *control_thread_main(void *arg) { pthread_mutex_lock(&state->control_stats.mutex); state->control_stats.registered = 0; + state->control_stats.server_idle_ms = 0; pthread_mutex_unlock(&state->control_stats.mutex); kcp_client_close(client); kcp_client_free(client); - if (!*state->stop_requested) { + if (!*state->stop_requested && !reconnect_immediately) { sleep(1); } } @@ -398,17 +526,23 @@ static void print_stats(daemon_state_t *state) { fprintf( stderr, - "[b_side_omnid] video registered=%d frames=%llu bytes=%llu srtt=%dms | control registered=%d reconnects=%llu forwarded=%llu invalid=%llu unix_err=%llu srtt=%dms\n", + "[b_side_omnid] video registered=%d frames=%llu bytes=%llu drops=%llu resets=%llu backlog=%u reason=%s srtt=%dms | control registered=%d idle=%ums reconnects=%llu forwarded=%llu invalid=%llu unix_err=%llu srtt=%dms last_reconnect=%s\n", video_stats.connected, (unsigned long long) video_stats.frames_sent, (unsigned long long) video_stats.bytes_sent, + (unsigned long long) video_stats.backpressure_drops, + (unsigned long long) video_stats.backlog_resets, + video_stats.last_backlog_segments, + video_stats.last_backlog_reason[0] == '\0' ? "-" : video_stats.last_backlog_reason, video_stats.transport.srtt_ms, control_stats.registered, + control_stats.server_idle_ms, (unsigned long long) control_stats.reconnect_count, (unsigned long long) control_stats.packets_forwarded, (unsigned long long) control_stats.invalid_packets, (unsigned long long) control_stats.unix_send_errors, - control_stats.transport.srtt_ms + control_stats.transport.srtt_ms, + control_stats.last_reconnect_reason[0] == '\0' ? "-" : control_stats.last_reconnect_reason ); } @@ -429,6 +563,10 @@ int main(void) { state.control_peer_id = env_or_default("OMNI_CONTROL_PEER_ID", CONTROL_DEFAULT_PEER_ID); state.control_expected_sender = env_or_default("OMNI_CONTROL_EXPECTED_SENDER", CONTROL_DEFAULT_EXPECTED_SENDER); state.control_unix_socket = env_or_default("OMNI_CONTROL_UNIX_SOCKET_PATH", CONTROL_DEFAULT_UNIX_SOCKET); + state.control_server_idle_reconnect_ms = env_int_or_default( + "OMNI_CONTROL_SERVER_IDLE_RECONNECT_MS", + CONTROL_DEFAULT_SERVER_IDLE_RECONNECT_MS + ); if (state.video_config.server_addr == NULL || state.video_config.server_addr[0] == '\0' || state.control_server_addr == NULL || state.control_server_addr[0] == '\0') { diff --git a/cmd/v1_camera_pipeline_ifdef.c b/cmd/v1_camera_pipeline_ifdef.c index bd32de6..093ee6e 100644 --- a/cmd/v1_camera_pipeline_ifdef.c +++ b/cmd/v1_camera_pipeline_ifdef.c @@ -17,10 +17,17 @@ int main(void) { return 1; } - if (video_pipeline_run(&config, &stats, NULL) != 0) { - perror("video_pipeline_run"); - video_pipeline_stats_destroy(&stats); - return 1; + for (;;) { + int rc = video_pipeline_run(&config, &stats, NULL); + + if (rc == 0) { + break; + } + if (rc != VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE) { + perror("video_pipeline_run"); + video_pipeline_stats_destroy(&stats); + return 1; + } } video_pipeline_stats_destroy(&stats); diff --git a/include/peer_kcp_client.h b/include/peer_kcp_client.h index 97e9529..84ff7e1 100644 --- a/include/peer_kcp_client.h +++ b/include/peer_kcp_client.h @@ -19,6 +19,7 @@ typedef struct kcp_client_recv_meta { typedef struct kcp_client_state { int connected; int registered; + uint32_t server_idle_ms; char last_server_error[256]; } kcp_client_state_t; diff --git a/include/video_pipeline.h b/include/video_pipeline.h index 996d906..f5c061a 100644 --- a/include/video_pipeline.h +++ b/include/video_pipeline.h @@ -25,6 +25,9 @@ typedef struct video_pipeline_config { int output_height; int max_frames; int enable_timing_logs; + int soft_backpressure_segments; + int hard_backpressure_segments; + int hard_backpressure_hold_ms; } video_pipeline_config_t; typedef struct video_pipeline_stats { @@ -32,12 +35,18 @@ typedef struct video_pipeline_stats { uint64_t frames_sent; uint64_t bytes_sent; uint64_t send_errors; + uint64_t backpressure_drops; + uint64_t backlog_resets; uint64_t last_frame_bytes; + uint32_t last_backlog_segments; int connected; char last_error[256]; + char last_backlog_reason[128]; kcp_runtime_stats_t transport; } video_pipeline_stats_t; +#define VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE 2 + void video_pipeline_config_init(video_pipeline_config_t *config); void video_pipeline_config_load_env(video_pipeline_config_t *config); int video_pipeline_stats_init(video_pipeline_stats_t *stats); diff --git a/scripts/dev/README.md b/scripts/dev/README.md index c78a7a4..1bdbd7d 100644 --- a/scripts/dev/README.md +++ b/scripts/dev/README.md @@ -53,6 +53,11 @@ If you only want the shared environment for manual commands: source scripts/dev/load-env.sh ``` +When you launch via `start-*.sh`, you do not need to manually `export` the variables from +`robot-remote.env` or `robot-remote.env.local`. `load-env.sh` loads those files with `set -a`, +so the variables are exported automatically for the child process. Manual `export` is only needed +if you bypass these scripts and start binaries directly from a clean shell. + ## Customizing Edit `scripts/dev/robot-remote.env` for shared changes such as: @@ -66,6 +71,11 @@ Edit `scripts/dev/robot-remote.env` for shared changes such as: - `OMNI_CAMERA_DEVICE` - `OMNI_VIDEO_PEER_ID` - `OMNI_CONTROL_PEER_ID` +- `OMNI_VIDEO_SOFT_BACKPRESSURE_SEGMENTS` +- `OMNI_VIDEO_HARD_BACKPRESSURE_SEGMENTS` +- `OMNI_VIDEO_HARD_BACKPRESSURE_HOLD_MS` +- `OMNI_CONTROL_SERVER_IDLE_RECONNECT_MS` +- `OMNI_VIDEO_MAX_FRAME_AGE_MS` Role mapping: @@ -73,6 +83,12 @@ Role mapping: - `start-b-side-omnid.sh` uses the `ROBOT_SIDE_*` address pair - `start-ros-receiver.sh` defaults to the robot-side address pair, but with `transport=unix_dgram` it usually does not need the server address +New repair knobs: + +- `OMNI_VIDEO_SOFT_BACKPRESSURE_SEGMENTS`, `OMNI_VIDEO_HARD_BACKPRESSURE_SEGMENTS`, and `OMNI_VIDEO_HARD_BACKPRESSURE_HOLD_MS` are used by `b_side_omnid` +- `OMNI_CONTROL_SERVER_IDLE_RECONNECT_MS` is used by `b_side_omnid` +- `OMNI_VIDEO_MAX_FRAME_AGE_MS` is used by `start-backend.sh` on the A-side backend, not by `b_side_omnid` + Put machine-specific overrides into `scripts/dev/robot-remote.env.local`. Example: ```bash diff --git a/scripts/dev/robot-remote.env b/scripts/dev/robot-remote.env index 87dc195..b311ca6 100644 --- a/scripts/dev/robot-remote.env +++ b/scripts/dev/robot-remote.env @@ -40,10 +40,17 @@ OMNI_VIDEO_TARGET_PEER="peer-a-video" OMNI_CAMERA_DEVICE="/dev/video26" OMNI_VIDEO_SERVER_ADDR="${ROBOT_SIDE_OMNISOCKET_SERVER_ADDR}" OMNI_VIDEO_RELAY_VIA="${ROBOT_SIDE_OMNISOCKET_RELAY_VIA}" +OMNI_VIDEO_SOFT_BACKPRESSURE_SEGMENTS="64" +OMNI_VIDEO_HARD_BACKPRESSURE_SEGMENTS="192" +OMNI_VIDEO_HARD_BACKPRESSURE_HOLD_MS="1000" OMNI_CONTROL_PEER_ID="peer-b-ctrl" OMNI_CONTROL_EXPECTED_SENDER="peer-a-ctrl" OMNI_CONTROL_SERVER_ADDR="${ROBOT_SIDE_OMNISOCKET_SERVER_ADDR}" OMNI_CONTROL_RELAY_VIA="${ROBOT_SIDE_OMNISOCKET_RELAY_VIA}" OMNI_CONTROL_UNIX_SOCKET_PATH="${ROBOT_RECEIVER_LOCAL_SOCKET_PATH}" +OMNI_CONTROL_SERVER_IDLE_RECONNECT_MS="3000" + +# A-side backend video freshness guard. Used by scripts/dev/start-backend.sh. +OMNI_VIDEO_MAX_FRAME_AGE_MS="1000" B_SIDE_OMNID_USE_SUDO="1" diff --git a/src/peer_kcp_client.c b/src/peer_kcp_client.c index b7ee51b..88ec65c 100644 --- a/src/peer_kcp_client.c +++ b/src/peer_kcp_client.c @@ -19,6 +19,7 @@ struct kcp_client { pthread_mutex_t state_mu; uint64_t next_message_id; int registered; + uint32_t last_server_activity_ms; char last_server_error[256]; }; @@ -42,6 +43,15 @@ static void kcp_client_set_registered(kcp_client_t *client, int registered) { pthread_mutex_unlock(&client->state_mu); } +static void kcp_client_touch_server_activity(kcp_client_t *client) { + if (client == NULL) { + return; + } + pthread_mutex_lock(&client->state_mu); + client->last_server_activity_ms = omni_now_millis32(); + pthread_mutex_unlock(&client->state_mu); +} + static void kcp_client_set_last_server_error(kcp_client_t *client, const char *message) { if (client == NULL) { return; @@ -55,6 +65,16 @@ static void kcp_client_clear_last_server_error(kcp_client_t *client) { kcp_client_set_last_server_error(client, ""); } +static int kcp_client_server_error_invalidates_registration(const char *message) { + if (message == NULL || message[0] == '\0') { + return 0; + } + return strstr(message, "not registered") != NULL + || strstr(message, "first message must be register") != NULL + || strstr(message, "peer replaced") != NULL + || strstr(message, "timed out waiting for server_register_ok") != NULL; +} + static int kcp_client_is_registered(kcp_client_t *client) { int registered; @@ -158,6 +178,7 @@ static int kcp_client_handle_reserved_server_message(kcp_client_t *client, const if (msg->type != MSG_TYPE_TEXT || strcmp(msg->from, SERVER_PEER_ID) != 0) { return 0; } + kcp_client_touch_server_activity(client); if (kcp_client_text_body_equals(msg, KCP_CLIENT_CTRL_REGISTER_OK)) { kcp_client_set_registered(client, 1); kcp_client_clear_last_server_error(client); @@ -239,6 +260,7 @@ static int kcp_client_wait_for_register_ok(kcp_client_t *client) { char error_text[256]; kcp_client_copy_server_error_body(&msg, error_text, sizeof(error_text)); + kcp_client_touch_server_activity(client); kcp_client_set_registered(client, 0); kcp_client_set_last_server_error(client, error_text); protocol_message_clear(&msg); @@ -290,6 +312,9 @@ static int kcp_client_receive_business_timed(kcp_client_t *client, message_t *ou return rc; } + if (strcmp(out_msg->from, SERVER_PEER_ID) == 0) { + kcp_client_touch_server_activity(client); + } reserved_rc = kcp_client_handle_reserved_server_message(client, out_msg); if (reserved_rc < 0) { protocol_message_clear(out_msg); @@ -304,6 +329,9 @@ static int kcp_client_receive_business_timed(kcp_client_t *client, message_t *ou kcp_client_copy_server_error_body(out_msg, error_text, sizeof(error_text)); kcp_client_set_last_server_error(client, error_text); + if (kcp_client_server_error_invalidates_registration(error_text)) { + kcp_client_set_registered(client, 0); + } } latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_B_APP_RECV, out_msg); return 0; @@ -399,6 +427,7 @@ kcp_client_t *kcp_client_dial_with_options(const char *server_addr, const char * snprintf(client->id, sizeof(client->id), "%s", peer_id); snprintf(client->server_addr, sizeof(client->server_addr), "%s", server_addr == NULL ? "" : server_addr); pthread_mutex_init(&client->state_mu, NULL); + client->last_server_activity_ms = omni_now_millis32(); client->logger = logger; client->conn = kcp_conn_dial_with_options(actual_dial_addr, bind_ip, bind_device, options, packet_logger, logger, OMNI_NODE_ROLE_PEER, peer_id, stats_logger, stats_interval_ms); if (client->conn == NULL) { @@ -596,6 +625,9 @@ void kcp_client_state_snapshot(kcp_client_t *client, kcp_client_state_t *out_sta } pthread_mutex_lock(&client->state_mu); out_state->registered = client->registered; + out_state->server_idle_ms = client->last_server_activity_ms == 0 + ? 0 + : (omni_now_millis32() - client->last_server_activity_ms); snprintf(out_state->last_server_error, sizeof(out_state->last_server_error), "%s", client->last_server_error); pthread_mutex_unlock(&client->state_mu); } diff --git a/src/video_pipeline.c b/src/video_pipeline.c index eadf31f..2e35347 100644 --- a/src/video_pipeline.c +++ b/src/video_pipeline.c @@ -28,6 +28,11 @@ #define VIDEO_DEFAULT_CAMERA_DEVICE "/dev/video0" #define VIDEO_DEFAULT_PEER_ID "peer-b-video" #define VIDEO_DEFAULT_TARGET_PEER "peer-a-video" +#define VIDEO_SOFT_BACKPRESSURE_SEGMENTS_DEFAULT 64 +#define VIDEO_HARD_BACKPRESSURE_SEGMENTS_DEFAULT 192 +#define VIDEO_HARD_BACKPRESSURE_HOLD_MS_DEFAULT 1000 +#define VIDEO_SOFT_BACKPRESSURE_WINDOW_PRESSURE_PCT 90.0 +#define VIDEO_HARD_BACKPRESSURE_WINDOW_PRESSURE_PCT 98.0 typedef struct video_buffer { void *start; @@ -137,6 +142,20 @@ static const char *env_first_nonempty(const char *first, const char *second, con return fallback; } +static int env_int_or_default(const char *name, int fallback) { + const char *value = getenv(name); + int parsed; + + if (value == NULL || value[0] == '\0') { + return fallback; + } + parsed = atoi(value); + if (parsed <= 0) { + return fallback; + } + return parsed; +} + static void video_pipeline_set_error(video_pipeline_stats_t *stats, const char *message) { if (stats == NULL) { return; @@ -179,6 +198,9 @@ void video_pipeline_config_init(video_pipeline_config_t *config) { config->output_height = VIDEO_OUTPUT_HEIGHT_DEFAULT; config->max_frames = 0; config->enable_timing_logs = 0; + config->soft_backpressure_segments = VIDEO_SOFT_BACKPRESSURE_SEGMENTS_DEFAULT; + config->hard_backpressure_segments = VIDEO_HARD_BACKPRESSURE_SEGMENTS_DEFAULT; + config->hard_backpressure_hold_ms = VIDEO_HARD_BACKPRESSURE_HOLD_MS_DEFAULT; } void video_pipeline_config_load_env(video_pipeline_config_t *config) { @@ -196,6 +218,9 @@ void video_pipeline_config_load_env(video_pipeline_config_t *config) { config->max_frames = atoi(getenv("OMNI_VIDEO_MAX_FRAMES")); } config->enable_timing_logs = env_flag_or_default("OMNI_VIDEO_DEBUG_TIMING", config->enable_timing_logs); + config->soft_backpressure_segments = env_int_or_default("OMNI_VIDEO_SOFT_BACKPRESSURE_SEGMENTS", config->soft_backpressure_segments); + config->hard_backpressure_segments = env_int_or_default("OMNI_VIDEO_HARD_BACKPRESSURE_SEGMENTS", config->hard_backpressure_segments); + config->hard_backpressure_hold_ms = env_int_or_default("OMNI_VIDEO_HARD_BACKPRESSURE_HOLD_MS", config->hard_backpressure_hold_ms); } int video_pipeline_stats_init(video_pipeline_stats_t *stats) { @@ -229,9 +254,13 @@ void video_pipeline_stats_snapshot(video_pipeline_stats_t *stats, video_pipeline out_stats->frames_sent = stats->frames_sent; out_stats->bytes_sent = stats->bytes_sent; out_stats->send_errors = stats->send_errors; + out_stats->backpressure_drops = stats->backpressure_drops; + out_stats->backlog_resets = stats->backlog_resets; out_stats->last_frame_bytes = stats->last_frame_bytes; + out_stats->last_backlog_segments = stats->last_backlog_segments; out_stats->connected = stats->connected; snprintf(out_stats->last_error, sizeof(out_stats->last_error), "%s", stats->last_error); + snprintf(out_stats->last_backlog_reason, sizeof(out_stats->last_backlog_reason), "%s", stats->last_backlog_reason); out_stats->transport = stats->transport; pthread_mutex_unlock(&stats->mutex); } @@ -632,6 +661,56 @@ static void video_sender_close(video_sender_t *sender) { sender->send_buffer_cap = 0; } +static uint32_t video_sender_backlog_segments(const kcp_runtime_stats_t *stats) { + if (stats == NULL) { + return 0; + } + return stats->snd_queue + stats->snd_buffer; +} + +static int video_sender_soft_backpressure_active(const video_pipeline_config_t *config, const kcp_runtime_stats_t *transport) { + if (config == NULL || transport == NULL) { + return 0; + } + return video_sender_backlog_segments(transport) >= (uint32_t) config->soft_backpressure_segments + || transport->window_pressure_pct >= VIDEO_SOFT_BACKPRESSURE_WINDOW_PRESSURE_PCT; +} + +static int video_sender_hard_backpressure_active(const video_pipeline_config_t *config, const kcp_runtime_stats_t *transport) { + if (config == NULL || transport == NULL) { + return 0; + } + return video_sender_backlog_segments(transport) >= (uint32_t) config->hard_backpressure_segments + || transport->window_pressure_pct >= VIDEO_HARD_BACKPRESSURE_WINDOW_PRESSURE_PCT; +} + +static void video_pipeline_note_backpressure( + video_pipeline_stats_t *stats, + const char *reason, + const kcp_runtime_stats_t *transport, + int increment_drop, + int increment_reset +) { + if (stats == NULL) { + return; + } + pthread_mutex_lock(&stats->mutex); + if (increment_drop) { + stats->backpressure_drops += 1; + } + if (increment_reset) { + stats->backlog_resets += 1; + } + if (transport != NULL) { + stats->last_backlog_segments = video_sender_backlog_segments(transport); + stats->transport = *transport; + } else { + stats->last_backlog_segments = 0; + } + snprintf(stats->last_backlog_reason, sizeof(stats->last_backlog_reason), "%s", reason == NULL ? "" : reason); + pthread_mutex_unlock(&stats->mutex); +} + static void video_pipeline_cleanup_buffers(video_buffer_t *buffers, int num_buffers) { int i; if (buffers == NULL) { @@ -660,6 +739,8 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta int sws_src_width = 0; int sws_src_height = 0; int sws_src_format = -1; + uint32_t hard_backpressure_since_ms = 0; + uint32_t last_soft_drop_log_ms = 0; memset(&sender, 0, sizeof(sender)); if (stats == NULL) { @@ -743,6 +824,7 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta AVFrame *decoded_frame = NULL; AVFrame *scaled_frame = NULL; AVPacket *encoded_pkt = NULL; + kcp_runtime_stats_t transport_stats; int select_rc; double total_start_ms = 0.0; double capture_start_ms = 0.0; @@ -757,6 +839,8 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta double send_end_ms = 0.0; int frame_number = frame_index + 1; + memset(&transport_stats, 0, sizeof(transport_stats)); + if (config->max_frames > 0 && frame_index >= config->max_frames) { break; } @@ -840,6 +924,84 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta send_start_ms = encode_end_ms; } + kcp_client_runtime_stats_snapshot(sender.client, &transport_stats); + if (video_sender_hard_backpressure_active(config, &transport_stats)) { + uint32_t now_ms = omni_now_millis32(); + + if (hard_backpressure_since_ms == 0) { + hard_backpressure_since_ms = now_ms; + } + if (now_ms - hard_backpressure_since_ms >= (uint32_t) config->hard_backpressure_hold_ms) { + char reason[128]; + uint32_t backlog_segments = video_sender_backlog_segments(&transport_stats); + + snprintf( + reason, + sizeof(reason), + "hard_reset backlog=%u snd_queue=%u snd_buffer=%u window_pressure=%.1f%% hold_ms=%d", + backlog_segments, + transport_stats.snd_queue, + transport_stats.snd_buffer, + transport_stats.window_pressure_pct, + config->hard_backpressure_hold_ms + ); + video_pipeline_note_backpressure(stats, reason, &transport_stats, 0, 1); + video_pipeline_set_error(stats, reason); + fprintf( + stderr, + "[video_pipeline] backlog hard reset: backlog=%u snd_queue=%u snd_buffer=%u window_pressure=%.1f%% hold_ms=%d\n", + backlog_segments, + transport_stats.snd_queue, + transport_stats.snd_buffer, + transport_stats.window_pressure_pct, + config->hard_backpressure_hold_ms + ); + av_frame_free(&decoded_frame); + av_frame_free(&scaled_frame); + av_packet_free(&encoded_pkt); + (void) ioctl(fd, VIDIOC_QBUF, &buf); + rc = VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE; + goto cleanup; + } + } else { + hard_backpressure_since_ms = 0; + } + + if (video_sender_soft_backpressure_active(config, &transport_stats)) { + uint32_t now_ms = omni_now_millis32(); + uint32_t backlog_segments = video_sender_backlog_segments(&transport_stats); + char reason[128]; + + snprintf( + reason, + sizeof(reason), + "soft_drop backlog=%u snd_queue=%u snd_buffer=%u window_pressure=%.1f%% threshold=%d", + backlog_segments, + transport_stats.snd_queue, + transport_stats.snd_buffer, + transport_stats.window_pressure_pct, + config->soft_backpressure_segments + ); + video_pipeline_note_backpressure(stats, reason, &transport_stats, 1, 0); + if (now_ms - last_soft_drop_log_ms >= 1000U) { + fprintf( + stderr, + "[video_pipeline] soft drop: backlog=%u snd_queue=%u snd_buffer=%u window_pressure=%.1f%% threshold=%d\n", + backlog_segments, + transport_stats.snd_queue, + transport_stats.snd_buffer, + transport_stats.window_pressure_pct, + config->soft_backpressure_segments + ); + last_soft_drop_log_ms = now_ms; + } + av_frame_free(&decoded_frame); + av_frame_free(&scaled_frame); + av_packet_free(&encoded_pkt); + (void) ioctl(fd, VIDIOC_QBUF, &buf); + continue; + } + if (video_sender_send_packet(&sender, encoded_pkt, (uint64_t) get_realtime_ms()) != 0) { pthread_mutex_lock(&stats->mutex); stats->send_errors += 1;