fix: 断联视频堆积问题与控制命令失效问题

This commit is contained in:
2026-04-11 03:55:19 +08:00
parent 6f727dbe57
commit 84e0cc54d2
8 changed files with 381 additions and 9 deletions

View File

@@ -16,6 +16,7 @@
#define CONTROL_DEFAULT_PEER_ID "peer-b-ctrl"
#define CONTROL_DEFAULT_EXPECTED_SENDER "peer-a-ctrl"
#define CONTROL_DEFAULT_UNIX_SOCKET "/tmp/omnisocket-b-side-cmd.sock"
#define CONTROL_DEFAULT_SERVER_IDLE_RECONNECT_MS 3000
typedef struct unix_dgram_client {
int fd;
@@ -31,9 +32,11 @@ typedef struct control_bridge_stats {
uint64_t invalid_packets;
uint64_t unix_send_errors;
uint64_t reconnect_count;
uint32_t server_idle_ms;
int ever_connected;
int registered;
char last_error[256];
char last_reconnect_reason[256];
kcp_runtime_stats_t transport;
} control_bridge_stats_t;
@@ -48,6 +51,7 @@ typedef struct daemon_state {
const char *control_peer_id;
const char *control_expected_sender;
const char *control_unix_socket;
int control_server_idle_reconnect_ms;
unix_dgram_client_t unix_client;
control_bridge_stats_t control_stats;
} daemon_state_t;
@@ -91,6 +95,20 @@ static const char *env_first_nonempty(const char *first, const char *second, con
return fallback;
}
static int env_int_or_default(const char *name, int fallback) {
const char *value = getenv(name);
int parsed;
if (value == NULL || value[0] == '\0') {
return fallback;
}
parsed = atoi(value);
if (parsed <= 0) {
return fallback;
}
return parsed;
}
static int control_bridge_stats_init(control_bridge_stats_t *stats) {
int rc;
if (stats == NULL) {
@@ -124,6 +142,15 @@ static void control_bridge_set_error(control_bridge_stats_t *stats, const char *
pthread_mutex_unlock(&stats->mutex);
}
static void control_bridge_set_reconnect_reason(control_bridge_stats_t *stats, const char *message) {
if (stats == NULL) {
return;
}
pthread_mutex_lock(&stats->mutex);
snprintf(stats->last_reconnect_reason, sizeof(stats->last_reconnect_reason), "%s", message == NULL ? "" : message);
pthread_mutex_unlock(&stats->mutex);
}
static void control_bridge_set_errno_error(control_bridge_stats_t *stats, const char *prefix) {
char buffer[256];
int saved_errno = errno;
@@ -149,12 +176,39 @@ static void control_bridge_stats_snapshot(control_bridge_stats_t *stats, control
out_stats->invalid_packets = stats->invalid_packets;
out_stats->unix_send_errors = stats->unix_send_errors;
out_stats->reconnect_count = stats->reconnect_count;
out_stats->server_idle_ms = stats->server_idle_ms;
out_stats->registered = stats->registered;
snprintf(out_stats->last_error, sizeof(out_stats->last_error), "%s", stats->last_error);
snprintf(out_stats->last_reconnect_reason, sizeof(out_stats->last_reconnect_reason), "%s", stats->last_reconnect_reason);
out_stats->transport = stats->transport;
pthread_mutex_unlock(&stats->mutex);
}
static int control_server_error_requires_reconnect(const char *message) {
if (message == NULL || message[0] == '\0') {
return 0;
}
return strstr(message, "not registered") != NULL
|| strstr(message, "first message must be register") != NULL
|| strstr(message, "peer replaced") != NULL
|| strstr(message, "timed out waiting for server_register_ok") != NULL;
}
static void control_message_body_to_cstr(const message_t *msg, char *buffer, size_t buffer_len) {
size_t copy_len;
if (buffer == NULL || buffer_len == 0) {
return;
}
buffer[0] = '\0';
if (msg == NULL || msg->body == NULL || msg->body_len == 0) {
return;
}
copy_len = msg->body_len < (buffer_len - 1U) ? msg->body_len : (buffer_len - 1U);
memcpy(buffer, msg->body, copy_len);
buffer[copy_len] = '\0';
}
static int unix_dgram_client_init(unix_dgram_client_t *client, const char *dest_path) {
struct sockaddr_un bind_addr;
pid_t pid;
@@ -241,9 +295,14 @@ static void *video_thread_main(void *arg) {
daemon_state_t *state = (daemon_state_t *) arg;
while (!*state->stop_requested) {
if (video_pipeline_run(&state->video_config, &state->video_stats, state->stop_requested) == 0) {
int video_rc = video_pipeline_run(&state->video_config, &state->video_stats, state->stop_requested);
if (video_rc == 0) {
break;
}
if (video_rc == VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE) {
continue;
}
if (!*state->stop_requested) {
sleep(1);
}
@@ -257,6 +316,7 @@ static void *control_thread_main(void *arg) {
while (!*state->stop_requested) {
kcp_conn_options_t options;
kcp_client_t *client = NULL;
int reconnect_immediately = 0;
kcp_conn_options_set_control_defaults(&options);
client = kcp_client_dial_with_options(
@@ -289,7 +349,10 @@ static void *control_thread_main(void *arg) {
state->control_stats.ever_connected = 1;
}
state->control_stats.registered = client_state.registered;
state->control_stats.server_idle_ms = client_state.server_idle_ms;
state->control_stats.last_reconnect_reason[0] = '\0';
snprintf(state->control_stats.last_error, sizeof(state->control_stats.last_error), "%s", client_state.last_server_error);
kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport);
pthread_mutex_unlock(&state->control_stats.mutex);
}
@@ -301,20 +364,63 @@ static void *control_thread_main(void *arg) {
protocol_message_init(&msg);
rc = kcp_client_receive_timed(client, &msg, 100);
if (rc == 1) {
char reconnect_reason[256];
protocol_message_clear(&msg);
memset(&client_state, 0, sizeof(client_state));
kcp_client_state_snapshot(client, &client_state);
pthread_mutex_lock(&state->control_stats.mutex);
state->control_stats.registered = client_state.registered;
state->control_stats.server_idle_ms = client_state.server_idle_ms;
snprintf(state->control_stats.last_error, sizeof(state->control_stats.last_error), "%s", client_state.last_server_error);
kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport);
pthread_mutex_unlock(&state->control_stats.mutex);
if (!client_state.registered) {
snprintf(reconnect_reason, sizeof(reconnect_reason), "control session stale: server reported unregistered");
} else if (
state->control_server_idle_reconnect_ms > 0
&& client_state.server_idle_ms >= (uint32_t) state->control_server_idle_reconnect_ms
) {
snprintf(
reconnect_reason,
sizeof(reconnect_reason),
"control session stale: server idle timeout (%u ms >= %d ms)",
client_state.server_idle_ms,
state->control_server_idle_reconnect_ms
);
} else if (control_server_error_requires_reconnect(client_state.last_server_error)) {
snprintf(
reconnect_reason,
sizeof(reconnect_reason),
"control session stale: server error %.180s",
client_state.last_server_error
);
} else {
reconnect_reason[0] = '\0';
}
if (reconnect_reason[0] != '\0') {
control_bridge_set_error(&state->control_stats, reconnect_reason);
control_bridge_set_reconnect_reason(&state->control_stats, reconnect_reason);
fprintf(stderr, "[b_side_omnid] %s\n", reconnect_reason);
reconnect_immediately = 1;
break;
}
continue;
}
if (rc != 0) {
memset(&client_state, 0, sizeof(client_state));
kcp_client_state_snapshot(client, &client_state);
pthread_mutex_lock(&state->control_stats.mutex);
state->control_stats.registered = client_state.registered;
state->control_stats.server_idle_ms = client_state.server_idle_ms;
kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport);
pthread_mutex_unlock(&state->control_stats.mutex);
if (client_state.last_server_error[0] != '\0') {
control_bridge_set_error(&state->control_stats, client_state.last_server_error);
if (control_server_error_requires_reconnect(client_state.last_server_error)) {
control_bridge_set_reconnect_reason(&state->control_stats, client_state.last_server_error);
reconnect_immediately = 1;
}
} else {
control_bridge_set_errno_error(&state->control_stats, "control receive loop stopped");
}
@@ -323,7 +429,20 @@ static void *control_thread_main(void *arg) {
}
if (msg.type == MSG_TYPE_ERROR && strcmp(msg.from, SERVER_PEER_ID) == 0) {
control_bridge_set_error(&state->control_stats, (const char *) msg.body);
char server_error[256];
control_message_body_to_cstr(&msg, server_error, sizeof(server_error));
control_bridge_set_error(&state->control_stats, server_error);
if (control_server_error_requires_reconnect(server_error)) {
char reconnect_reason[256];
snprintf(reconnect_reason, sizeof(reconnect_reason), "control session stale: server error %.180s", server_error);
control_bridge_set_reconnect_reason(&state->control_stats, reconnect_reason);
fprintf(stderr, "[b_side_omnid] %s\n", reconnect_reason);
reconnect_immediately = 1;
protocol_message_clear(&msg);
break;
}
protocol_message_clear(&msg);
continue;
}
@@ -351,8 +470,12 @@ static void *control_thread_main(void *arg) {
recovered = unix_dgram_client_send(&state->unix_client, msg.body, msg.body_len) == 0;
}
if (recovered) {
memset(&client_state, 0, sizeof(client_state));
kcp_client_state_snapshot(client, &client_state);
pthread_mutex_lock(&state->control_stats.mutex);
state->control_stats.packets_forwarded += 1;
state->control_stats.registered = client_state.registered;
state->control_stats.server_idle_ms = client_state.server_idle_ms;
kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport);
pthread_mutex_unlock(&state->control_stats.mutex);
protocol_message_clear(&msg);
@@ -367,8 +490,12 @@ static void *control_thread_main(void *arg) {
continue;
}
memset(&client_state, 0, sizeof(client_state));
kcp_client_state_snapshot(client, &client_state);
pthread_mutex_lock(&state->control_stats.mutex);
state->control_stats.packets_forwarded += 1;
state->control_stats.registered = client_state.registered;
state->control_stats.server_idle_ms = client_state.server_idle_ms;
kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport);
pthread_mutex_unlock(&state->control_stats.mutex);
protocol_message_clear(&msg);
@@ -376,10 +503,11 @@ static void *control_thread_main(void *arg) {
pthread_mutex_lock(&state->control_stats.mutex);
state->control_stats.registered = 0;
state->control_stats.server_idle_ms = 0;
pthread_mutex_unlock(&state->control_stats.mutex);
kcp_client_close(client);
kcp_client_free(client);
if (!*state->stop_requested) {
if (!*state->stop_requested && !reconnect_immediately) {
sleep(1);
}
}
@@ -398,17 +526,23 @@ static void print_stats(daemon_state_t *state) {
fprintf(
stderr,
"[b_side_omnid] video registered=%d frames=%llu bytes=%llu srtt=%dms | control registered=%d reconnects=%llu forwarded=%llu invalid=%llu unix_err=%llu srtt=%dms\n",
"[b_side_omnid] video registered=%d frames=%llu bytes=%llu drops=%llu resets=%llu backlog=%u reason=%s srtt=%dms | control registered=%d idle=%ums reconnects=%llu forwarded=%llu invalid=%llu unix_err=%llu srtt=%dms last_reconnect=%s\n",
video_stats.connected,
(unsigned long long) video_stats.frames_sent,
(unsigned long long) video_stats.bytes_sent,
(unsigned long long) video_stats.backpressure_drops,
(unsigned long long) video_stats.backlog_resets,
video_stats.last_backlog_segments,
video_stats.last_backlog_reason[0] == '\0' ? "-" : video_stats.last_backlog_reason,
video_stats.transport.srtt_ms,
control_stats.registered,
control_stats.server_idle_ms,
(unsigned long long) control_stats.reconnect_count,
(unsigned long long) control_stats.packets_forwarded,
(unsigned long long) control_stats.invalid_packets,
(unsigned long long) control_stats.unix_send_errors,
control_stats.transport.srtt_ms
control_stats.transport.srtt_ms,
control_stats.last_reconnect_reason[0] == '\0' ? "-" : control_stats.last_reconnect_reason
);
}
@@ -429,6 +563,10 @@ int main(void) {
state.control_peer_id = env_or_default("OMNI_CONTROL_PEER_ID", CONTROL_DEFAULT_PEER_ID);
state.control_expected_sender = env_or_default("OMNI_CONTROL_EXPECTED_SENDER", CONTROL_DEFAULT_EXPECTED_SENDER);
state.control_unix_socket = env_or_default("OMNI_CONTROL_UNIX_SOCKET_PATH", CONTROL_DEFAULT_UNIX_SOCKET);
state.control_server_idle_reconnect_ms = env_int_or_default(
"OMNI_CONTROL_SERVER_IDLE_RECONNECT_MS",
CONTROL_DEFAULT_SERVER_IDLE_RECONNECT_MS
);
if (state.video_config.server_addr == NULL || state.video_config.server_addr[0] == '\0' ||
state.control_server_addr == NULL || state.control_server_addr[0] == '\0') {

View File

@@ -17,10 +17,17 @@ int main(void) {
return 1;
}
if (video_pipeline_run(&config, &stats, NULL) != 0) {
perror("video_pipeline_run");
video_pipeline_stats_destroy(&stats);
return 1;
for (;;) {
int rc = video_pipeline_run(&config, &stats, NULL);
if (rc == 0) {
break;
}
if (rc != VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE) {
perror("video_pipeline_run");
video_pipeline_stats_destroy(&stats);
return 1;
}
}
video_pipeline_stats_destroy(&stats);

View File

@@ -19,6 +19,7 @@ typedef struct kcp_client_recv_meta {
typedef struct kcp_client_state {
int connected;
int registered;
uint32_t server_idle_ms;
char last_server_error[256];
} kcp_client_state_t;

View File

@@ -25,6 +25,9 @@ typedef struct video_pipeline_config {
int output_height;
int max_frames;
int enable_timing_logs;
int soft_backpressure_segments;
int hard_backpressure_segments;
int hard_backpressure_hold_ms;
} video_pipeline_config_t;
typedef struct video_pipeline_stats {
@@ -32,12 +35,18 @@ typedef struct video_pipeline_stats {
uint64_t frames_sent;
uint64_t bytes_sent;
uint64_t send_errors;
uint64_t backpressure_drops;
uint64_t backlog_resets;
uint64_t last_frame_bytes;
uint32_t last_backlog_segments;
int connected;
char last_error[256];
char last_backlog_reason[128];
kcp_runtime_stats_t transport;
} video_pipeline_stats_t;
#define VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE 2
void video_pipeline_config_init(video_pipeline_config_t *config);
void video_pipeline_config_load_env(video_pipeline_config_t *config);
int video_pipeline_stats_init(video_pipeline_stats_t *stats);

View File

@@ -53,6 +53,11 @@ If you only want the shared environment for manual commands:
source scripts/dev/load-env.sh
```
When you launch via `start-*.sh`, you do not need to manually `export` the variables from
`robot-remote.env` or `robot-remote.env.local`. `load-env.sh` loads those files with `set -a`,
so the variables are exported automatically for the child process. Manual `export` is only needed
if you bypass these scripts and start binaries directly from a clean shell.
## Customizing
Edit `scripts/dev/robot-remote.env` for shared changes such as:
@@ -66,6 +71,11 @@ Edit `scripts/dev/robot-remote.env` for shared changes such as:
- `OMNI_CAMERA_DEVICE`
- `OMNI_VIDEO_PEER_ID`
- `OMNI_CONTROL_PEER_ID`
- `OMNI_VIDEO_SOFT_BACKPRESSURE_SEGMENTS`
- `OMNI_VIDEO_HARD_BACKPRESSURE_SEGMENTS`
- `OMNI_VIDEO_HARD_BACKPRESSURE_HOLD_MS`
- `OMNI_CONTROL_SERVER_IDLE_RECONNECT_MS`
- `OMNI_VIDEO_MAX_FRAME_AGE_MS`
Role mapping:
@@ -73,6 +83,12 @@ Role mapping:
- `start-b-side-omnid.sh` uses the `ROBOT_SIDE_*` address pair
- `start-ros-receiver.sh` defaults to the robot-side address pair, but with `transport=unix_dgram` it usually does not need the server address
New repair knobs:
- `OMNI_VIDEO_SOFT_BACKPRESSURE_SEGMENTS`, `OMNI_VIDEO_HARD_BACKPRESSURE_SEGMENTS`, and `OMNI_VIDEO_HARD_BACKPRESSURE_HOLD_MS` are used by `b_side_omnid`
- `OMNI_CONTROL_SERVER_IDLE_RECONNECT_MS` is used by `b_side_omnid`
- `OMNI_VIDEO_MAX_FRAME_AGE_MS` is used by `start-backend.sh` on the A-side backend, not by `b_side_omnid`
Put machine-specific overrides into `scripts/dev/robot-remote.env.local`. Example:
```bash

View File

@@ -40,10 +40,17 @@ OMNI_VIDEO_TARGET_PEER="peer-a-video"
OMNI_CAMERA_DEVICE="/dev/video26"
OMNI_VIDEO_SERVER_ADDR="${ROBOT_SIDE_OMNISOCKET_SERVER_ADDR}"
OMNI_VIDEO_RELAY_VIA="${ROBOT_SIDE_OMNISOCKET_RELAY_VIA}"
OMNI_VIDEO_SOFT_BACKPRESSURE_SEGMENTS="64"
OMNI_VIDEO_HARD_BACKPRESSURE_SEGMENTS="192"
OMNI_VIDEO_HARD_BACKPRESSURE_HOLD_MS="1000"
OMNI_CONTROL_PEER_ID="peer-b-ctrl"
OMNI_CONTROL_EXPECTED_SENDER="peer-a-ctrl"
OMNI_CONTROL_SERVER_ADDR="${ROBOT_SIDE_OMNISOCKET_SERVER_ADDR}"
OMNI_CONTROL_RELAY_VIA="${ROBOT_SIDE_OMNISOCKET_RELAY_VIA}"
OMNI_CONTROL_UNIX_SOCKET_PATH="${ROBOT_RECEIVER_LOCAL_SOCKET_PATH}"
OMNI_CONTROL_SERVER_IDLE_RECONNECT_MS="3000"
# A-side backend video freshness guard. Used by scripts/dev/start-backend.sh.
OMNI_VIDEO_MAX_FRAME_AGE_MS="1000"
B_SIDE_OMNID_USE_SUDO="1"

View File

@@ -19,6 +19,7 @@ struct kcp_client {
pthread_mutex_t state_mu;
uint64_t next_message_id;
int registered;
uint32_t last_server_activity_ms;
char last_server_error[256];
};
@@ -42,6 +43,15 @@ static void kcp_client_set_registered(kcp_client_t *client, int registered) {
pthread_mutex_unlock(&client->state_mu);
}
static void kcp_client_touch_server_activity(kcp_client_t *client) {
if (client == NULL) {
return;
}
pthread_mutex_lock(&client->state_mu);
client->last_server_activity_ms = omni_now_millis32();
pthread_mutex_unlock(&client->state_mu);
}
static void kcp_client_set_last_server_error(kcp_client_t *client, const char *message) {
if (client == NULL) {
return;
@@ -55,6 +65,16 @@ static void kcp_client_clear_last_server_error(kcp_client_t *client) {
kcp_client_set_last_server_error(client, "");
}
static int kcp_client_server_error_invalidates_registration(const char *message) {
if (message == NULL || message[0] == '\0') {
return 0;
}
return strstr(message, "not registered") != NULL
|| strstr(message, "first message must be register") != NULL
|| strstr(message, "peer replaced") != NULL
|| strstr(message, "timed out waiting for server_register_ok") != NULL;
}
static int kcp_client_is_registered(kcp_client_t *client) {
int registered;
@@ -158,6 +178,7 @@ static int kcp_client_handle_reserved_server_message(kcp_client_t *client, const
if (msg->type != MSG_TYPE_TEXT || strcmp(msg->from, SERVER_PEER_ID) != 0) {
return 0;
}
kcp_client_touch_server_activity(client);
if (kcp_client_text_body_equals(msg, KCP_CLIENT_CTRL_REGISTER_OK)) {
kcp_client_set_registered(client, 1);
kcp_client_clear_last_server_error(client);
@@ -239,6 +260,7 @@ static int kcp_client_wait_for_register_ok(kcp_client_t *client) {
char error_text[256];
kcp_client_copy_server_error_body(&msg, error_text, sizeof(error_text));
kcp_client_touch_server_activity(client);
kcp_client_set_registered(client, 0);
kcp_client_set_last_server_error(client, error_text);
protocol_message_clear(&msg);
@@ -290,6 +312,9 @@ static int kcp_client_receive_business_timed(kcp_client_t *client, message_t *ou
return rc;
}
if (strcmp(out_msg->from, SERVER_PEER_ID) == 0) {
kcp_client_touch_server_activity(client);
}
reserved_rc = kcp_client_handle_reserved_server_message(client, out_msg);
if (reserved_rc < 0) {
protocol_message_clear(out_msg);
@@ -304,6 +329,9 @@ static int kcp_client_receive_business_timed(kcp_client_t *client, message_t *ou
kcp_client_copy_server_error_body(out_msg, error_text, sizeof(error_text));
kcp_client_set_last_server_error(client, error_text);
if (kcp_client_server_error_invalidates_registration(error_text)) {
kcp_client_set_registered(client, 0);
}
}
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_B_APP_RECV, out_msg);
return 0;
@@ -399,6 +427,7 @@ kcp_client_t *kcp_client_dial_with_options(const char *server_addr, const char *
snprintf(client->id, sizeof(client->id), "%s", peer_id);
snprintf(client->server_addr, sizeof(client->server_addr), "%s", server_addr == NULL ? "" : server_addr);
pthread_mutex_init(&client->state_mu, NULL);
client->last_server_activity_ms = omni_now_millis32();
client->logger = logger;
client->conn = kcp_conn_dial_with_options(actual_dial_addr, bind_ip, bind_device, options, packet_logger, logger, OMNI_NODE_ROLE_PEER, peer_id, stats_logger, stats_interval_ms);
if (client->conn == NULL) {
@@ -596,6 +625,9 @@ void kcp_client_state_snapshot(kcp_client_t *client, kcp_client_state_t *out_sta
}
pthread_mutex_lock(&client->state_mu);
out_state->registered = client->registered;
out_state->server_idle_ms = client->last_server_activity_ms == 0
? 0
: (omni_now_millis32() - client->last_server_activity_ms);
snprintf(out_state->last_server_error, sizeof(out_state->last_server_error), "%s", client->last_server_error);
pthread_mutex_unlock(&client->state_mu);
}

View File

@@ -28,6 +28,11 @@
#define VIDEO_DEFAULT_CAMERA_DEVICE "/dev/video0"
#define VIDEO_DEFAULT_PEER_ID "peer-b-video"
#define VIDEO_DEFAULT_TARGET_PEER "peer-a-video"
#define VIDEO_SOFT_BACKPRESSURE_SEGMENTS_DEFAULT 64
#define VIDEO_HARD_BACKPRESSURE_SEGMENTS_DEFAULT 192
#define VIDEO_HARD_BACKPRESSURE_HOLD_MS_DEFAULT 1000
#define VIDEO_SOFT_BACKPRESSURE_WINDOW_PRESSURE_PCT 90.0
#define VIDEO_HARD_BACKPRESSURE_WINDOW_PRESSURE_PCT 98.0
typedef struct video_buffer {
void *start;
@@ -137,6 +142,20 @@ static const char *env_first_nonempty(const char *first, const char *second, con
return fallback;
}
static int env_int_or_default(const char *name, int fallback) {
const char *value = getenv(name);
int parsed;
if (value == NULL || value[0] == '\0') {
return fallback;
}
parsed = atoi(value);
if (parsed <= 0) {
return fallback;
}
return parsed;
}
static void video_pipeline_set_error(video_pipeline_stats_t *stats, const char *message) {
if (stats == NULL) {
return;
@@ -179,6 +198,9 @@ void video_pipeline_config_init(video_pipeline_config_t *config) {
config->output_height = VIDEO_OUTPUT_HEIGHT_DEFAULT;
config->max_frames = 0;
config->enable_timing_logs = 0;
config->soft_backpressure_segments = VIDEO_SOFT_BACKPRESSURE_SEGMENTS_DEFAULT;
config->hard_backpressure_segments = VIDEO_HARD_BACKPRESSURE_SEGMENTS_DEFAULT;
config->hard_backpressure_hold_ms = VIDEO_HARD_BACKPRESSURE_HOLD_MS_DEFAULT;
}
void video_pipeline_config_load_env(video_pipeline_config_t *config) {
@@ -196,6 +218,9 @@ void video_pipeline_config_load_env(video_pipeline_config_t *config) {
config->max_frames = atoi(getenv("OMNI_VIDEO_MAX_FRAMES"));
}
config->enable_timing_logs = env_flag_or_default("OMNI_VIDEO_DEBUG_TIMING", config->enable_timing_logs);
config->soft_backpressure_segments = env_int_or_default("OMNI_VIDEO_SOFT_BACKPRESSURE_SEGMENTS", config->soft_backpressure_segments);
config->hard_backpressure_segments = env_int_or_default("OMNI_VIDEO_HARD_BACKPRESSURE_SEGMENTS", config->hard_backpressure_segments);
config->hard_backpressure_hold_ms = env_int_or_default("OMNI_VIDEO_HARD_BACKPRESSURE_HOLD_MS", config->hard_backpressure_hold_ms);
}
int video_pipeline_stats_init(video_pipeline_stats_t *stats) {
@@ -229,9 +254,13 @@ void video_pipeline_stats_snapshot(video_pipeline_stats_t *stats, video_pipeline
out_stats->frames_sent = stats->frames_sent;
out_stats->bytes_sent = stats->bytes_sent;
out_stats->send_errors = stats->send_errors;
out_stats->backpressure_drops = stats->backpressure_drops;
out_stats->backlog_resets = stats->backlog_resets;
out_stats->last_frame_bytes = stats->last_frame_bytes;
out_stats->last_backlog_segments = stats->last_backlog_segments;
out_stats->connected = stats->connected;
snprintf(out_stats->last_error, sizeof(out_stats->last_error), "%s", stats->last_error);
snprintf(out_stats->last_backlog_reason, sizeof(out_stats->last_backlog_reason), "%s", stats->last_backlog_reason);
out_stats->transport = stats->transport;
pthread_mutex_unlock(&stats->mutex);
}
@@ -632,6 +661,56 @@ static void video_sender_close(video_sender_t *sender) {
sender->send_buffer_cap = 0;
}
static uint32_t video_sender_backlog_segments(const kcp_runtime_stats_t *stats) {
if (stats == NULL) {
return 0;
}
return stats->snd_queue + stats->snd_buffer;
}
static int video_sender_soft_backpressure_active(const video_pipeline_config_t *config, const kcp_runtime_stats_t *transport) {
if (config == NULL || transport == NULL) {
return 0;
}
return video_sender_backlog_segments(transport) >= (uint32_t) config->soft_backpressure_segments
|| transport->window_pressure_pct >= VIDEO_SOFT_BACKPRESSURE_WINDOW_PRESSURE_PCT;
}
static int video_sender_hard_backpressure_active(const video_pipeline_config_t *config, const kcp_runtime_stats_t *transport) {
if (config == NULL || transport == NULL) {
return 0;
}
return video_sender_backlog_segments(transport) >= (uint32_t) config->hard_backpressure_segments
|| transport->window_pressure_pct >= VIDEO_HARD_BACKPRESSURE_WINDOW_PRESSURE_PCT;
}
static void video_pipeline_note_backpressure(
video_pipeline_stats_t *stats,
const char *reason,
const kcp_runtime_stats_t *transport,
int increment_drop,
int increment_reset
) {
if (stats == NULL) {
return;
}
pthread_mutex_lock(&stats->mutex);
if (increment_drop) {
stats->backpressure_drops += 1;
}
if (increment_reset) {
stats->backlog_resets += 1;
}
if (transport != NULL) {
stats->last_backlog_segments = video_sender_backlog_segments(transport);
stats->transport = *transport;
} else {
stats->last_backlog_segments = 0;
}
snprintf(stats->last_backlog_reason, sizeof(stats->last_backlog_reason), "%s", reason == NULL ? "" : reason);
pthread_mutex_unlock(&stats->mutex);
}
static void video_pipeline_cleanup_buffers(video_buffer_t *buffers, int num_buffers) {
int i;
if (buffers == NULL) {
@@ -660,6 +739,8 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
int sws_src_width = 0;
int sws_src_height = 0;
int sws_src_format = -1;
uint32_t hard_backpressure_since_ms = 0;
uint32_t last_soft_drop_log_ms = 0;
memset(&sender, 0, sizeof(sender));
if (stats == NULL) {
@@ -743,6 +824,7 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
AVFrame *decoded_frame = NULL;
AVFrame *scaled_frame = NULL;
AVPacket *encoded_pkt = NULL;
kcp_runtime_stats_t transport_stats;
int select_rc;
double total_start_ms = 0.0;
double capture_start_ms = 0.0;
@@ -757,6 +839,8 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
double send_end_ms = 0.0;
int frame_number = frame_index + 1;
memset(&transport_stats, 0, sizeof(transport_stats));
if (config->max_frames > 0 && frame_index >= config->max_frames) {
break;
}
@@ -840,6 +924,84 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
send_start_ms = encode_end_ms;
}
kcp_client_runtime_stats_snapshot(sender.client, &transport_stats);
if (video_sender_hard_backpressure_active(config, &transport_stats)) {
uint32_t now_ms = omni_now_millis32();
if (hard_backpressure_since_ms == 0) {
hard_backpressure_since_ms = now_ms;
}
if (now_ms - hard_backpressure_since_ms >= (uint32_t) config->hard_backpressure_hold_ms) {
char reason[128];
uint32_t backlog_segments = video_sender_backlog_segments(&transport_stats);
snprintf(
reason,
sizeof(reason),
"hard_reset backlog=%u snd_queue=%u snd_buffer=%u window_pressure=%.1f%% hold_ms=%d",
backlog_segments,
transport_stats.snd_queue,
transport_stats.snd_buffer,
transport_stats.window_pressure_pct,
config->hard_backpressure_hold_ms
);
video_pipeline_note_backpressure(stats, reason, &transport_stats, 0, 1);
video_pipeline_set_error(stats, reason);
fprintf(
stderr,
"[video_pipeline] backlog hard reset: backlog=%u snd_queue=%u snd_buffer=%u window_pressure=%.1f%% hold_ms=%d\n",
backlog_segments,
transport_stats.snd_queue,
transport_stats.snd_buffer,
transport_stats.window_pressure_pct,
config->hard_backpressure_hold_ms
);
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
av_packet_free(&encoded_pkt);
(void) ioctl(fd, VIDIOC_QBUF, &buf);
rc = VIDEO_PIPELINE_RUN_RETRY_IMMEDIATE;
goto cleanup;
}
} else {
hard_backpressure_since_ms = 0;
}
if (video_sender_soft_backpressure_active(config, &transport_stats)) {
uint32_t now_ms = omni_now_millis32();
uint32_t backlog_segments = video_sender_backlog_segments(&transport_stats);
char reason[128];
snprintf(
reason,
sizeof(reason),
"soft_drop backlog=%u snd_queue=%u snd_buffer=%u window_pressure=%.1f%% threshold=%d",
backlog_segments,
transport_stats.snd_queue,
transport_stats.snd_buffer,
transport_stats.window_pressure_pct,
config->soft_backpressure_segments
);
video_pipeline_note_backpressure(stats, reason, &transport_stats, 1, 0);
if (now_ms - last_soft_drop_log_ms >= 1000U) {
fprintf(
stderr,
"[video_pipeline] soft drop: backlog=%u snd_queue=%u snd_buffer=%u window_pressure=%.1f%% threshold=%d\n",
backlog_segments,
transport_stats.snd_queue,
transport_stats.snd_buffer,
transport_stats.window_pressure_pct,
config->soft_backpressure_segments
);
last_soft_drop_log_ms = now_ms;
}
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
av_packet_free(&encoded_pkt);
(void) ioctl(fd, VIDIOC_QBUF, &buf);
continue;
}
if (video_sender_send_packet(&sender, encoded_pkt, (uint64_t) get_realtime_ms()) != 0) {
pthread_mutex_lock(&stats->mutex);
stats->send_errors += 1;