fix: 视频程序也需要有 stale-session 检测

This commit is contained in:
2026-04-14 13:22:19 +08:00
parent 6ccd9e9fa1
commit bb3e7b2989
3 changed files with 176 additions and 1 deletions

View File

@@ -41,6 +41,8 @@ typedef struct video_pipeline_config {
int soft_backpressure_segments;
int hard_backpressure_segments;
int hard_backpressure_hold_ms;
int server_idle_reconnect_ms;
int frame_stall_reconnect_ms;
video_pipeline_progress_fn progress_callback;
void *progress_context;
} video_pipeline_config_t;

View File

@@ -55,6 +55,8 @@ OMNI_VIDEO_RELAY_VIA="${ROBOT_SIDE_OMNISOCKET_RELAY_VIA}"
OMNI_VIDEO_SOFT_BACKPRESSURE_SEGMENTS="64"
OMNI_VIDEO_HARD_BACKPRESSURE_SEGMENTS="192"
OMNI_VIDEO_HARD_BACKPRESSURE_HOLD_MS="1000"
OMNI_VIDEO_SERVER_IDLE_RECONNECT_MS="3000"
OMNI_VIDEO_FRAME_STALL_RECONNECT_MS="3000"
OMNI_CONTROL_PEER_ID="peer-b-ctrl"
OMNI_CONTROL_EXPECTED_SENDER="peer-a-ctrl"
OMNI_CONTROL_SERVER_ADDR="${ROBOT_SIDE_OMNISOCKET_SERVER_ADDR}"

View File

@@ -31,8 +31,11 @@
/* Built-in defaults for the backpressure fields of video_pipeline_config_t;
 * video_pipeline_config_init() copies them into the config. */
#define VIDEO_SOFT_BACKPRESSURE_SEGMENTS_DEFAULT 64
#define VIDEO_HARD_BACKPRESSURE_SEGMENTS_DEFAULT 192
#define VIDEO_HARD_BACKPRESSURE_HOLD_MS_DEFAULT 1000
/* Default for config->server_idle_reconnect_ms (milliseconds). The session is
 * reported stale once the client's server_idle_ms reaches this value; a
 * configured value <= 0 disables the check.
 * NOTE(review): name order differs from the *_DEFAULT suffix used above. */
#define VIDEO_DEFAULT_SERVER_IDLE_RECONNECT_MS 3000
/* Default for config->frame_stall_reconnect_ms (milliseconds). While frames
 * are being soft-dropped, video_pipeline_run() gives up and requests a retry
 * when no frame has been sent for this long; a configured value <= 0 disables
 * the check. */
#define VIDEO_DEFAULT_FRAME_STALL_RECONNECT_MS 3000
/* Window-pressure percentages for soft/hard backpressure.
 * NOTE(review): their use is not visible in this part of the file — presumably
 * thresholds on the send-window fill level; confirm at the call sites. */
#define VIDEO_SOFT_BACKPRESSURE_WINDOW_PRESSURE_PCT 90.0
#define VIDEO_HARD_BACKPRESSURE_WINDOW_PRESSURE_PCT 98.0
/* Minimum time (milliseconds) between two session polls (drain pending
 * messages + stale-session check) inside video_pipeline_run(). */
#define VIDEO_SESSION_POLL_INTERVAL_MS 250
typedef struct video_buffer {
void *start;
@@ -208,6 +211,8 @@ void video_pipeline_config_init(video_pipeline_config_t *config) {
config->soft_backpressure_segments = VIDEO_SOFT_BACKPRESSURE_SEGMENTS_DEFAULT;
config->hard_backpressure_segments = VIDEO_HARD_BACKPRESSURE_SEGMENTS_DEFAULT;
config->hard_backpressure_hold_ms = VIDEO_HARD_BACKPRESSURE_HOLD_MS_DEFAULT;
config->server_idle_reconnect_ms = VIDEO_DEFAULT_SERVER_IDLE_RECONNECT_MS;
config->frame_stall_reconnect_ms = VIDEO_DEFAULT_FRAME_STALL_RECONNECT_MS;
}
void video_pipeline_config_load_env(video_pipeline_config_t *config) {
@@ -228,6 +233,8 @@ void video_pipeline_config_load_env(video_pipeline_config_t *config) {
config->soft_backpressure_segments = env_int_or_default("OMNI_VIDEO_SOFT_BACKPRESSURE_SEGMENTS", config->soft_backpressure_segments);
config->hard_backpressure_segments = env_int_or_default("OMNI_VIDEO_HARD_BACKPRESSURE_SEGMENTS", config->hard_backpressure_segments);
config->hard_backpressure_hold_ms = env_int_or_default("OMNI_VIDEO_HARD_BACKPRESSURE_HOLD_MS", config->hard_backpressure_hold_ms);
config->server_idle_reconnect_ms = env_int_or_default("OMNI_VIDEO_SERVER_IDLE_RECONNECT_MS", config->server_idle_reconnect_ms);
config->frame_stall_reconnect_ms = env_int_or_default("OMNI_VIDEO_FRAME_STALL_RECONNECT_MS", config->frame_stall_reconnect_ms);
}
int video_pipeline_stats_init(video_pipeline_stats_t *stats) {
@@ -603,6 +610,8 @@ static int video_sender_init(video_sender_t *sender, const video_pipeline_config
}
static int video_sender_drain_pending_messages(video_sender_t *sender) {
int drained = 0;
if (sender == NULL || sender->client == NULL) {
errno = EINVAL;
return -1;
@@ -625,6 +634,10 @@ static int video_sender_drain_pending_messages(video_sender_t *sender) {
// Drain unread server errors so an offline receiver cannot back up the reverse KCP stream.
protocol_message_clear(&msg);
drained += 1;
if (drained >= 8) {
return 0;
}
}
}
@@ -722,6 +735,91 @@ static void video_pipeline_note_backpressure(
pthread_mutex_unlock(&stats->mutex);
}
/*
 * Classify a server-reported error string.
 *
 * Returns 1 when `message` contains any of the phrases that mean the current
 * session can no longer be used (so the caller should tear it down and
 * reconnect), and 0 otherwise. A NULL or empty message yields 0. Matching is
 * a case-sensitive substring search.
 */
static int video_server_error_requires_reconnect(const char *message) {
    /* Error fragments that indicate the session itself is dead. */
    static const char *const fatal_phrases[] = {
        "not registered",
        "first message must be register",
        "peer replaced",
        "timed out waiting for server_register_ok",
        "failed to acknowledge server heartbeat",
    };
    const size_t phrase_count = sizeof(fatal_phrases) / sizeof(fatal_phrases[0]);
    size_t idx;

    if (message == NULL || *message == '\0') {
        return 0;
    }

    for (idx = 0; idx < phrase_count; idx++) {
        if (strstr(message, fatal_phrases[idx]) != NULL) {
            return 1;
        }
    }
    return 0;
}
/*
 * Copy the latest client/transport snapshot into the shared pipeline stats.
 *
 * stats        - destination; the function is a no-op when NULL. All writes
 *                happen while holding stats->mutex.
 * client_state - optional; when given, stats->connected is set to 1 only if
 *                the client is both connected and registered, and a non-empty
 *                last_server_error replaces stats->last_error.
 * transport    - optional; when given, it is copied into stats->transport.
 */
static void video_pipeline_update_connection_state(
    video_pipeline_stats_t *stats,
    const kcp_client_state_t *client_state,
    const kcp_runtime_stats_t *transport
) {
    int session_live;

    if (stats == NULL) {
        return;
    }

    pthread_mutex_lock(&stats->mutex);

    if (transport != NULL) {
        stats->transport = *transport;
    }

    if (client_state != NULL) {
        session_live = (client_state->connected != 0) && (client_state->registered != 0);
        stats->connected = session_live;

        /* Keep the previous error text when the server has reported nothing. */
        if (client_state->last_server_error[0] != '\0') {
            snprintf(
                stats->last_error,
                sizeof(stats->last_error),
                "%s",
                client_state->last_server_error
            );
        }
    }

    pthread_mutex_unlock(&stats->mutex);
}
/*
 * Poll the sender's client and decide whether the session must be rebuilt.
 *
 * Takes a fresh runtime-stats snapshot into *transport_stats and a fresh
 * client-state snapshot, publishes both to `stats`, then checks in order:
 *   1. transport or client not connected,
 *   2. client not registered,
 *   3. server idle for at least config->server_idle_reconnect_ms
 *      (skipped when that setting is <= 0),
 *   4. last server error is one that requires a reconnect.
 *
 * Returns 1 with a human-readable explanation written to `reason` when the
 * session is stale, 0 when it looks healthy (`reason` is left empty), and -1
 * with errno = EINVAL when an argument is missing (`reason` is not touched).
 */
static int video_sender_check_session_stale(
    video_sender_t *sender,
    const video_pipeline_config_t *config,
    video_pipeline_stats_t *stats,
    kcp_runtime_stats_t *transport_stats,
    char *reason,
    size_t reason_len
) {
    kcp_client_state_t snapshot;

    if (sender == NULL || sender->client == NULL) {
        errno = EINVAL;
        return -1;
    }
    if (config == NULL || stats == NULL || transport_stats == NULL) {
        errno = EINVAL;
        return -1;
    }
    if (reason == NULL || reason_len == 0) {
        errno = EINVAL;
        return -1;
    }

    memset(&snapshot, 0, sizeof(snapshot));
    reason[0] = '\0';

    /* Refresh both views of the connection and mirror them into the stats. */
    kcp_client_runtime_stats_snapshot(sender->client, transport_stats);
    kcp_client_state_snapshot(sender->client, &snapshot);
    video_pipeline_update_connection_state(stats, &snapshot, transport_stats);

    if (!(transport_stats->connected && snapshot.connected)) {
        snprintf(reason, reason_len, "video session stale: transport disconnected");
        return 1;
    }

    if (!snapshot.registered) {
        snprintf(reason, reason_len, "video session stale: server reported unregistered");
        return 1;
    }

    if (config->server_idle_reconnect_ms > 0) {
        const uint32_t idle_limit_ms = (uint32_t) config->server_idle_reconnect_ms;

        if (snapshot.server_idle_ms >= idle_limit_ms) {
            snprintf(
                reason,
                reason_len,
                "video session stale: server idle timeout (%u ms >= %d ms)",
                snapshot.server_idle_ms,
                config->server_idle_reconnect_ms
            );
            return 1;
        }
    }

    if (video_server_error_requires_reconnect(snapshot.last_server_error)) {
        /* %.180s bounds the amount of server text copied into the reason. */
        snprintf(reason, reason_len, "video session stale: server error %.180s", snapshot.last_server_error);
        return 1;
    }

    return 0;
}
static void video_pipeline_cleanup_buffers(video_buffer_t *buffers, int num_buffers) {
int i;
if (buffers == NULL) {
@@ -752,6 +850,10 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
int sws_src_format = -1;
uint32_t hard_backpressure_since_ms = 0;
uint32_t last_soft_drop_log_ms = 0;
uint32_t last_session_poll_ms = 0;
uint32_t last_successful_send_ms = 0;
uint64_t soft_drops_since_last_send = 0;
int have_sent_frame = 0;
const char *gpsd_host = env_or_default("OMNI_GPSD_HOST", "127.0.0.1");
int gps_buffer_started = 0;
@@ -856,10 +958,12 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
double send_start_ms = 0.0;
double send_end_ms = 0.0;
video_pipeline_packet_metadata_t packet_metadata;
char reconnect_reason[256];
int frame_number = frame_index + 1;
memset(&transport_stats, 0, sizeof(transport_stats));
memset(&packet_metadata, 0, sizeof(packet_metadata));
reconnect_reason[0] = '\0';
video_pipeline_report_progress(config);
if (config->max_frames > 0 && frame_index >= config->max_frames) {
@@ -953,7 +1057,45 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
packet_metadata.longitude = gps_sample.longitude;
}
if (
last_session_poll_ms == 0
|| omni_now_millis32() - last_session_poll_ms >= VIDEO_SESSION_POLL_INTERVAL_MS
) {
if (video_sender_drain_pending_messages(&sender) != 0) {
video_pipeline_set_errno_error(stats, "failed to poll video session");
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
av_packet_free(&encoded_pkt);
(void) ioctl(fd, VIDIOC_QBUF, &buf);
rc = VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE;
goto cleanup;
}
if (
video_sender_check_session_stale(
&sender,
config,
stats,
&transport_stats,
reconnect_reason,
sizeof(reconnect_reason)
) != 0
) {
if (reconnect_reason[0] == '\0') {
snprintf(reconnect_reason, sizeof(reconnect_reason), "video session stale: poll failed");
}
video_pipeline_set_error(stats, reconnect_reason);
fprintf(stderr, "[video_pipeline] %s\n", reconnect_reason);
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
av_packet_free(&encoded_pkt);
(void) ioctl(fd, VIDIOC_QBUF, &buf);
rc = VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE;
goto cleanup;
}
last_session_poll_ms = omni_now_millis32();
} else {
kcp_client_runtime_stats_snapshot(sender.client, &transport_stats);
}
if (video_sender_hard_backpressure_active(config, &transport_stats)) {
uint32_t now_ms = omni_now_millis32();
@@ -1012,6 +1154,7 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
config->soft_backpressure_segments
);
video_pipeline_note_backpressure(stats, reason, &transport_stats, 1, 0);
soft_drops_since_last_send += 1;
if (now_ms - last_soft_drop_log_ms >= 1000U) {
fprintf(
stderr,
@@ -1024,6 +1167,31 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
);
last_soft_drop_log_ms = now_ms;
}
if (
have_sent_frame
&& config->frame_stall_reconnect_ms > 0
&& now_ms - last_successful_send_ms >= (uint32_t) config->frame_stall_reconnect_ms
) {
char stall_reason[192];
snprintf(
stall_reason,
sizeof(stall_reason),
"video pipeline stalled: no frames sent for %u ms while soft dropping (%llu drops, backlog=%u, srtt=%d ms)",
now_ms - last_successful_send_ms,
(unsigned long long) soft_drops_since_last_send,
backlog_segments,
transport_stats.srtt_ms
);
video_pipeline_set_error(stats, stall_reason);
fprintf(stderr, "[video_pipeline] %s\n", stall_reason);
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
av_packet_free(&encoded_pkt);
(void) ioctl(fd, VIDIOC_QBUF, &buf);
rc = VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE;
goto cleanup;
}
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
av_packet_free(&encoded_pkt);
@@ -1055,6 +1223,9 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
stats->last_frame_bytes = (uint64_t) encoded_pkt->size;
kcp_client_runtime_stats_snapshot(sender.client, &stats->transport);
pthread_mutex_unlock(&stats->mutex);
have_sent_frame = 1;
last_successful_send_ms = omni_now_millis32();
soft_drops_since_last_send = 0;
if (config->enable_timing_logs) {
video_pipeline_print_timing_row(
frame_number,