diff --git a/include/video_pipeline.h b/include/video_pipeline.h index ed54dbe..dcb1afa 100644 --- a/include/video_pipeline.h +++ b/include/video_pipeline.h @@ -41,6 +41,8 @@ typedef struct video_pipeline_config { int soft_backpressure_segments; int hard_backpressure_segments; int hard_backpressure_hold_ms; + int server_idle_reconnect_ms; /* server-idle threshold in ms; video_sender_check_session_stale skips the idle test when this is <= 0 */ + int frame_stall_reconnect_ms; /* ms without a successful send, while soft-dropping, before video_pipeline_run bails out; <= 0 disables the check */ video_pipeline_progress_fn progress_callback; void *progress_context; } video_pipeline_config_t; diff --git a/scripts/dev/robot-remote.env b/scripts/dev/robot-remote.env index 8c40ebb..1e5043b 100644 --- a/scripts/dev/robot-remote.env +++ b/scripts/dev/robot-remote.env @@ -55,6 +55,8 @@ OMNI_VIDEO_RELAY_VIA="${ROBOT_SIDE_OMNISOCKET_RELAY_VIA}" OMNI_VIDEO_SOFT_BACKPRESSURE_SEGMENTS="64" OMNI_VIDEO_HARD_BACKPRESSURE_SEGMENTS="192" OMNI_VIDEO_HARD_BACKPRESSURE_HOLD_MS="1000" +OMNI_VIDEO_SERVER_IDLE_RECONNECT_MS="3000" +OMNI_VIDEO_FRAME_STALL_RECONNECT_MS="3000" OMNI_CONTROL_PEER_ID="peer-b-ctrl" OMNI_CONTROL_EXPECTED_SENDER="peer-a-ctrl" OMNI_CONTROL_SERVER_ADDR="${ROBOT_SIDE_OMNISOCKET_SERVER_ADDR}" diff --git a/src/video_pipeline.c b/src/video_pipeline.c index a35c062..dffcf52 100644 --- a/src/video_pipeline.c +++ b/src/video_pipeline.c @@ -31,8 +31,11 @@ #define VIDEO_SOFT_BACKPRESSURE_SEGMENTS_DEFAULT 64 #define VIDEO_HARD_BACKPRESSURE_SEGMENTS_DEFAULT 192 #define VIDEO_HARD_BACKPRESSURE_HOLD_MS_DEFAULT 1000 +#define VIDEO_DEFAULT_SERVER_IDLE_RECONNECT_MS 3000 +#define VIDEO_DEFAULT_FRAME_STALL_RECONNECT_MS 3000 #define VIDEO_SOFT_BACKPRESSURE_WINDOW_PRESSURE_PCT 90.0 #define VIDEO_HARD_BACKPRESSURE_WINDOW_PRESSURE_PCT 98.0 +#define VIDEO_SESSION_POLL_INTERVAL_MS 250 /* minimum ms between session polls in video_pipeline_run */ typedef struct video_buffer { void *start; @@ -208,6 +211,8 @@ void video_pipeline_config_init(video_pipeline_config_t *config) { config->soft_backpressure_segments = VIDEO_SOFT_BACKPRESSURE_SEGMENTS_DEFAULT; config->hard_backpressure_segments = VIDEO_HARD_BACKPRESSURE_SEGMENTS_DEFAULT; config->hard_backpressure_hold_ms = 
VIDEO_HARD_BACKPRESSURE_HOLD_MS_DEFAULT; + config->server_idle_reconnect_ms = VIDEO_DEFAULT_SERVER_IDLE_RECONNECT_MS; + config->frame_stall_reconnect_ms = VIDEO_DEFAULT_FRAME_STALL_RECONNECT_MS; } void video_pipeline_config_load_env(video_pipeline_config_t *config) { @@ -228,6 +233,8 @@ void video_pipeline_config_load_env(video_pipeline_config_t *config) { config->soft_backpressure_segments = env_int_or_default("OMNI_VIDEO_SOFT_BACKPRESSURE_SEGMENTS", config->soft_backpressure_segments); config->hard_backpressure_segments = env_int_or_default("OMNI_VIDEO_HARD_BACKPRESSURE_SEGMENTS", config->hard_backpressure_segments); config->hard_backpressure_hold_ms = env_int_or_default("OMNI_VIDEO_HARD_BACKPRESSURE_HOLD_MS", config->hard_backpressure_hold_ms); + config->server_idle_reconnect_ms = env_int_or_default("OMNI_VIDEO_SERVER_IDLE_RECONNECT_MS", config->server_idle_reconnect_ms); /* NOTE(review): presumably keeps the current value when the variable is unset or invalid - confirm against env_int_or_default */ + config->frame_stall_reconnect_ms = env_int_or_default("OMNI_VIDEO_FRAME_STALL_RECONNECT_MS", config->frame_stall_reconnect_ms); } int video_pipeline_stats_init(video_pipeline_stats_t *stats) { @@ -603,6 +610,8 @@ static int video_sender_init(video_sender_t *sender, const video_pipeline_config } static int video_sender_drain_pending_messages(video_sender_t *sender) { + int drained = 0; /* number of pending messages cleared by this call */ + if (sender == NULL || sender->client == NULL) { errno = EINVAL; return -1; @@ -625,6 +634,10 @@ static int video_sender_drain_pending_messages(video_sender_t *sender) { // Drain unread server errors so an offline receiver cannot back up the reverse KCP stream. 
protocol_message_clear(&msg); + drained += 1; + if (drained >= 8) { /* bound the work per call: report success after clearing 8 messages and leave the rest for a later call */ + return 0; + } } } @@ -722,6 +735,91 @@ static void video_pipeline_note_backpressure( pthread_mutex_unlock(&stats->mutex); } +static int video_server_error_requires_reconnect(const char *message) { /* Returns nonzero when the server error text contains one of the substrings listed below; returns 0 for NULL or empty text. */ + if (message == NULL || message[0] == '\0') { + return 0; + } + return strstr(message, "not registered") != NULL + || strstr(message, "first message must be register") != NULL + || strstr(message, "peer replaced") != NULL + || strstr(message, "timed out waiting for server_register_ok") != NULL + || strstr(message, "failed to acknowledge server heartbeat") != NULL; +} + +static void video_pipeline_update_connection_state( /* Copies the given transport and client snapshots into stats while holding stats->mutex; a NULL stats is a no-op and NULL snapshots are skipped. */ + video_pipeline_stats_t *stats, + const kcp_client_state_t *client_state, + const kcp_runtime_stats_t *transport +) { + if (stats == NULL) { + return; + } + + pthread_mutex_lock(&stats->mutex); + if (transport != NULL) { + stats->transport = *transport; + } + if (client_state != NULL) { + stats->connected = client_state->connected != 0 && client_state->registered != 0; /* reported as connected only when both connected and registered are set */ + if (client_state->last_server_error[0] != '\0') { /* an empty server error leaves the previous last_error text in place */ + snprintf(stats->last_error, sizeof(stats->last_error), "%s", client_state->last_server_error); + } + } + pthread_mutex_unlock(&stats->mutex); +} + +static int video_sender_check_session_stale( /* Snapshots transport and client state, mirrors them into stats, and classifies the session: returns 0 when healthy, 1 with a message written to reason when stale, -1 with errno = EINVAL on invalid arguments (reason is left untouched in that case). */ + video_sender_t *sender, + const video_pipeline_config_t *config, + video_pipeline_stats_t *stats, + kcp_runtime_stats_t *transport_stats, + char *reason, + size_t reason_len +) { + kcp_client_state_t client_state; + + if ( + sender == NULL || sender->client == NULL || config == NULL || stats == NULL || transport_stats == NULL + || reason == NULL || reason_len == 0 + ) { + errno = EINVAL; + return -1; + } + + reason[0] = '\0'; + memset(&client_state, 0, sizeof(client_state)); + kcp_client_runtime_stats_snapshot(sender->client, transport_stats); + kcp_client_state_snapshot(sender->client, &client_state); + video_pipeline_update_connection_state(stats, &client_state, 
transport_stats); + + if (!transport_stats->connected || !client_state.connected) { + snprintf(reason, reason_len, "video session stale: transport disconnected"); + return 1; + } + if (!client_state.registered) { + snprintf(reason, reason_len, "video session stale: server reported unregistered"); + return 1; + } + if ( + config->server_idle_reconnect_ms > 0 /* a non-positive threshold disables the idle test */ + && client_state.server_idle_ms >= (uint32_t) config->server_idle_reconnect_ms + ) { + snprintf( + reason, + reason_len, + "video session stale: server idle timeout (%u ms >= %d ms)", + client_state.server_idle_ms, + config->server_idle_reconnect_ms + ); + return 1; + } + if (video_server_error_requires_reconnect(client_state.last_server_error)) { + snprintf(reason, reason_len, "video session stale: server error %.180s", client_state.last_server_error); /* %.180s caps how much of the server text is copied into reason */ + return 1; + } + return 0; +} + static void video_pipeline_cleanup_buffers(video_buffer_t *buffers, int num_buffers) { int i; if (buffers == NULL) { @@ -752,6 +850,10 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta int sws_src_format = -1; uint32_t hard_backpressure_since_ms = 0; uint32_t last_soft_drop_log_ms = 0; + uint32_t last_session_poll_ms = 0; /* 0 until the first session poll completes */ + uint32_t last_successful_send_ms = 0; + uint64_t soft_drops_since_last_send = 0; + int have_sent_frame = 0; /* set after the first successful send; gates the frame-stall check */ const char *gpsd_host = env_or_default("OMNI_GPSD_HOST", "127.0.0.1"); int gps_buffer_started = 0; @@ -856,10 +958,12 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta double send_start_ms = 0.0; double send_end_ms = 0.0; video_pipeline_packet_metadata_t packet_metadata; + char reconnect_reason[256]; int frame_number = frame_index + 1; memset(&transport_stats, 0, sizeof(transport_stats)); memset(&packet_metadata, 0, sizeof(packet_metadata)); + reconnect_reason[0] = '\0'; video_pipeline_report_progress(config); if (config->max_frames > 0 && frame_index >= config->max_frames) { @@ -953,7 +1057,45 @@ int video_pipeline_run(const 
video_pipeline_config_t *config, video_pipeline_sta packet_metadata.longitude = gps_sample.longitude; } - kcp_client_runtime_stats_snapshot(sender.client, &transport_stats); + if ( + last_session_poll_ms == 0 /* still at its initial 0: poll right away */ + || omni_now_millis32() - last_session_poll_ms >= VIDEO_SESSION_POLL_INTERVAL_MS + ) { + if (video_sender_drain_pending_messages(&sender) != 0) { + video_pipeline_set_errno_error(stats, "failed to poll video session"); + av_frame_free(&decoded_frame); + av_frame_free(&scaled_frame); + av_packet_free(&encoded_pkt); + (void) ioctl(fd, VIDIOC_QBUF, &buf); /* requeue result is deliberately ignored on this error path */ + rc = VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE; + goto cleanup; + } + if ( + video_sender_check_session_stale( + &sender, + config, + stats, + &transport_stats, + reconnect_reason, + sizeof(reconnect_reason) + ) != 0 /* nonzero covers both the stale result (1) and the invalid-argument result (-1) */ + ) { + if (reconnect_reason[0] == '\0') { /* the -1 path returns before writing a reason, so supply a fallback message */ + snprintf(reconnect_reason, sizeof(reconnect_reason), "video session stale: poll failed"); + } + video_pipeline_set_error(stats, reconnect_reason); + fprintf(stderr, "[video_pipeline] %s\n", reconnect_reason); + av_frame_free(&decoded_frame); + av_frame_free(&scaled_frame); + av_packet_free(&encoded_pkt); + (void) ioctl(fd, VIDIOC_QBUF, &buf); + rc = VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE; + goto cleanup; + } + last_session_poll_ms = omni_now_millis32(); + } else { /* between polls, only refresh the transport snapshot used by the backpressure checks that follow */ + kcp_client_runtime_stats_snapshot(sender.client, &transport_stats); + } if (video_sender_hard_backpressure_active(config, &transport_stats)) { uint32_t now_ms = omni_now_millis32(); @@ -1012,6 +1154,7 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta config->soft_backpressure_segments ); video_pipeline_note_backpressure(stats, reason, &transport_stats, 1, 0); + soft_drops_since_last_send += 1; if (now_ms - last_soft_drop_log_ms >= 1000U) { fprintf( stderr, @@ -1024,6 +1167,31 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta ); last_soft_drop_log_ms = now_ms; } + if ( + have_sent_frame /* stall detection stays off until at least one frame has been sent */ + && config->frame_stall_reconnect_ms > 0 + && 
now_ms - last_successful_send_ms >= (uint32_t) config->frame_stall_reconnect_ms + ) { + char stall_reason[192]; + + snprintf( + stall_reason, + sizeof(stall_reason), + "video pipeline stalled: no frames sent for %u ms while soft dropping (%llu drops, backlog=%u, srtt=%d ms)", + now_ms - last_successful_send_ms, + (unsigned long long) soft_drops_since_last_send, + backlog_segments, + transport_stats.srtt_ms + ); + video_pipeline_set_error(stats, stall_reason); + fprintf(stderr, "[video_pipeline] %s\n", stall_reason); + av_frame_free(&decoded_frame); + av_frame_free(&scaled_frame); + av_packet_free(&encoded_pkt); + (void) ioctl(fd, VIDIOC_QBUF, &buf); + rc = VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE; + goto cleanup; + } av_frame_free(&decoded_frame); av_frame_free(&scaled_frame); av_packet_free(&encoded_pkt); @@ -1055,6 +1223,9 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta stats->last_frame_bytes = (uint64_t) encoded_pkt->size; kcp_client_runtime_stats_snapshot(sender.client, &stats->transport); pthread_mutex_unlock(&stats->mutex); + have_sent_frame = 1; /* a frame went out: arm the stall check and restart its timestamp and drop counter */ + last_successful_send_ms = omni_now_millis32(); + soft_drops_since_last_send = 0; if (config->enable_timing_logs) { video_pipeline_print_timing_row( frame_number,