From 212459a8e4b86791977834f60784b12b1b0ffae6 Mon Sep 17 00:00:00 2001 From: Mock Date: Sat, 18 Apr 2026 12:52:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/b_side_omnid.c | 327 +++++++++++++++++- include/kcp_session_stats.h | 4 + include/peer_kcp_client.h | 1 + include/transport_kcp.h | 2 + include/video_pipeline.h | 19 +- python/omnisocket/_omnisocket.c | 26 ++ python/omnisocket/omnisocket_client.c | 14 +- python/omnisocket/omnisocket_client.h | 9 + .../udp_teleop_bridge/omni_transport.py | 6 + scripts/boot/robot-boot.env | 5 + scripts/dev/aggregate-latency-estimates.py | 286 +++++++++++++++ scripts/dev/load-env.sh | 12 +- scripts/dev/robot-remote.env | 7 +- src/kcp_session_stats.c | 8 + src/peer_kcp_client.c | 13 + src/server_kcp_hub.c | 2 + src/transport_kcp.c | 32 ++ src/video_pipeline.c | 207 +++++++++-- 18 files changed, 938 insertions(+), 42 deletions(-) create mode 100644 scripts/dev/aggregate-latency-estimates.py diff --git a/cmd/b_side_omnid.c b/cmd/b_side_omnid.c index 7e267bc..8ecf5f8 100644 --- a/cmd/b_side_omnid.c +++ b/cmd/b_side_omnid.c @@ -20,6 +20,8 @@ #define CONTROL_DEFAULT_PEER_ID "peer-b-ctrl" #define CONTROL_DEFAULT_EXPECTED_SENDER "peer-a-ctrl" +#define CONTROL_ACK_DEFAULT_PEER_ID "peer-b-ctrl-ack" +#define CONTROL_ACK_DEFAULT_TARGET_PEER "peer-a-ctrl-ack" #define CONTROL_DEFAULT_UNIX_SOCKET "/tmp/omnisocket-b-side-cmd.sock" #define CONTROL_DEFAULT_SERVER_IDLE_RECONNECT_MS 3000 #define DEFAULT_RUNTIME_DIR "/run/blitz-robot" @@ -29,6 +31,7 @@ #define DEFAULT_THREAD_HEARTBEAT_TIMEOUT_SEC 15 #define DEFAULT_KCP_STATS_INTERVAL_MS 1000 #define DEFAULT_CONTROL_LATENCY_SAMPLE_MOD 100 +#define DEFAULT_CONTROL_ACK_SAMPLE_MOD 10 #define EXIT_CODE_VIDEO_THREAD_STALLED 101 #define EXIT_CODE_CONTROL_THREAD_STALLED 102 @@ -64,21 +67,32 @@ typedef struct daemon_state { const char *control_bind_device; const char *control_peer_id; const char *control_expected_sender; + const char *control_ack_peer_id; + const char *control_ack_target_peer; const char *control_unix_socket; int control_server_idle_reconnect_ms; const char *runtime_dir; int heartbeat_timeout_sec; int stats_interval_ms; uint64_t control_latency_sample_mod; + uint64_t control_ack_sample_mod; char status_file_path[512]; char video_thread_fault_file[512]; char control_thread_fault_file[512]; atomic_long video_thread_heartbeat_epoch_sec; atomic_long control_thread_heartbeat_epoch_sec; + atomic_int control_ack_shutdown_requested; kcp_session_stats_logger_t *stats_logger; latency_logger_t *control_latency_logger; + video_stage_logger_t *video_stage_logger; unix_dgram_client_t unix_client; control_bridge_stats_t control_stats; + pthread_mutex_t control_ack_mutex; + pthread_t control_ack_thread; + kcp_client_t *control_ack_client; + int control_ack_thread_started; + int control_ack_connect_requested; + int control_ack_connect_inflight; } daemon_state_t; static volatile sig_atomic_t g_stop_requested = 0; @@ -180,6 +194,19 @@ static int should_log_control_latency(const daemon_state_t *state, const message return msg->id % sample_mod == 0U; } +static int should_send_control_ack(const daemon_state_t *state, const message_t *msg) { + uint64_t sample_mod; + + if (state == NULL || msg == NULL) { + return 0; + } + sample_mod = state->control_ack_sample_mod; + if (sample_mod <= 1U) { + return 1; + } + return msg->id % sample_mod == 0U; +} + static void video_pipeline_heartbeat_progress(void *context) { update_thread_heartbeat((atomic_long *) context); } @@ -259,6 +286,62 @@ static void control_bridge_stats_destroy(control_bridge_stats_t *stats) { static void unix_dgram_client_close(unix_dgram_client_t *client); static void control_bridge_stats_snapshot(control_bridge_stats_t *stats, control_bridge_stats_t *out_stats); +static void close_control_ack_client(kcp_client_t **client_ptr); + +static int control_ack_enabled(const daemon_state_t *state) { + return state != NULL + && state->control_ack_peer_id != NULL + && state->control_ack_peer_id[0] != '\0' + && state->control_ack_target_peer != NULL + && state->control_ack_target_peer[0] != '\0'; +} + +static int control_ack_manager_init(daemon_state_t *state) { + int rc; + + if (state == NULL) { + errno = EINVAL; + return -1; + } + rc = pthread_mutex_init(&state->control_ack_mutex, NULL); + if (rc != 0) { + errno = rc; + return -1; + } + atomic_init(&state->control_ack_shutdown_requested, 0); + state->control_ack_client = NULL; + state->control_ack_thread_started = 0; + state->control_ack_connect_requested = 0; + state->control_ack_connect_inflight = 0; + return 0; +} + +static void control_ack_manager_reset(daemon_state_t *state, int request_connect) { + kcp_client_t *client = NULL; + + if (state == NULL) { + return; + } + pthread_mutex_lock(&state->control_ack_mutex); + client = state->control_ack_client; + state->control_ack_client = NULL; + state->control_ack_connect_requested = request_connect && control_ack_enabled(state) && state->control_ack_thread_started; + pthread_mutex_unlock(&state->control_ack_mutex); + close_control_ack_client(&client); +} + +static void control_ack_manager_destroy(daemon_state_t *state) { + if (state == NULL) { + return; + } + atomic_store(&state->control_ack_shutdown_requested, 1); + if (state->control_ack_thread_started) { + pthread_join(state->control_ack_thread, NULL); + state->control_ack_thread_started = 0; + } + control_ack_manager_reset(state, 0); + pthread_mutex_destroy(&state->control_ack_mutex); +} static int write_status_json_atomic(const char *path, cJSON *root) { char *json; @@ -348,6 +431,8 @@ static int write_daemon_status_file(daemon_state_t *state) { cJSON_AddNumberToObject(root, "video_frames_sent", (double) video_stats.frames_sent); cJSON_AddNumberToObject(root, "video_send_errors", (double) video_stats.send_errors); cJSON_AddNumberToObject(root, "video_backlog_resets", (double) video_stats.backlog_resets); + cJSON_AddNumberToObject(root, "video_last_capture_to_send_ms", (double) video_stats.last_capture_to_send_ms); + cJSON_AddNumberToObject(root, "video_avg_capture_to_send_ms", video_stats.avg_capture_to_send_ms); cJSON_AddStringToObject(root, "video_last_error", video_stats.last_error); cJSON_AddBoolToObject(root, "control_registered", control_stats.registered != 0); cJSON_AddNumberToObject(root, "control_reconnect_count", (double) control_stats.reconnect_count); @@ -467,6 +552,147 @@ static void control_message_body_to_cstr(const message_t *msg, char *buffer, siz buffer[copy_len] = '\0'; } +static kcp_client_t *connect_control_ack_client(const daemon_state_t *state) { + kcp_conn_options_t options; + + if (state == NULL || state->control_ack_peer_id == NULL || state->control_ack_peer_id[0] == '\0') { + errno = EINVAL; + return NULL; + } + kcp_conn_options_set_control_defaults(&options); + return kcp_client_dial_with_options( + state->control_server_addr, + state->control_relay_via, + state->control_ack_peer_id, + state->control_bind_ip, + state->control_bind_device, + &options, + NULL, + NULL, + state->stats_logger, + state->stats_interval_ms + ); +} + +static void close_control_ack_client(kcp_client_t **client_ptr) { + if (client_ptr == NULL || *client_ptr == NULL) { + return; + } + kcp_client_close(*client_ptr); + kcp_client_free(*client_ptr); + *client_ptr = NULL; +} + +static void control_ack_manager_request_connect(daemon_state_t *state) { + if (state == NULL || !control_ack_enabled(state) || !state->control_ack_thread_started) { + return; + } + pthread_mutex_lock(&state->control_ack_mutex); + if (state->control_ack_client == NULL) { + state->control_ack_connect_requested = 1; + } + pthread_mutex_unlock(&state->control_ack_mutex); +} + +static void *control_ack_thread_main(void *arg) { + daemon_state_t *state = (daemon_state_t *) arg; + + while (!atomic_load(&state->control_ack_shutdown_requested) && !*state->stop_requested) { + kcp_client_t *client = NULL; + int connect_failed = 0; + int should_connect = 0; + + pthread_mutex_lock(&state->control_ack_mutex); + if (state->control_ack_connect_requested && state->control_ack_client == NULL && !state->control_ack_connect_inflight) { + state->control_ack_connect_inflight = 1; + should_connect = 1; + } + pthread_mutex_unlock(&state->control_ack_mutex); + + if (!should_connect) { + usleep(200000); + continue; + } + + client = connect_control_ack_client(state); + connect_failed = client == NULL; + + pthread_mutex_lock(&state->control_ack_mutex); + state->control_ack_connect_inflight = 0; + if ( + client != NULL + && state->control_ack_connect_requested + && state->control_ack_client == NULL + && !atomic_load(&state->control_ack_shutdown_requested) + && !*state->stop_requested + ) { + state->control_ack_client = client; + state->control_ack_connect_requested = 0; + client = NULL; + } + pthread_mutex_unlock(&state->control_ack_mutex); + + if (client != NULL) { + close_control_ack_client(&client); + } + if (connect_failed && !atomic_load(&state->control_ack_shutdown_requested) && !*state->stop_requested) { + sleep(1); + } + } + return NULL; +} + +static void maybe_send_control_ack( + daemon_state_t *state, + const message_t *msg, + int64_t recv_unix_nano, + int64_t persist_end_unix_nano, + const char *sample_reason +) { + kcp_client_t *ack_client = NULL; + kcp_client_t *client_to_close = NULL; + char *payload = NULL; + int send_rc = -1; + + if ( + state == NULL || msg == NULL || recv_unix_nano <= 0 || persist_end_unix_nano <= recv_unix_nano + || !control_ack_enabled(state) || !state->control_ack_thread_started + ) { + return; + } + + payload = omni_strdup_printf( + "{\"message_id\":%" PRIu64 ",\"ack_phase\":\"persist_end\",\"b_recv_to_persist_us\":%" PRId64 ",\"unix_send_ok\":true,\"sample_reason\":\"%s\"}", + msg->id, + (persist_end_unix_nano - recv_unix_nano) / 1000, + sample_reason == NULL ? "sample_mod" : sample_reason + ); + if (payload == NULL) { + return; + } + + pthread_mutex_lock(&state->control_ack_mutex); + ack_client = state->control_ack_client; + if (ack_client == NULL) { + state->control_ack_connect_requested = 1; + pthread_mutex_unlock(&state->control_ack_mutex); + free(payload); + return; + } + send_rc = kcp_client_send_text(ack_client, state->control_ack_target_peer, payload); + if (send_rc != 0) { + client_to_close = state->control_ack_client; + state->control_ack_client = NULL; + state->control_ack_connect_requested = 1; + } + pthread_mutex_unlock(&state->control_ack_mutex); + + free(payload); + if (client_to_close != NULL) { + close_control_ack_client(&client_to_close); + } +} + static int unix_dgram_client_init(unix_dgram_client_t *client, const char *dest_path) { struct sockaddr_un bind_addr; pid_t pid; @@ -618,11 +844,17 @@ static void *control_thread_main(void *arg) { kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport); pthread_mutex_unlock(&state->control_stats.mutex); } + control_ack_manager_request_connect(state); while (!*state->stop_requested) { message_t msg; int rc; kcp_client_state_t client_state; + int ack_sampled = 0; + int log_control_latency = 0; + int64_t recv_unix_nano = 0; + int64_t persist_begin_unix_nano = 0; + int64_t persist_end_unix_nano = 0; update_thread_heartbeat(&state->control_thread_heartbeat_epoch_sec); protocol_message_init(&msg); @@ -727,9 +959,27 @@ static void *control_thread_main(void *arg) { continue; } - if (should_log_control_latency(state, &msg)) { - latencylog_log_message_event(state->control_latency_logger, OMNI_NODE_ROLE_PEER, state->control_peer_id, EVENT_B_APP_RECV, &msg); - latencylog_log_message_event(state->control_latency_logger, OMNI_NODE_ROLE_PEER, state->control_peer_id, EVENT_B_PERSIST_BEGIN, &msg); + ack_sampled = should_send_control_ack(state, &msg); + log_control_latency = ack_sampled || should_log_control_latency(state, &msg); + if (log_control_latency) { + recv_unix_nano = omni_now_unix_nano(); + persist_begin_unix_nano = recv_unix_nano; + latencylog_log_message_event_at( + state->control_latency_logger, + OMNI_NODE_ROLE_PEER, + state->control_peer_id, + EVENT_B_APP_RECV, + recv_unix_nano, + &msg + ); + latencylog_log_message_event_at( + state->control_latency_logger, + OMNI_NODE_ROLE_PEER, + state->control_peer_id, + EVENT_B_PERSIST_BEGIN, + persist_begin_unix_nano, + &msg + ); } if (unix_dgram_client_send(&state->unix_client, msg.body, msg.body_len) != 0) { @@ -748,8 +998,19 @@ static void *control_thread_main(void *arg) { state->control_stats.server_idle_ms = client_state.server_idle_ms; kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport); pthread_mutex_unlock(&state->control_stats.mutex); - if (should_log_control_latency(state, &msg)) { - latencylog_log_message_event(state->control_latency_logger, OMNI_NODE_ROLE_PEER, state->control_peer_id, EVENT_B_PERSIST_END, &msg); + if (log_control_latency) { + persist_end_unix_nano = omni_now_unix_nano(); + latencylog_log_message_event_at( + state->control_latency_logger, + OMNI_NODE_ROLE_PEER, + state->control_peer_id, + EVENT_B_PERSIST_END, + persist_end_unix_nano, + &msg + ); + } + if (ack_sampled) { + maybe_send_control_ack(state, &msg, recv_unix_nano, persist_end_unix_nano, "sample_mod"); } protocol_message_clear(&msg); continue; @@ -771,8 +1032,19 @@ static void *control_thread_main(void *arg) { state->control_stats.server_idle_ms = client_state.server_idle_ms; kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport); pthread_mutex_unlock(&state->control_stats.mutex); - if (should_log_control_latency(state, &msg)) { - latencylog_log_message_event(state->control_latency_logger, OMNI_NODE_ROLE_PEER, state->control_peer_id, EVENT_B_PERSIST_END, &msg); + if (log_control_latency) { + persist_end_unix_nano = omni_now_unix_nano(); + latencylog_log_message_event_at( + state->control_latency_logger, + OMNI_NODE_ROLE_PEER, + state->control_peer_id, + EVENT_B_PERSIST_END, + persist_end_unix_nano, + &msg + ); + } + if (ack_sampled) { + maybe_send_control_ack(state, &msg, recv_unix_nano, persist_end_unix_nano, "sample_mod"); } protocol_message_clear(&msg); } @@ -781,6 +1053,7 @@ static void *control_thread_main(void *arg) { state->control_stats.registered = 0; state->control_stats.server_idle_ms = 0; pthread_mutex_unlock(&state->control_stats.mutex); + control_ack_manager_reset(state, 0); kcp_client_close(client); kcp_client_free(client); if (!*state->stop_requested && !reconnect_immediately) { @@ -802,13 +1075,15 @@ static void print_stats(daemon_state_t *state) { fprintf( stderr, - "[b_side_omnid] video registered=%d frames=%llu bytes=%llu drops=%llu resets=%llu backlog=%u reason=%s srtt=%dms | control registered=%d idle=%ums reconnects=%llu forwarded=%llu invalid=%llu unix_err=%llu srtt=%dms last_reconnect=%s\n", + "[b_side_omnid] video registered=%d frames=%llu bytes=%llu drops=%llu resets=%llu backlog=%u cap2send=%ums avg=%.1fms reason=%s srtt=%dms | control registered=%d idle=%ums reconnects=%llu forwarded=%llu invalid=%llu unix_err=%llu srtt=%dms last_reconnect=%s\n", video_stats.connected, (unsigned long long) video_stats.frames_sent, (unsigned long long) video_stats.bytes_sent, (unsigned long long) video_stats.backpressure_drops, (unsigned long long) video_stats.backlog_resets, video_stats.last_backlog_segments, + video_stats.last_capture_to_send_ms, + video_stats.avg_capture_to_send_ms, video_stats.last_backlog_reason[0] == '\0' ? "-" : video_stats.last_backlog_reason, video_stats.transport.srtt_ms, control_stats.registered, @@ -839,6 +1114,8 @@ int main(void) { state.control_bind_device = env_first_nonempty("OMNI_CONTROL_BIND_DEVICE", "OMNISOCKET_BIND_DEVICE", ""); state.control_peer_id = env_or_default("OMNI_CONTROL_PEER_ID", CONTROL_DEFAULT_PEER_ID); state.control_expected_sender = env_or_default("OMNI_CONTROL_EXPECTED_SENDER", CONTROL_DEFAULT_EXPECTED_SENDER); + state.control_ack_peer_id = env_or_default("OMNI_CONTROL_ACK_PEER_ID", CONTROL_ACK_DEFAULT_PEER_ID); + state.control_ack_target_peer = env_or_default("OMNI_CONTROL_ACK_TARGET_PEER", CONTROL_ACK_DEFAULT_TARGET_PEER); state.control_unix_socket = env_or_default("OMNI_CONTROL_UNIX_SOCKET_PATH", CONTROL_DEFAULT_UNIX_SOCKET); state.runtime_dir = env_or_default("BLITZ_RUNTIME_DIR", DEFAULT_RUNTIME_DIR); state.heartbeat_timeout_sec = env_int_or_default( @@ -847,9 +1124,11 @@ int main(void) { ); state.stats_interval_ms = env_int_or_default("BLITZ_KCP_STATS_INTERVAL_MS", DEFAULT_KCP_STATS_INTERVAL_MS); state.control_latency_sample_mod = env_u64_or_default("BLITZ_CONTROL_LATENCY_LOG_SAMPLE_MOD", DEFAULT_CONTROL_LATENCY_SAMPLE_MOD); + state.control_ack_sample_mod = env_u64_or_default("BLITZ_CONTROL_ACK_SAMPLE_MOD", DEFAULT_CONTROL_ACK_SAMPLE_MOD); state.video_config.progress_callback = video_pipeline_heartbeat_progress; state.video_config.progress_context = &state.video_thread_heartbeat_epoch_sec; state.video_config.stats_logger = NULL; + state.video_config.stage_logger = NULL; state.video_config.stats_interval_ms = state.stats_interval_ms; state.control_server_idle_reconnect_ms = env_int_or_default( "OMNI_CONTROL_SERVER_IDLE_RECONNECT_MS", @@ -889,8 +1168,15 @@ int main(void) { video_pipeline_stats_destroy(&state.video_stats); return 1; } + if (control_ack_manager_init(&state) != 0) { + perror("control_ack_manager_init"); + control_bridge_stats_destroy(&state.control_stats); + video_pipeline_stats_destroy(&state.video_stats); + return 1; + } if (unix_dgram_client_init(&state.unix_client, state.control_unix_socket) != 0) { perror("unix_dgram_client_init"); + control_ack_manager_destroy(&state); control_bridge_stats_destroy(&state.control_stats); video_pipeline_stats_destroy(&state.video_stats); return 1; @@ -905,6 +1191,7 @@ int main(void) { if (install_signal_handler(SIGINT) != 0 || install_signal_handler(SIGTERM) != 0) { perror("install_signal_handler"); unix_dgram_client_close(&state.unix_client); + control_ack_manager_destroy(&state); control_bridge_stats_destroy(&state.control_stats); video_pipeline_stats_destroy(&state.video_stats); return 1; @@ -913,7 +1200,10 @@ int main(void) { { const char *stats_log_path = getenv("BLITZ_KCP_STATS_LOG_PATH"); const char *latency_log_path = getenv("BLITZ_CONTROL_LATENCY_LOG_PATH"); + const char *video_stage_log_path = getenv("BLITZ_VIDEO_STAGE_LOG_PATH"); int latency_enabled = env_int_or_default("BLITZ_CONTROL_LATENCY_LOG_ENABLED", 1); + int video_stage_log_enabled = env_int_or_default("BLITZ_VIDEO_STAGE_LOG_ENABLED", 1); + uint64_t video_stage_log_sample_mod = env_u64_or_default("BLITZ_VIDEO_STAGE_LOG_SAMPLE_MOD", 10); if (stats_log_path != NULL && stats_log_path[0] != '\0') { state.stats_logger = kcp_session_stats_open_jsonl(stats_log_path); @@ -927,16 +1217,33 @@ int main(void) { fprintf(stderr, "[b_side_omnid] warning: failed to open control latency log %s\n", latency_log_path); } } + if (video_stage_log_enabled && video_stage_log_path != NULL && video_stage_log_path[0] != '\0') { + state.video_stage_logger = video_stage_logger_open_jsonl(video_stage_log_path, video_stage_log_sample_mod); + if (state.video_stage_logger == NULL) { + fprintf(stderr, "[b_side_omnid] warning: failed to open video stage log %s\n", video_stage_log_path); + } + } state.video_config.stats_logger = state.stats_logger; + state.video_config.stage_logger = state.video_stage_logger; state.video_config.stats_interval_ms = state.stats_interval_ms; } + if (control_ack_enabled(&state)) { + if (pthread_create(&state.control_ack_thread, NULL, control_ack_thread_main, &state) != 0) { + fprintf(stderr, "[b_side_omnid] warning: failed to start async control ACK manager, ACK sampling disabled\n"); + } else { + state.control_ack_thread_started = 1; + } + } + if (pthread_create(&video_thread, NULL, video_thread_main, &state) != 0) { perror("pthread_create(video_thread)"); unix_dgram_client_close(&state.unix_client); + control_ack_manager_destroy(&state); control_bridge_stats_destroy(&state.control_stats); video_pipeline_stats_destroy(&state.video_stats); latencylog_close(state.control_latency_logger); + video_stage_logger_close(state.video_stage_logger); kcp_session_stats_close(state.stats_logger); return 1; } @@ -945,9 +1252,11 @@ int main(void) { g_stop_requested = 1; pthread_join(video_thread, NULL); unix_dgram_client_close(&state.unix_client); + control_ack_manager_destroy(&state); control_bridge_stats_destroy(&state.control_stats); video_pipeline_stats_destroy(&state.video_stats); latencylog_close(state.control_latency_logger); + video_stage_logger_close(state.video_stage_logger); kcp_session_stats_close(state.stats_logger); return 1; } @@ -964,9 +1273,11 @@ int main(void) { pthread_join(video_thread, NULL); pthread_join(control_thread, NULL); unix_dgram_client_close(&state.unix_client); + control_ack_manager_destroy(&state); control_bridge_stats_destroy(&state.control_stats); video_pipeline_stats_destroy(&state.video_stats); latencylog_close(state.control_latency_logger); + video_stage_logger_close(state.video_stage_logger); kcp_session_stats_close(state.stats_logger); return 0; } diff --git a/include/kcp_session_stats.h b/include/kcp_session_stats.h index a274df6..166f237 100644 --- a/include/kcp_session_stats.h +++ b/include/kcp_session_stats.h @@ -24,8 +24,12 @@ typedef struct kcp_session_stats_record { uint32_t rto_ms; int has_srtt_ms; int32_t srtt_ms; + int has_min_srtt_ms; + int32_t min_srtt_ms; int has_srttvar_ms; int32_t srttvar_ms; + int has_last_feedback_age_ms; + uint32_t last_feedback_age_ms; int has_snd_wnd; uint32_t snd_wnd; int has_rmt_wnd; diff --git a/include/peer_kcp_client.h b/include/peer_kcp_client.h index 84ff7e1..426ec3d 100644 --- a/include/peer_kcp_client.h +++ b/include/peer_kcp_client.h @@ -28,6 +28,7 @@ kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, co const char *kcp_client_id(const kcp_client_t *client); int kcp_client_send_text(kcp_client_t *client, const char *to, const char *text); int kcp_client_send_binary(kcp_client_t *client, const char *to, const void *data, size_t data_len); +int kcp_client_send_binary_with_id(kcp_client_t *client, const char *to, const void *data, size_t data_len, uint64_t *out_id); int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char *path); int kcp_client_receive_timed(kcp_client_t *client, message_t *out_msg, int timeout_ms); int kcp_client_receive(kcp_client_t *client, message_t *out_msg); diff --git a/include/transport_kcp.h b/include/transport_kcp.h index 375fc6b..f9e1bbc 100644 --- a/include/transport_kcp.h +++ b/include/transport_kcp.h @@ -56,7 +56,9 @@ typedef struct kcp_runtime_stats { uint32_t conv; uint32_t rto_ms; int32_t srtt_ms; + int32_t min_srtt_ms; int32_t srttvar_ms; + uint32_t last_feedback_age_ms; uint32_t snd_wnd; uint32_t rmt_wnd; uint32_t inflight; diff --git a/include/video_pipeline.h b/include/video_pipeline.h index a07c932..fd198f8 100644 --- a/include/video_pipeline.h +++ b/include/video_pipeline.h @@ -6,22 +6,34 @@ #include #include "gps_buffer.h" +#include "omni_common.h" #include "peer_kcp_client.h" #ifdef __cplusplus extern "C" { #endif +#if defined(__GNUC__) +typedef struct __attribute__((packed)) video_pipeline_packet_metadata { +#else typedef struct video_pipeline_packet_metadata { +#endif uint64_t timestamp_ms; double latitude; double longitude; + uint32_t capture_to_send_ms; } video_pipeline_packet_metadata_t; +typedef struct video_stage_logger { + omni_file_logger_t file_logger; + int enabled; + uint64_t sample_mod; +} video_stage_logger_t; + typedef void (*video_pipeline_progress_fn)(void *context); #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L -_Static_assert(sizeof(video_pipeline_packet_metadata_t) == 24, "video trailer metadata must be 24 bytes"); +_Static_assert(sizeof(video_pipeline_packet_metadata_t) == 28, "video trailer metadata must be 28 bytes"); #endif typedef struct video_pipeline_config { @@ -43,6 +55,7 @@ typedef struct video_pipeline_config { int hard_backpressure_hold_ms; int frame_stall_reconnect_ms; kcp_session_stats_logger_t *stats_logger; + video_stage_logger_t *stage_logger; int stats_interval_ms; video_pipeline_progress_fn progress_callback; void *progress_context; @@ -57,6 +70,8 @@ typedef struct video_pipeline_stats { uint64_t backlog_resets; uint64_t last_frame_bytes; uint32_t last_backlog_segments; + uint32_t last_capture_to_send_ms; + double avg_capture_to_send_ms; int connected; char last_error[256]; char last_backlog_reason[128]; @@ -70,6 +85,8 @@ void video_pipeline_config_load_env(video_pipeline_config_t *config); int video_pipeline_stats_init(video_pipeline_stats_t *stats); void video_pipeline_stats_destroy(video_pipeline_stats_t *stats); void video_pipeline_stats_snapshot(video_pipeline_stats_t *stats, video_pipeline_stats_t *out_stats); +video_stage_logger_t *video_stage_logger_open_jsonl(const char *path, uint64_t sample_mod); +void video_stage_logger_close(video_stage_logger_t *logger); int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_stats_t *stats, volatile sig_atomic_t *stop_requested); #ifdef __cplusplus diff --git a/python/omnisocket/_omnisocket.c b/python/omnisocket/_omnisocket.c index cd55a6c..1ba6830 100644 --- a/python/omnisocket/_omnisocket.c +++ b/python/omnisocket/_omnisocket.c @@ -119,7 +119,9 @@ static PyObject *build_kcp_stats_dict(const omnisocket_session_kcp_stats_t *stat SET_KCP_STAT("conv", PyLong_FromUnsignedLong(stats->conv)); SET_KCP_STAT("rto_ms", PyLong_FromUnsignedLong(stats->rto_ms)); SET_KCP_STAT("srtt_ms", PyLong_FromLong(stats->srtt_ms)); + SET_KCP_STAT("min_srtt_ms", PyLong_FromLong(stats->min_srtt_ms)); SET_KCP_STAT("srttvar_ms", PyLong_FromLong(stats->srttvar_ms)); + SET_KCP_STAT("last_feedback_age_ms", PyLong_FromUnsignedLong(stats->last_feedback_age_ms)); SET_KCP_STAT("snd_wnd", PyLong_FromUnsignedLong(stats->snd_wnd)); SET_KCP_STAT("rmt_wnd", PyLong_FromUnsignedLong(stats->rmt_wnd)); SET_KCP_STAT("inflight", PyLong_FromUnsignedLong(stats->inflight)); @@ -279,6 +281,29 @@ static PyObject *PyOmniSession_send(PyOmniSession *self, PyObject *args, PyObjec Py_RETURN_NONE; } +static PyObject *PyOmniSession_send_with_id(PyOmniSession *self, PyObject *args, PyObject *kwargs) { + const char *to; + Py_buffer payload; + int rc; + uint64_t message_id = 0; + static char *kwlist[] = {"to", "data", NULL}; + + memset(&payload, 0, sizeof(payload)); + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sy*", kwlist, &to, &payload)) { + return NULL; + } + + Py_BEGIN_ALLOW_THREADS + rc = omnisocket_session_send_with_id(&self->session, to, payload.buf, (size_t) payload.len, &message_id); + Py_END_ALLOW_THREADS + + PyBuffer_Release(&payload); + if (rc != 0) { + return PyErr_SetFromErrno(PyExc_OSError); + } + return PyLong_FromUnsignedLongLong((unsigned long long) message_id); +} + static PyObject *PyOmniSession_recv(PyOmniSession *self, PyObject *args, PyObject *kwargs) { int timeout_ms = -1; int rc; @@ -379,6 +404,7 @@ static PyMethodDef PyOmniSession_methods[] = { {"connect", (PyCFunction) PyOmniSession_connect, METH_VARARGS | METH_KEYWORDS, NULL}, {"close", (PyCFunction) PyOmniSession_close, METH_NOARGS, NULL}, {"send", (PyCFunction) PyOmniSession_send, METH_VARARGS | METH_KEYWORDS, NULL}, + {"send_with_id", (PyCFunction) PyOmniSession_send_with_id, METH_VARARGS | METH_KEYWORDS, NULL}, {"recv", (PyCFunction) PyOmniSession_recv, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_doc}, {"recv_into", (PyCFunction) PyOmniSession_recv_into, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_into_doc}, {"stats", (PyCFunction) PyOmniSession_stats, METH_NOARGS, NULL}, diff --git a/python/omnisocket/omnisocket_client.c b/python/omnisocket/omnisocket_client.c index 79eff06..3dd0a0f 100644 --- a/python/omnisocket/omnisocket_client.c +++ b/python/omnisocket/omnisocket_client.c @@ -167,6 +167,16 @@ int omnisocket_session_close(omnisocket_session_t *session) { } int omnisocket_session_send(omnisocket_session_t *session, const char *to, const void *data, size_t data_len) { + return omnisocket_session_send_with_id(session, to, data, data_len, NULL); +} + +int omnisocket_session_send_with_id( + omnisocket_session_t *session, + const char *to, + const void *data, + size_t data_len, + uint64_t *out_message_id +) { kcp_client_t *client; int rc; @@ -178,7 +188,7 @@ int omnisocket_session_send(omnisocket_session_t *session, const char *to, const if (omnisocket_session_begin_client_op(session, &client) != 0) { return -1; } - rc = kcp_client_send_binary(client, to, data, data_len); + rc = kcp_client_send_binary_with_id(client, to, data, data_len, out_message_id); pthread_mutex_lock(&session->mutex); if (rc == 0) { session->stats.send_calls += 1; @@ -297,7 +307,9 @@ void omnisocket_session_kcp_stats_snapshot(omnisocket_session_t *session, omniso out_stats->conv = runtime_stats.conv; out_stats->rto_ms = runtime_stats.rto_ms; out_stats->srtt_ms = runtime_stats.srtt_ms; + out_stats->min_srtt_ms = runtime_stats.min_srtt_ms; out_stats->srttvar_ms = runtime_stats.srttvar_ms; + out_stats->last_feedback_age_ms = runtime_stats.last_feedback_age_ms; out_stats->snd_wnd = runtime_stats.snd_wnd; out_stats->rmt_wnd = runtime_stats.rmt_wnd; out_stats->inflight = runtime_stats.inflight; diff --git a/python/omnisocket/omnisocket_client.h b/python/omnisocket/omnisocket_client.h index a6842ab..b5e0db8 100644 --- a/python/omnisocket/omnisocket_client.h +++ b/python/omnisocket/omnisocket_client.h @@ -22,7 +22,9 @@ typedef struct omnisocket_session_kcp_stats { uint32_t conv; uint32_t rto_ms; int32_t srtt_ms; + int32_t min_srtt_ms; int32_t srttvar_ms; + uint32_t last_feedback_age_ms; uint32_t snd_wnd; uint32_t rmt_wnd; uint32_t inflight; @@ -72,6 +74,13 @@ int omnisocket_session_connect( ); int omnisocket_session_close(omnisocket_session_t *session); int omnisocket_session_send(omnisocket_session_t *session, const char *to, const void *data, size_t data_len); +int omnisocket_session_send_with_id( + omnisocket_session_t *session, + const char *to, + const void *data, + size_t data_len, + uint64_t *out_message_id +); int omnisocket_session_recv(omnisocket_session_t *session, message_t *out_msg, int timeout_ms); int omnisocket_session_recv_into( omnisocket_session_t *session, diff --git a/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/omni_transport.py b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/omni_transport.py index 6c0353c..3978ac3 100644 --- a/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/omni_transport.py +++ b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/omni_transport.py @@ -72,6 +72,12 @@ class OmniTransport: def send(self, *, to: str, data: bytes) -> None: self._session.send(to=to, data=data) + def send_with_id(self, *, to: str, data: bytes) -> int: + if not hasattr(self._session, 'send_with_id'): + self._session.send(to=to, data=data) + raise RuntimeError('send_with_id is not available on this omnisocket build') + return int(self._session.send_with_id(to=to, data=data)) + def recv(self, *, timeout_ms: int = -1): return self._session.recv(timeout_ms=timeout_ms) diff --git a/scripts/boot/robot-boot.env b/scripts/boot/robot-boot.env index 62ae87c..152a737 100644 --- a/scripts/boot/robot-boot.env +++ b/scripts/boot/robot-boot.env @@ -32,6 +32,9 @@ BLITZ_OMNID_THREAD_HEARTBEAT_TIMEOUT_SEC="15" BLITZ_KCP_STATS_INTERVAL_MS="1000" BLITZ_CONTROL_LATENCY_LOG_ENABLED="1" BLITZ_CONTROL_LATENCY_LOG_SAMPLE_MOD="100" +BLITZ_CONTROL_ACK_SAMPLE_MOD="10" +BLITZ_VIDEO_STAGE_LOG_ENABLED="1" +BLITZ_VIDEO_STAGE_LOG_SAMPLE_MOD="10" BLITZ_5G_LINK_LOG_INTERVAL_SEC="5" BLITZ_JSONL_FLUSH_INTERVAL_MS="1000" BLITZ_JSONL_FLUSH_BYTES="262144" @@ -53,3 +56,5 @@ OMNI_CAMERA_DEVICE="/dev/v4l/by-path/platform-a80aa10000.usb-usb-0:3.2:1.4-video # Boot units run b_side_omnid as root directly, so nested sudo must stay off. B_SIDE_OMNID_USE_SUDO="0" +OMNI_CONTROL_ACK_PEER_ID="peer-b-ctrl-ack" +OMNI_CONTROL_ACK_TARGET_PEER="peer-a-ctrl-ack" diff --git a/scripts/dev/aggregate-latency-estimates.py b/scripts/dev/aggregate-latency-estimates.py new file mode 100644 index 0000000..32a8a92 --- /dev/null +++ b/scripts/dev/aggregate-latency-estimates.py @@ -0,0 +1,286 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +from datetime import datetime, timezone +import html +import json +from pathlib import Path +from typing import Any + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Aggregate run logs into control/video latency estimate outputs.") + parser.add_argument("--run-dir", required=True, help="Run directory containing JSONL logs.") + parser.add_argument("--output-dir", help="Output directory. Defaults to --run-dir.") + return parser.parse_args() + + +def iter_jsonl(path: Path) -> list[dict[str, Any]]: + records: list[dict[str, Any]] = [] + if not path.exists(): + return records + with path.open("r", encoding="utf-8") as handle: + for raw_line in handle: + line = raw_line.strip() + if not line: + continue + try: + payload = json.loads(line) + except json.JSONDecodeError: + continue + if isinstance(payload, dict): + records.append(payload) + return records + + +def load_glob_jsonl(run_dir: Path, pattern: str) -> list[dict[str, Any]]: + records: list[dict[str, Any]] = [] + for path in sorted(run_dir.glob(pattern)): + records.extend(iter_jsonl(path)) + return records + + +def write_jsonl(path: Path, records: list[dict[str, Any]]) -> None: + with path.open("w", encoding="utf-8") as handle: + for record in records: + handle.write(json.dumps(record, ensure_ascii=False, separators=(",", ":"))) + handle.write("\n") + + +def parse_unix_ms(value: Any) -> int | None: + if value is None: + return None + if isinstance(value, (int, float)): + return int(value) + text = str(value).strip() + if not text: + return None + if text.endswith("Z"): + text = f"{text[:-1]}+00:00" + try: + return int(datetime.fromisoformat(text).astimezone(timezone.utc).timestamp() * 1000) + except ValueError: + return None + + +def flatten_net_epoch(samples: list[dict[str, Any]]) -> list[dict[str, Any]]: + flattened: list[dict[str, Any]] = [] + for sample in samples: + links = sample.get("links") or {} + a_to_d = (links.get("a_to_d") or {}).get("sessions") or {} + d_to_b = (links.get("d_to_b") or {}).get("sessions") or {} + a_control = (a_to_d.get("control") or {}).get("kcp") or {} + d_control = (d_to_b.get("control") or {}).get("kcp") or {} + a_video = (a_to_d.get("video") or {}).get("kcp") or {} + d_video = (d_to_b.get("video") or {}).get("kcp") or {} + flattened.append( + { + "updated_at": sample.get("updated_at"), + "a_to_d_control_srtt_ms": a_control.get("srtt_ms"), + "a_to_d_control_min_srtt_ms": a_control.get("min_srtt_ms"), + "d_to_b_control_srtt_ms": d_control.get("srtt_ms"), + "d_to_b_control_min_srtt_ms": d_control.get("min_srtt_ms"), + "a_to_d_video_srtt_ms": a_video.get("srtt_ms"), + "a_to_d_video_min_srtt_ms": a_video.get("min_srtt_ms"), + "d_to_b_video_srtt_ms": d_video.get("srtt_ms"), + "d_to_b_video_min_srtt_ms": d_video.get("min_srtt_ms"), + "a_to_d_control_feedback_age_ms": a_control.get("last_feedback_age_ms"), + "d_to_b_control_feedback_age_ms": d_control.get("last_feedback_age_ms"), + "a_to_d_video_feedback_age_ms": a_video.get("last_feedback_age_ms"), + "d_to_b_video_feedback_age_ms": d_video.get("last_feedback_age_ms"), + "a_to_d_control_retrans_delta": ((a_to_d.get("control") or {}).get("trend") or {}).get("retrans_delta"), + "d_to_b_control_retrans_delta": ((d_to_b.get("control") or {}).get("trend") or {}).get("retrans_delta"), + "a_to_d_video_retrans_delta": ((a_to_d.get("video") or {}).get("trend") or {}).get("retrans_delta"), + "d_to_b_video_retrans_delta": ((d_to_b.get("video") or {}).get("trend") or {}).get("retrans_delta"), + "a_to_d_video_window_pressure_pct": a_video.get("window_pressure_pct"), + "d_to_b_video_window_pressure_pct": d_video.get("window_pressure_pct"), + "robot_health": sample.get("robot_health"), + } + ) + return flattened + + +def aggregate_control_estimates( + network_samples: list[dict[str, Any]], + control_events: list[dict[str, Any]], + control_acks: list[dict[str, Any]], +) -> list[dict[str, Any]]: + if control_acks: + return control_acks + + fallback: list[dict[str, Any]] = [] + for sample in network_samples: + estimate = sample.get("latency_estimate") or {} + fallback.append( + { + "updated_at": sample.get("updated_at"), + "estimate_method": "srtt_fallback", + "control_loop_rtt_ms": estimate.get("control_loop_rtt_ms"), + "control_to_persist_est_ms": estimate.get("control_to_persist_est_ms"), + "control_oneway_srtt_est_ms": estimate.get("control_oneway_srtt_est_ms"), + "control_oneway_bestcase_est_ms": estimate.get("control_oneway_bestcase_est_ms"), + "source_event_count": len(control_events), + } + ) + return fallback + + +def aggregate_video_estimates( + network_samples: list[dict[str, Any]], + frame_recv_records: list[dict[str, Any]], + display_probe_records: list[dict[str, Any]], +) -> list[dict[str, Any]]: + network_timeline = sorted( + ( + (updated_at_ms, sample.get("latency_estimate") or {}) + for sample in network_samples + for updated_at_ms in [parse_unix_ms(sample.get("updated_at"))] + if updated_at_ms is not None + ), + key=lambda item: item[0], + ) + probes_by_seq = { + int(record["frame_seq"]): record + for record in display_probe_records + if record.get("frame_seq") is not None + } + estimates: list[dict[str, Any]] = [] + timeline_index = 0 + + for record in frame_recv_records: + frame_seq = record.get("frame_seq") + if frame_seq is None: + continue + probe = probes_by_seq.get(int(frame_seq)) + backend_received_unix_ns = record.get("backend_received_unix_ns") + backend_received_unix_ms = None + try: + if backend_received_unix_ns is not None: + backend_received_unix_ms = int(int(backend_received_unix_ns) / 1_000_000) + except (TypeError, ValueError): + backend_received_unix_ms = None + + latency_estimate: dict[str, Any] = {} + if backend_received_unix_ms is not None and network_timeline: + while timeline_index + 1 < len(network_timeline) and network_timeline[timeline_index + 1][0] <= backend_received_unix_ms: + timeline_index += 1 + if network_timeline[timeline_index][0] <= backend_received_unix_ms: + latency_estimate = network_timeline[timeline_index][1] + + network_oneway = latency_estimate.get("video_network_oneway_est_ms") + capture_to_send = record.get("b_side_capture_to_send_ms") + partial_est = None + if capture_to_send is not None or network_oneway is not None: + partial_est = round(float(capture_to_send or 0.0) + float(network_oneway or 0.0), 3) + a_recv_to_paint_ms = None + if probe is not None and probe.get("backend_received_unix_ns") is not None and probe.get("paint_unix_ms") is not None: + a_recv_to_paint_ms = round( + float(probe["paint_unix_ms"]) - (int(probe["backend_received_unix_ns"]) / 1_000_000.0), + 3, + ) + video_e2e_est_ms = round(partial_est + a_recv_to_paint_ms, 3) if partial_est is not None and a_recv_to_paint_ms is not None else None + estimates.append( + { + "frame_seq": frame_seq, + "backend_received_unix_ns": record.get("backend_received_unix_ns"), + "frame_hash": record.get("frame_hash"), + "estimate_method": "capture_to_send+srtt/2+recv_to_paint" if video_e2e_est_ms is not None else "capture_to_send+srtt/2", + "video_network_oneway_est_ms": network_oneway, + "b_side_capture_to_send_ms": capture_to_send, + "a_recv_to_paint_ms": a_recv_to_paint_ms, + "video_partial_est_ms": partial_est, + "video_e2e_est_ms": video_e2e_est_ms, + "sequence_gap": record.get("sequence_gap"), + "repeat_flag": record.get("repeat_flag"), + "sender_clock_delta_ms_raw": record.get("sender_clock_delta_ms_raw"), + } + ) + return estimates + + +def write_html_summary( + path: Path, + *, + net_epochs: list[dict[str, Any]], + control_estimates: list[dict[str, Any]], + video_estimates: list[dict[str, Any]], +) -> None: + latest_control = control_estimates[-1] if control_estimates else {} + latest_video = video_estimates[-1] if video_estimates else {} + latest_net = net_epochs[-1] if net_epochs else {} + html_text = f""" + + + + Latency Estimates + + + +

Latency Estimates

+
+
+

Control

+

loop RTT: {html.escape(str(latest_control.get("control_loop_rtt_ms")))}

+

to persist: {html.escape(str(latest_control.get("control_to_persist_est_ms")))}

+

method: {html.escape(str(latest_control.get("estimate_method")))}

+

samples: {len(control_estimates)}

+
+
+

Video

+

network one-way: {html.escape(str(latest_video.get("video_network_oneway_est_ms")))}

+

partial: {html.escape(str(latest_video.get("video_partial_est_ms")))}

+

end-to-end: {html.escape(str(latest_video.get("video_e2e_est_ms")))}

+

samples: {len(video_estimates)}

+
+
+

Net Epoch

+

a→d control srtt: {html.escape(str(latest_net.get("a_to_d_control_srtt_ms")))}

+

d→b control srtt: {html.escape(str(latest_net.get("d_to_b_control_srtt_ms")))}

+

a→d video srtt: {html.escape(str(latest_net.get("a_to_d_video_srtt_ms")))}

+

d→b video srtt: {html.escape(str(latest_net.get("d_to_b_video_srtt_ms")))}

+
+
+ + +""" + path.write_text(html_text, encoding="utf-8") + + +def main() -> int: + args = parse_args() + run_dir = Path(args.run_dir).resolve() + output_dir = Path(args.output_dir).resolve() if args.output_dir else run_dir + output_dir.mkdir(parents=True, exist_ok=True) + + network_samples = load_glob_jsonl(run_dir, "a-network-summary.*.jsonl") + control_events = load_glob_jsonl(run_dir, "a-control-events.*.jsonl") + control_acks = load_glob_jsonl(run_dir, "a-control-acks.*.jsonl") + frame_recv_records = load_glob_jsonl(run_dir, "a-video-frame-recv.*.jsonl") + display_probe_records = load_glob_jsonl(run_dir, "a-video-display-probe.*.jsonl") + + net_epochs = flatten_net_epoch(network_samples) + control_estimates = aggregate_control_estimates(network_samples, control_events, control_acks) + video_estimates = aggregate_video_estimates(network_samples, frame_recv_records, display_probe_records) + + write_jsonl(output_dir / "net-epoch-summary.jsonl", net_epochs) + write_jsonl(output_dir / "control-latency-estimates.jsonl", control_estimates) + write_jsonl(output_dir / "video-latency-estimates.jsonl", video_estimates) + write_html_summary( + output_dir / "latency-estimates.html", + net_epochs=net_epochs, + control_estimates=control_estimates, + video_estimates=video_estimates, + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/dev/load-env.sh b/scripts/dev/load-env.sh index c966cb3..4d0fde4 100644 --- a/scripts/dev/load-env.sh +++ b/scripts/dev/load-env.sh @@ -124,7 +124,7 @@ export OMNISOCKET_TELEMETRY_INTERVAL_MS="${OMNISOCKET_TELEMETRY_INTERVAL_MS:-100 export OMNISOCKET_TELEMETRY_STALE_AFTER_MS="${OMNISOCKET_TELEMETRY_STALE_AFTER_MS:-3000}" export OMNI_NETWORK_SUMMARY_LOG_ENABLED="${OMNI_NETWORK_SUMMARY_LOG_ENABLED:-1}" export OMNI_NETWORK_SUMMARY_LOG_PATH="${OMNI_NETWORK_SUMMARY_LOG_PATH:-${OMNISOCKETGO_ROOT}/logs/a-network-summary.jsonl}" -export OMNI_NETWORK_SUMMARY_LOG_INTERVAL_MS="${OMNI_NETWORK_SUMMARY_LOG_INTERVAL_MS:-2000}" +export OMNI_NETWORK_SUMMARY_LOG_INTERVAL_MS="${OMNI_NETWORK_SUMMARY_LOG_INTERVAL_MS:-1000}" export OMNI_NETWORK_SUMMARY_LOG_REQUEST_TIMEOUT_SEC="${OMNI_NETWORK_SUMMARY_LOG_REQUEST_TIMEOUT_SEC:-3}" export CONTROL_SIDE_OMNISOCKET_SERVER_ADDR="${CONTROL_SIDE_OMNISOCKET_SERVER_ADDR:-}" export CONTROL_SIDE_OMNISOCKET_RELAY_VIA="${CONTROL_SIDE_OMNISOCKET_RELAY_VIA:-}" @@ -152,6 +152,8 @@ export OMNI_VIDEO_RELAY_VIA="${OMNI_VIDEO_RELAY_VIA:-${ROBOT_SIDE_OMNISOCKET_REL export OMNI_CONTROL_SERVER_ADDR="${OMNI_CONTROL_SERVER_ADDR:-${ROBOT_SIDE_OMNISOCKET_SERVER_ADDR:-}}" export OMNI_CONTROL_RELAY_VIA="${OMNI_CONTROL_RELAY_VIA:-${ROBOT_SIDE_OMNISOCKET_RELAY_VIA:-}}" export OMNI_CONTROL_UNIX_SOCKET_PATH="${OMNI_CONTROL_UNIX_SOCKET_PATH:-${ROBOT_RECEIVER_LOCAL_SOCKET_PATH}}" +export OMNI_CONTROL_ACK_PEER_ID="${OMNI_CONTROL_ACK_PEER_ID:-peer-b-ctrl-ack}" +export OMNI_CONTROL_ACK_TARGET_PEER="${OMNI_CONTROL_ACK_TARGET_PEER:-peer-a-ctrl-ack}" export B_SIDE_OMNID_USE_SUDO="${B_SIDE_OMNID_USE_SUDO:-1}" export BLITZ_RUNTIME_DIR="${BLITZ_RUNTIME_DIR:-${OMNISOCKETGO_ROOT}/logs/runtime}" export BLITZ_RUN_ROOT="${BLITZ_RUN_ROOT:-${OMNISOCKETGO_ROOT}/logs}" @@ -167,6 +169,9 @@ export BLITZ_TIME_SERVER_IP="${BLITZ_TIME_SERVER_IP:-}" export BLITZ_KCP_STATS_INTERVAL_MS="${BLITZ_KCP_STATS_INTERVAL_MS:-1000}" export BLITZ_CONTROL_LATENCY_LOG_ENABLED="${BLITZ_CONTROL_LATENCY_LOG_ENABLED:-1}" export BLITZ_CONTROL_LATENCY_LOG_SAMPLE_MOD="${BLITZ_CONTROL_LATENCY_LOG_SAMPLE_MOD:-100}" +export BLITZ_CONTROL_ACK_SAMPLE_MOD="${BLITZ_CONTROL_ACK_SAMPLE_MOD:-10}" +export BLITZ_VIDEO_STAGE_LOG_ENABLED="${BLITZ_VIDEO_STAGE_LOG_ENABLED:-1}" +export BLITZ_VIDEO_STAGE_LOG_SAMPLE_MOD="${BLITZ_VIDEO_STAGE_LOG_SAMPLE_MOD:-10}" export BLITZ_5G_LINK_LOG_INTERVAL_SEC="${BLITZ_5G_LINK_LOG_INTERVAL_SEC:-5}" export BLITZ_JSONL_FLUSH_INTERVAL_MS="${BLITZ_JSONL_FLUSH_INTERVAL_MS:-1000}" export BLITZ_JSONL_FLUSH_BYTES="${BLITZ_JSONL_FLUSH_BYTES:-262144}" @@ -294,12 +299,17 @@ blitz_dev_prepare_backend_logging_env() { export OMNI_NETWORK_SUMMARY_LOG_PATH OMNI_NETWORK_SUMMARY_LOG_PATH="$(blitz_dev_component_log_path "a-network-summary")" fi + export BLITZ_A_CONTROL_EVENTS_LOG_PATH="${BLITZ_A_CONTROL_EVENTS_LOG_PATH:-$(blitz_dev_component_log_path "a-control-events")}" + export BLITZ_A_CONTROL_ACKS_LOG_PATH="${BLITZ_A_CONTROL_ACKS_LOG_PATH:-$(blitz_dev_component_log_path "a-control-acks")}" + export BLITZ_A_VIDEO_FRAME_RECV_LOG_PATH="${BLITZ_A_VIDEO_FRAME_RECV_LOG_PATH:-$(blitz_dev_component_log_path "a-video-frame-recv")}" + export BLITZ_A_VIDEO_DISPLAY_PROBE_LOG_PATH="${BLITZ_A_VIDEO_DISPLAY_PROBE_LOG_PATH:-$(blitz_dev_component_log_path "a-video-display-probe")}" } blitz_dev_prepare_bside_logging_env() { blitz_dev_init_instance_context export BLITZ_KCP_STATS_LOG_PATH="${BLITZ_KCP_STATS_LOG_PATH:-$(blitz_dev_component_log_path "b-kcp-session-stats")}" export BLITZ_CONTROL_LATENCY_LOG_PATH="${BLITZ_CONTROL_LATENCY_LOG_PATH:-$(blitz_dev_component_log_path "b-control-latency")}" + export BLITZ_VIDEO_STAGE_LOG_PATH="${BLITZ_VIDEO_STAGE_LOG_PATH:-$(blitz_dev_component_log_path "b-video-frame-stages")}" } blitz_dev_prepare_5g_logging_env() { diff --git a/scripts/dev/robot-remote.env b/scripts/dev/robot-remote.env index 9974659..ebb29b1 100644 --- a/scripts/dev/robot-remote.env +++ b/scripts/dev/robot-remote.env @@ -26,7 +26,7 @@ OMNISOCKET_TELEMETRY_INTERVAL_MS="1000" OMNISOCKET_TELEMETRY_STALE_AFTER_MS="3000" OMNI_NETWORK_SUMMARY_LOG_ENABLED="1" OMNI_NETWORK_SUMMARY_LOG_PATH="${OMNISOCKETGO_ROOT}/logs/a-network-summary.jsonl" -OMNI_NETWORK_SUMMARY_LOG_INTERVAL_MS="5000" +OMNI_NETWORK_SUMMARY_LOG_INTERVAL_MS="1000" OMNI_NETWORK_SUMMARY_LOG_REQUEST_TIMEOUT_SEC="3" FRONTEND_HOST="0.0.0.0" @@ -63,6 +63,11 @@ OMNI_CONTROL_EXPECTED_SENDER="peer-a-ctrl" OMNI_CONTROL_SERVER_ADDR="${ROBOT_SIDE_OMNISOCKET_SERVER_ADDR}" OMNI_CONTROL_RELAY_VIA="${ROBOT_SIDE_OMNISOCKET_RELAY_VIA}" OMNI_CONTROL_UNIX_SOCKET_PATH="${ROBOT_RECEIVER_LOCAL_SOCKET_PATH}" +OMNI_CONTROL_ACK_PEER_ID="peer-b-ctrl-ack" +OMNI_CONTROL_ACK_TARGET_PEER="peer-a-ctrl-ack" +BLITZ_CONTROL_ACK_SAMPLE_MOD="10" +BLITZ_VIDEO_STAGE_LOG_ENABLED="1" +BLITZ_VIDEO_STAGE_LOG_SAMPLE_MOD="10" OMNI_CONTROL_SERVER_IDLE_RECONNECT_MS="30000" # A-side backend video freshness guard. Used by scripts/dev/start-backend.sh. diff --git a/src/kcp_session_stats.c b/src/kcp_session_stats.c index ad474cc..c0b8ef8 100644 --- a/src/kcp_session_stats.c +++ b/src/kcp_session_stats.c @@ -156,10 +156,18 @@ int kcp_session_stats_log(kcp_session_stats_logger_t *logger, const kcp_session_ kcp_session_stats_appendf(&line, &line_len, ",\"srtt_ms\":%d", record->srtt_ms) != 0) { goto cleanup; } + if (record->has_min_srtt_ms && + kcp_session_stats_appendf(&line, &line_len, ",\"min_srtt_ms\":%d", record->min_srtt_ms) != 0) { + goto cleanup; + } if (record->has_srttvar_ms && kcp_session_stats_appendf(&line, &line_len, ",\"srttvar_ms\":%d", record->srttvar_ms) != 0) { goto cleanup; } + if (record->has_last_feedback_age_ms && + kcp_session_stats_appendf(&line, &line_len, ",\"last_feedback_age_ms\":%u", record->last_feedback_age_ms) != 0) { + goto cleanup; + } if (record->has_snd_wnd && kcp_session_stats_appendf(&line, &line_len, ",\"snd_wnd\":%u", record->snd_wnd) != 0) { goto cleanup; diff --git a/src/peer_kcp_client.c b/src/peer_kcp_client.c index 88ec65c..733f076 100644 --- a/src/peer_kcp_client.c +++ b/src/peer_kcp_client.c @@ -477,6 +477,16 @@ int kcp_client_send_text(kcp_client_t *client, const char *to, const char *text) } int kcp_client_send_binary(kcp_client_t *client, const char *to, const void *data, size_t data_len) { + return kcp_client_send_binary_with_id(client, to, data, data_len, NULL); +} + +int kcp_client_send_binary_with_id( + kcp_client_t *client, + const char *to, + const void *data, + size_t data_len, + uint64_t *out_id +) { message_t msg; uint64_t id; @@ -508,6 +518,9 @@ int kcp_client_send_binary(kcp_client_t *client, const char *to, const void *dat protocol_message_clear(&msg); return -1; } + if (out_id != NULL) { + *out_id = id; + } protocol_message_clear(&msg); return 0; } diff --git a/src/server_kcp_hub.c b/src/server_kcp_hub.c index 8a0dfb1..cdcf2f3 100644 --- a/src/server_kcp_hub.c +++ b/src/server_kcp_hub.c @@ -228,7 +228,9 @@ static int kcp_hub_add_runtime_stats_json(cJSON *object, const kcp_runtime_stats cJSON_AddNumberToObject(object, "conv", (double) stats->conv) == NULL || cJSON_AddNumberToObject(object, "rto_ms", (double) stats->rto_ms) == NULL || cJSON_AddNumberToObject(object, "srtt_ms", (double) stats->srtt_ms) == NULL || + cJSON_AddNumberToObject(object, "min_srtt_ms", (double) stats->min_srtt_ms) == NULL || cJSON_AddNumberToObject(object, "srttvar_ms", (double) stats->srttvar_ms) == NULL || + cJSON_AddNumberToObject(object, "last_feedback_age_ms", (double) stats->last_feedback_age_ms) == NULL || cJSON_AddNumberToObject(object, "snd_wnd", (double) stats->snd_wnd) == NULL || cJSON_AddNumberToObject(object, "rmt_wnd", (double) stats->rmt_wnd) == NULL || cJSON_AddNumberToObject(object, "inflight", (double) stats->inflight) == NULL || diff --git a/src/transport_kcp.c b/src/transport_kcp.c index b4b93d1..fbb3a51 100644 --- a/src/transport_kcp.c +++ b/src/transport_kcp.c @@ -72,6 +72,8 @@ struct kcp_conn { uint64_t pending_in_errs; uint64_t pending_kcp_in_errs; protocol_frame_decoder_t decoder; + int32_t min_srtt_ms; + uint32_t last_feedback_ms; uint8_t scratch[KCP_RECV_CHUNK_SIZE]; latency_logger_t *logger; char node_role[OMNI_MAX_NODE_ROLE]; @@ -307,6 +309,26 @@ static uint64_t kcp_counter_diff(uint64_t previous, uint64_t current) { return current < previous ? 0 : current - previous; } +static void kcp_conn_update_min_srtt_locked(kcp_conn_t *conn) { + int32_t srtt_ms; + + if (conn == NULL || conn->kcp == NULL) { + return; + } + srtt_ms = conn->kcp->rx_srtt; + if (srtt_ms > 0 && (conn->min_srtt_ms <= 0 || srtt_ms < conn->min_srtt_ms)) { + conn->min_srtt_ms = srtt_ms; + } +} + +static void kcp_conn_note_feedback_locked(kcp_conn_t *conn) { + if (conn == NULL) { + return; + } + conn->last_feedback_ms = omni_now_millis32(); + kcp_conn_update_min_srtt_locked(conn); +} + static int kcp_process_sampler_matches(const kcp_process_sampler_t *sampler, kcp_session_stats_logger_t *logger, const char *node_role, const char *node_id, int stats_interval_ms) { if (sampler == NULL) { return 0; @@ -1093,8 +1115,13 @@ static void kcp_log_session_snapshot(kcp_conn_t *conn, const char *reason) { record.rto_ms = conn->kcp->rx_rto; record.has_srtt_ms = 1; record.srtt_ms = conn->kcp->rx_srtt; + kcp_conn_update_min_srtt_locked(conn); + record.has_min_srtt_ms = conn->min_srtt_ms > 0; + record.min_srtt_ms = conn->min_srtt_ms; record.has_srttvar_ms = 1; record.srttvar_ms = conn->kcp->rx_rttval; + record.has_last_feedback_age_ms = conn->last_feedback_ms != 0; + record.last_feedback_age_ms = conn->last_feedback_ms == 0 ? 0 : (omni_now_millis32() - conn->last_feedback_ms); record.has_snd_wnd = 1; record.snd_wnd = conn->kcp->snd_wnd; record.has_rmt_wnd = 1; @@ -1268,6 +1295,7 @@ static void *kcp_client_recv_thread_main(void *arg) { if (ikcp_input(conn->kcp, (const char *) buffer, n) != 0) { kcp_conn_record_error(conn); } else { + kcp_conn_note_feedback_locked(conn); kcp_conn_record_input(conn, (int) n, segment_count); } pthread_mutex_unlock(&conn->kcp_mu); @@ -1630,6 +1658,7 @@ static void *kcp_listener_recv_thread_main(void *arg) { if (ikcp_input(conn->kcp, (const char *) buffer, n) != 0) { kcp_conn_record_error(conn); } else { + kcp_conn_note_feedback_locked(conn); kcp_conn_record_input(conn, (int) n, segment_count); } pthread_mutex_unlock(&conn->kcp_mu); @@ -1907,7 +1936,10 @@ void kcp_conn_runtime_stats_snapshot(kcp_conn_t *conn, kcp_runtime_stats_t *out_ out_stats->conv = conn->kcp->conv; out_stats->rto_ms = conn->kcp->rx_rto; out_stats->srtt_ms = conn->kcp->rx_srtt; + kcp_conn_update_min_srtt_locked(conn); + out_stats->min_srtt_ms = conn->min_srtt_ms; out_stats->srttvar_ms = conn->kcp->rx_rttval; + out_stats->last_feedback_age_ms = conn->last_feedback_ms == 0 ? 0 : (omni_now_millis32() - conn->last_feedback_ms); out_stats->snd_wnd = conn->kcp->snd_wnd; out_stats->rmt_wnd = conn->kcp->rmt_wnd; out_stats->inflight = conn->kcp->snd_nxt - conn->kcp->snd_una; diff --git a/src/video_pipeline.c b/src/video_pipeline.c index bad5bd1..32baf0f 100644 --- a/src/video_pipeline.c +++ b/src/video_pipeline.c @@ -46,6 +46,7 @@ typedef struct video_sender { char target_peer[OMNI_MAX_PEER_ID]; uint8_t *send_buffer; size_t send_buffer_cap; + uint64_t next_frame_seq; } video_sender_t; static int video_pipeline_stop_requested(volatile sig_atomic_t *stop_requested) { @@ -212,6 +213,7 @@ void video_pipeline_config_init(video_pipeline_config_t *config) { config->hard_backpressure_hold_ms = VIDEO_HARD_BACKPRESSURE_HOLD_MS_DEFAULT; config->frame_stall_reconnect_ms = VIDEO_DEFAULT_FRAME_STALL_RECONNECT_MS; config->stats_logger = NULL; + config->stage_logger = NULL; config->stats_interval_ms = 1000; } @@ -272,6 +274,8 @@ void video_pipeline_stats_snapshot(video_pipeline_stats_t *stats, video_pipeline out_stats->backlog_resets = stats->backlog_resets; out_stats->last_frame_bytes = stats->last_frame_bytes; out_stats->last_backlog_segments = stats->last_backlog_segments; + out_stats->last_capture_to_send_ms = stats->last_capture_to_send_ms; + out_stats->avg_capture_to_send_ms = stats->avg_capture_to_send_ms; out_stats->connected = stats->connected; snprintf(out_stats->last_error, sizeof(out_stats->last_error), "%s", stats->last_error); snprintf(out_stats->last_backlog_reason, sizeof(out_stats->last_backlog_reason), "%s", stats->last_backlog_reason); @@ -644,10 +648,12 @@ static int video_sender_drain_pending_messages(video_sender_t *sender) { static int video_sender_send_packet( video_sender_t *sender, const AVPacket *encoded_pkt, - const video_pipeline_packet_metadata_t *metadata + const video_pipeline_packet_metadata_t *metadata, + uint64_t *out_frame_seq ) { uint8_t *payload; size_t payload_len; + uint64_t frame_seq; int rc; if (sender == NULL || sender->client == NULL || encoded_pkt == NULL || metadata == NULL) { @@ -655,18 +661,31 @@ static int video_sender_send_packet( return -1; } - payload_len = (size_t) encoded_pkt->size + sizeof(*metadata); + frame_seq = sender->next_frame_seq + 1U; + payload_len = 8U + (size_t) encoded_pkt->size + sizeof(*metadata); if (video_sender_ensure_buffer_capacity(sender, payload_len) != 0) { return -1; } payload = sender->send_buffer; - memcpy(payload, encoded_pkt->data, (size_t) encoded_pkt->size); - memcpy(payload + encoded_pkt->size, metadata, sizeof(*metadata)); + payload[0] = (uint8_t) (frame_seq >> 56); + payload[1] = (uint8_t) (frame_seq >> 48); + payload[2] = (uint8_t) (frame_seq >> 40); + payload[3] = (uint8_t) (frame_seq >> 32); + payload[4] = (uint8_t) (frame_seq >> 24); + payload[5] = (uint8_t) (frame_seq >> 16); + payload[6] = (uint8_t) (frame_seq >> 8); + payload[7] = (uint8_t) frame_seq; + memcpy(payload + 8U, encoded_pkt->data, (size_t) encoded_pkt->size); + memcpy(payload + 8U + (size_t) encoded_pkt->size, metadata, sizeof(*metadata)); rc = kcp_client_send_binary(sender->client, sender->target_peer, payload, payload_len); if (rc != 0) { return rc; } + sender->next_frame_seq = frame_seq; + if (out_frame_seq != NULL) { + *out_frame_seq = frame_seq; + } rc = video_sender_drain_pending_messages(sender); return rc; } @@ -735,6 +754,109 @@ static void video_pipeline_note_backpressure( pthread_mutex_unlock(&stats->mutex); } +static void video_pipeline_note_capture_to_send(video_pipeline_stats_t *stats, uint32_t capture_to_send_ms) { + if (stats == NULL) { + return; + } + pthread_mutex_lock(&stats->mutex); + stats->last_capture_to_send_ms = capture_to_send_ms; + if (stats->avg_capture_to_send_ms <= 0.0) { + stats->avg_capture_to_send_ms = (double) capture_to_send_ms; + } else { + stats->avg_capture_to_send_ms = stats->avg_capture_to_send_ms * 0.9 + (double) capture_to_send_ms * 0.1; + } + pthread_mutex_unlock(&stats->mutex); +} + +static int video_stage_logger_should_log(const video_stage_logger_t *logger, uint64_t frame_seq) { + if (logger == NULL || !logger->enabled) { + return 0; + } + if (logger->sample_mod <= 1U) { + return 1; + } + return frame_seq % logger->sample_mod == 0U; +} + +static void video_stage_logger_log_frame( + video_stage_logger_t *logger, + uint64_t frame_seq, + double capture_ms, + double decode_ms, + double scale_ms, + double encode_ms, + double send_ms, + double pipeline_total_ms, + size_t jpeg_bytes, + uint64_t kcp_out_seg_delta, + uint32_t backlog_segments, + double window_pressure_pct, + int32_t video_srtt_ms +) { + char *line; + + if (!video_stage_logger_should_log(logger, frame_seq)) { + return; + } + line = omni_strdup_printf( + "{\"ts_unix_nano\":%" PRId64 ",\"frame_seq\":%" PRIu64 ",\"capture_ms\":%.3f,\"decode_ms\":%.3f,\"scale_ms\":%.3f,\"encode_ms\":%.3f,\"send_ms\":%.3f,\"pipeline_total_ms\":%.3f,\"jpeg_bytes\":%zu,\"kcp_out_seg_delta\":%" PRIu64 ",\"backlog_segments\":%u,\"window_pressure_pct\":%.3f,\"video_srtt_ms\":%d}", + omni_now_unix_nano(), + frame_seq, + capture_ms, + decode_ms, + scale_ms, + encode_ms, + send_ms, + pipeline_total_ms, + jpeg_bytes, + kcp_out_seg_delta, + backlog_segments, + window_pressure_pct, + video_srtt_ms + ); + if (line == NULL) { + return; + } + (void) omni_file_logger_write_line(&logger->file_logger, line); + free(line); +} + +video_stage_logger_t *video_stage_logger_open_jsonl(const char *path, uint64_t sample_mod) { + video_stage_logger_t *logger; + FILE *file; + + if (path == NULL || path[0] == '\0') { + return NULL; + } + if (omni_ensure_parent_dir(path) != 0) { + return NULL; + } + file = fopen(path, "ab"); + if (file == NULL) { + return NULL; + } + logger = (video_stage_logger_t *) calloc(1, sizeof(*logger)); + if (logger == NULL) { + fclose(file); + return NULL; + } + omni_file_logger_init_path(&logger->file_logger, file, path, 0); + logger->enabled = 1; + logger->sample_mod = sample_mod == 0U ? 1U : sample_mod; + return logger; +} + +void video_stage_logger_close(video_stage_logger_t *logger) { + if (logger == NULL) { + return; + } + if (logger->file_logger.file != NULL) { + fclose(logger->file_logger.file); + } + omni_file_logger_destroy(&logger->file_logger); + free(logger); +} + static int video_server_error_requires_reconnect(const char *message) { if (message == NULL || message[0] == '\0') { return 0; @@ -932,7 +1054,9 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta AVFrame *scaled_frame = NULL; AVPacket *encoded_pkt = NULL; kcp_runtime_stats_t transport_stats; + kcp_runtime_stats_t transport_after_send; int select_rc; + int should_log_stage = 0; double total_start_ms = 0.0; double capture_start_ms = 0.0; double capture_end_ms = 0.0; @@ -947,8 +1071,13 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta video_pipeline_packet_metadata_t packet_metadata; char reconnect_reason[256]; int frame_number = frame_index + 1; + uint64_t frame_seq = 0; + uint64_t out_segs_before_send = 0; + uint64_t out_segs_after_send = 0; + uint32_t capture_to_send_ms = 0; memset(&transport_stats, 0, sizeof(transport_stats)); + memset(&transport_after_send, 0, sizeof(transport_after_send)); memset(&packet_metadata, 0, sizeof(packet_metadata)); reconnect_reason[0] = '\0'; video_pipeline_report_progress(config); @@ -956,9 +1085,7 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta if (config->max_frames > 0 && frame_index >= config->max_frames) { break; } - if (config->enable_timing_logs) { - total_start_ms = video_pipeline_now_ms(); - } + total_start_ms = video_pipeline_now_ms(); FD_ZERO(&fds); FD_SET(fd, &fds); @@ -972,9 +1099,7 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta video_pipeline_set_errno_error(stats, "failed waiting for camera frame"); goto cleanup; } - if (config->enable_timing_logs) { - capture_start_ms = video_pipeline_now_ms(); - } + capture_start_ms = video_pipeline_now_ms(); memset(&buf, 0, sizeof(buf)); buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; @@ -983,10 +1108,8 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta video_pipeline_set_errno_error(stats, "failed to dequeue V4L2 buffer"); goto cleanup; } - if (config->enable_timing_logs) { - capture_end_ms = video_pipeline_now_ms(); - decode_start_ms = capture_end_ms; - } + capture_end_ms = video_pipeline_now_ms(); + decode_start_ms = capture_end_ms; if (decode_mjpeg_frame(decoder, (const uint8_t *) buffers[buf.index].start, (int) buf.bytesused, &decoded_frame) != 0) { if (config->enable_timing_logs) { @@ -995,10 +1118,8 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta (void) ioctl(fd, VIDIOC_QBUF, &buf); continue; } - if (config->enable_timing_logs) { - decode_end_ms = video_pipeline_now_ms(); - scale_start_ms = decode_end_ms; - } + decode_end_ms = video_pipeline_now_ms(); + scale_start_ms = decode_end_ms; if ( ensure_scale_context( &sws_ctx, @@ -1018,10 +1139,8 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta (void) ioctl(fd, VIDIOC_QBUF, &buf); continue; } - if (config->enable_timing_logs) { - scale_end_ms = video_pipeline_now_ms(); - encode_start_ms = scale_end_ms; - } + scale_end_ms = video_pipeline_now_ms(); + encode_start_ms = scale_end_ms; if (encode_frame(encoder, scaled_frame, &encoded_pkt) != 0) { if (config->enable_timing_logs) { video_pipeline_print_timing_failure(frame_number, "encode"); @@ -1031,10 +1150,8 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta (void) ioctl(fd, VIDIOC_QBUF, &buf); continue; } - if (config->enable_timing_logs) { - encode_end_ms = video_pipeline_now_ms(); - send_start_ms = encode_end_ms; - } + encode_end_ms = video_pipeline_now_ms(); + send_start_ms = encode_end_ms; { gps_video_sample_t gps_sample = get_latest_gps_for_video(); @@ -1186,7 +1303,13 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta continue; } - if (video_sender_send_packet(&sender, encoded_pkt, &packet_metadata) != 0) { + capture_to_send_ms = send_start_ms <= capture_start_ms + ? 0U + : (uint32_t) (send_start_ms - capture_start_ms + 0.5); + packet_metadata.capture_to_send_ms = capture_to_send_ms; + out_segs_before_send = transport_stats.out_segs_total; + + if (video_sender_send_packet(&sender, encoded_pkt, &packet_metadata, &frame_seq) != 0) { pthread_mutex_lock(&stats->mutex); stats->send_errors += 1; pthread_mutex_unlock(&stats->mutex); @@ -1200,19 +1323,43 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta (void) ioctl(fd, VIDIOC_QBUF, &buf); goto cleanup; } - if (config->enable_timing_logs) { - send_end_ms = video_pipeline_now_ms(); + send_end_ms = video_pipeline_now_ms(); + should_log_stage = video_stage_logger_should_log(config->stage_logger, frame_seq); + if (should_log_stage) { + kcp_client_runtime_stats_snapshot(sender.client, &transport_after_send); + out_segs_after_send = transport_after_send.out_segs_total; + } else { + transport_after_send = transport_stats; + out_segs_after_send = out_segs_before_send; } + video_pipeline_note_capture_to_send(stats, capture_to_send_ms); pthread_mutex_lock(&stats->mutex); stats->frames_sent += 1; stats->bytes_sent += (uint64_t) encoded_pkt->size; stats->last_frame_bytes = (uint64_t) encoded_pkt->size; - kcp_client_runtime_stats_snapshot(sender.client, &stats->transport); + stats->transport = transport_after_send; pthread_mutex_unlock(&stats->mutex); have_sent_frame = 1; last_successful_send_ms = omni_now_millis32(); soft_drops_since_last_send = 0; + if (should_log_stage) { + video_stage_logger_log_frame( + config->stage_logger, + frame_seq, + capture_end_ms - capture_start_ms, + decode_end_ms - decode_start_ms, + scale_end_ms - scale_start_ms, + encode_end_ms - encode_start_ms, + send_end_ms - send_start_ms, + send_end_ms - total_start_ms, + (size_t) encoded_pkt->size, + out_segs_after_send >= out_segs_before_send ? out_segs_after_send - out_segs_before_send : 0U, + video_sender_backlog_segments(&transport_after_send), + transport_after_send.window_pressure_pct, + transport_after_send.srtt_ms + ); + } if (config->enable_timing_logs) { video_pipeline_print_timing_row( frame_number,