feat: 增加日志模块

This commit is contained in:
2026-04-18 12:52:39 +08:00
parent b700dab484
commit 212459a8e4
18 changed files with 938 additions and 42 deletions

View File

@@ -20,6 +20,8 @@
#define CONTROL_DEFAULT_PEER_ID "peer-b-ctrl" #define CONTROL_DEFAULT_PEER_ID "peer-b-ctrl"
#define CONTROL_DEFAULT_EXPECTED_SENDER "peer-a-ctrl" #define CONTROL_DEFAULT_EXPECTED_SENDER "peer-a-ctrl"
#define CONTROL_ACK_DEFAULT_PEER_ID "peer-b-ctrl-ack"
#define CONTROL_ACK_DEFAULT_TARGET_PEER "peer-a-ctrl-ack"
#define CONTROL_DEFAULT_UNIX_SOCKET "/tmp/omnisocket-b-side-cmd.sock" #define CONTROL_DEFAULT_UNIX_SOCKET "/tmp/omnisocket-b-side-cmd.sock"
#define CONTROL_DEFAULT_SERVER_IDLE_RECONNECT_MS 3000 #define CONTROL_DEFAULT_SERVER_IDLE_RECONNECT_MS 3000
#define DEFAULT_RUNTIME_DIR "/run/blitz-robot" #define DEFAULT_RUNTIME_DIR "/run/blitz-robot"
@@ -29,6 +31,7 @@
#define DEFAULT_THREAD_HEARTBEAT_TIMEOUT_SEC 15 #define DEFAULT_THREAD_HEARTBEAT_TIMEOUT_SEC 15
#define DEFAULT_KCP_STATS_INTERVAL_MS 1000 #define DEFAULT_KCP_STATS_INTERVAL_MS 1000
#define DEFAULT_CONTROL_LATENCY_SAMPLE_MOD 100 #define DEFAULT_CONTROL_LATENCY_SAMPLE_MOD 100
#define DEFAULT_CONTROL_ACK_SAMPLE_MOD 10
#define EXIT_CODE_VIDEO_THREAD_STALLED 101 #define EXIT_CODE_VIDEO_THREAD_STALLED 101
#define EXIT_CODE_CONTROL_THREAD_STALLED 102 #define EXIT_CODE_CONTROL_THREAD_STALLED 102
@@ -64,21 +67,32 @@ typedef struct daemon_state {
const char *control_bind_device; const char *control_bind_device;
const char *control_peer_id; const char *control_peer_id;
const char *control_expected_sender; const char *control_expected_sender;
const char *control_ack_peer_id;
const char *control_ack_target_peer;
const char *control_unix_socket; const char *control_unix_socket;
int control_server_idle_reconnect_ms; int control_server_idle_reconnect_ms;
const char *runtime_dir; const char *runtime_dir;
int heartbeat_timeout_sec; int heartbeat_timeout_sec;
int stats_interval_ms; int stats_interval_ms;
uint64_t control_latency_sample_mod; uint64_t control_latency_sample_mod;
uint64_t control_ack_sample_mod;
char status_file_path[512]; char status_file_path[512];
char video_thread_fault_file[512]; char video_thread_fault_file[512];
char control_thread_fault_file[512]; char control_thread_fault_file[512];
atomic_long video_thread_heartbeat_epoch_sec; atomic_long video_thread_heartbeat_epoch_sec;
atomic_long control_thread_heartbeat_epoch_sec; atomic_long control_thread_heartbeat_epoch_sec;
atomic_int control_ack_shutdown_requested;
kcp_session_stats_logger_t *stats_logger; kcp_session_stats_logger_t *stats_logger;
latency_logger_t *control_latency_logger; latency_logger_t *control_latency_logger;
video_stage_logger_t *video_stage_logger;
unix_dgram_client_t unix_client; unix_dgram_client_t unix_client;
control_bridge_stats_t control_stats; control_bridge_stats_t control_stats;
pthread_mutex_t control_ack_mutex;
pthread_t control_ack_thread;
kcp_client_t *control_ack_client;
int control_ack_thread_started;
int control_ack_connect_requested;
int control_ack_connect_inflight;
} daemon_state_t; } daemon_state_t;
static volatile sig_atomic_t g_stop_requested = 0; static volatile sig_atomic_t g_stop_requested = 0;
@@ -180,6 +194,19 @@ static int should_log_control_latency(const daemon_state_t *state, const message
return msg->id % sample_mod == 0U; return msg->id % sample_mod == 0U;
} }
static int should_send_control_ack(const daemon_state_t *state, const message_t *msg) {
uint64_t sample_mod;
if (state == NULL || msg == NULL) {
return 0;
}
sample_mod = state->control_ack_sample_mod;
if (sample_mod <= 1U) {
return 1;
}
return msg->id % sample_mod == 0U;
}
static void video_pipeline_heartbeat_progress(void *context) { static void video_pipeline_heartbeat_progress(void *context) {
update_thread_heartbeat((atomic_long *) context); update_thread_heartbeat((atomic_long *) context);
} }
@@ -259,6 +286,62 @@ static void control_bridge_stats_destroy(control_bridge_stats_t *stats) {
static void unix_dgram_client_close(unix_dgram_client_t *client); static void unix_dgram_client_close(unix_dgram_client_t *client);
static void control_bridge_stats_snapshot(control_bridge_stats_t *stats, control_bridge_stats_t *out_stats); static void control_bridge_stats_snapshot(control_bridge_stats_t *stats, control_bridge_stats_t *out_stats);
static void close_control_ack_client(kcp_client_t **client_ptr);
static int control_ack_enabled(const daemon_state_t *state) {
return state != NULL
&& state->control_ack_peer_id != NULL
&& state->control_ack_peer_id[0] != '\0'
&& state->control_ack_target_peer != NULL
&& state->control_ack_target_peer[0] != '\0';
}
static int control_ack_manager_init(daemon_state_t *state) {
int rc;
if (state == NULL) {
errno = EINVAL;
return -1;
}
rc = pthread_mutex_init(&state->control_ack_mutex, NULL);
if (rc != 0) {
errno = rc;
return -1;
}
atomic_init(&state->control_ack_shutdown_requested, 0);
state->control_ack_client = NULL;
state->control_ack_thread_started = 0;
state->control_ack_connect_requested = 0;
state->control_ack_connect_inflight = 0;
return 0;
}
static void control_ack_manager_reset(daemon_state_t *state, int request_connect) {
kcp_client_t *client = NULL;
if (state == NULL) {
return;
}
pthread_mutex_lock(&state->control_ack_mutex);
client = state->control_ack_client;
state->control_ack_client = NULL;
state->control_ack_connect_requested = request_connect && control_ack_enabled(state) && state->control_ack_thread_started;
pthread_mutex_unlock(&state->control_ack_mutex);
close_control_ack_client(&client);
}
static void control_ack_manager_destroy(daemon_state_t *state) {
if (state == NULL) {
return;
}
atomic_store(&state->control_ack_shutdown_requested, 1);
if (state->control_ack_thread_started) {
pthread_join(state->control_ack_thread, NULL);
state->control_ack_thread_started = 0;
}
control_ack_manager_reset(state, 0);
pthread_mutex_destroy(&state->control_ack_mutex);
}
static int write_status_json_atomic(const char *path, cJSON *root) { static int write_status_json_atomic(const char *path, cJSON *root) {
char *json; char *json;
@@ -348,6 +431,8 @@ static int write_daemon_status_file(daemon_state_t *state) {
cJSON_AddNumberToObject(root, "video_frames_sent", (double) video_stats.frames_sent); cJSON_AddNumberToObject(root, "video_frames_sent", (double) video_stats.frames_sent);
cJSON_AddNumberToObject(root, "video_send_errors", (double) video_stats.send_errors); cJSON_AddNumberToObject(root, "video_send_errors", (double) video_stats.send_errors);
cJSON_AddNumberToObject(root, "video_backlog_resets", (double) video_stats.backlog_resets); cJSON_AddNumberToObject(root, "video_backlog_resets", (double) video_stats.backlog_resets);
cJSON_AddNumberToObject(root, "video_last_capture_to_send_ms", (double) video_stats.last_capture_to_send_ms);
cJSON_AddNumberToObject(root, "video_avg_capture_to_send_ms", video_stats.avg_capture_to_send_ms);
cJSON_AddStringToObject(root, "video_last_error", video_stats.last_error); cJSON_AddStringToObject(root, "video_last_error", video_stats.last_error);
cJSON_AddBoolToObject(root, "control_registered", control_stats.registered != 0); cJSON_AddBoolToObject(root, "control_registered", control_stats.registered != 0);
cJSON_AddNumberToObject(root, "control_reconnect_count", (double) control_stats.reconnect_count); cJSON_AddNumberToObject(root, "control_reconnect_count", (double) control_stats.reconnect_count);
@@ -467,6 +552,147 @@ static void control_message_body_to_cstr(const message_t *msg, char *buffer, siz
buffer[copy_len] = '\0'; buffer[copy_len] = '\0';
} }
static kcp_client_t *connect_control_ack_client(const daemon_state_t *state) {
kcp_conn_options_t options;
if (state == NULL || state->control_ack_peer_id == NULL || state->control_ack_peer_id[0] == '\0') {
errno = EINVAL;
return NULL;
}
kcp_conn_options_set_control_defaults(&options);
return kcp_client_dial_with_options(
state->control_server_addr,
state->control_relay_via,
state->control_ack_peer_id,
state->control_bind_ip,
state->control_bind_device,
&options,
NULL,
NULL,
state->stats_logger,
state->stats_interval_ms
);
}
static void close_control_ack_client(kcp_client_t **client_ptr) {
if (client_ptr == NULL || *client_ptr == NULL) {
return;
}
kcp_client_close(*client_ptr);
kcp_client_free(*client_ptr);
*client_ptr = NULL;
}
static void control_ack_manager_request_connect(daemon_state_t *state) {
if (state == NULL || !control_ack_enabled(state) || !state->control_ack_thread_started) {
return;
}
pthread_mutex_lock(&state->control_ack_mutex);
if (state->control_ack_client == NULL) {
state->control_ack_connect_requested = 1;
}
pthread_mutex_unlock(&state->control_ack_mutex);
}
static void *control_ack_thread_main(void *arg) {
daemon_state_t *state = (daemon_state_t *) arg;
while (!atomic_load(&state->control_ack_shutdown_requested) && !*state->stop_requested) {
kcp_client_t *client = NULL;
int connect_failed = 0;
int should_connect = 0;
pthread_mutex_lock(&state->control_ack_mutex);
if (state->control_ack_connect_requested && state->control_ack_client == NULL && !state->control_ack_connect_inflight) {
state->control_ack_connect_inflight = 1;
should_connect = 1;
}
pthread_mutex_unlock(&state->control_ack_mutex);
if (!should_connect) {
usleep(200000);
continue;
}
client = connect_control_ack_client(state);
connect_failed = client == NULL;
pthread_mutex_lock(&state->control_ack_mutex);
state->control_ack_connect_inflight = 0;
if (
client != NULL
&& state->control_ack_connect_requested
&& state->control_ack_client == NULL
&& !atomic_load(&state->control_ack_shutdown_requested)
&& !*state->stop_requested
) {
state->control_ack_client = client;
state->control_ack_connect_requested = 0;
client = NULL;
}
pthread_mutex_unlock(&state->control_ack_mutex);
if (client != NULL) {
close_control_ack_client(&client);
}
if (connect_failed && !atomic_load(&state->control_ack_shutdown_requested) && !*state->stop_requested) {
sleep(1);
}
}
return NULL;
}
static void maybe_send_control_ack(
daemon_state_t *state,
const message_t *msg,
int64_t recv_unix_nano,
int64_t persist_end_unix_nano,
const char *sample_reason
) {
kcp_client_t *ack_client = NULL;
kcp_client_t *client_to_close = NULL;
char *payload = NULL;
int send_rc = -1;
if (
state == NULL || msg == NULL || recv_unix_nano <= 0 || persist_end_unix_nano <= recv_unix_nano
|| !control_ack_enabled(state) || !state->control_ack_thread_started
) {
return;
}
payload = omni_strdup_printf(
"{\"message_id\":%" PRIu64 ",\"ack_phase\":\"persist_end\",\"b_recv_to_persist_us\":%" PRId64 ",\"unix_send_ok\":true,\"sample_reason\":\"%s\"}",
msg->id,
(persist_end_unix_nano - recv_unix_nano) / 1000,
sample_reason == NULL ? "sample_mod" : sample_reason
);
if (payload == NULL) {
return;
}
pthread_mutex_lock(&state->control_ack_mutex);
ack_client = state->control_ack_client;
if (ack_client == NULL) {
state->control_ack_connect_requested = 1;
pthread_mutex_unlock(&state->control_ack_mutex);
free(payload);
return;
}
send_rc = kcp_client_send_text(ack_client, state->control_ack_target_peer, payload);
if (send_rc != 0) {
client_to_close = state->control_ack_client;
state->control_ack_client = NULL;
state->control_ack_connect_requested = 1;
}
pthread_mutex_unlock(&state->control_ack_mutex);
free(payload);
if (client_to_close != NULL) {
close_control_ack_client(&client_to_close);
}
}
static int unix_dgram_client_init(unix_dgram_client_t *client, const char *dest_path) { static int unix_dgram_client_init(unix_dgram_client_t *client, const char *dest_path) {
struct sockaddr_un bind_addr; struct sockaddr_un bind_addr;
pid_t pid; pid_t pid;
@@ -618,11 +844,17 @@ static void *control_thread_main(void *arg) {
kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport); kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport);
pthread_mutex_unlock(&state->control_stats.mutex); pthread_mutex_unlock(&state->control_stats.mutex);
} }
control_ack_manager_request_connect(state);
while (!*state->stop_requested) { while (!*state->stop_requested) {
message_t msg; message_t msg;
int rc; int rc;
kcp_client_state_t client_state; kcp_client_state_t client_state;
int ack_sampled = 0;
int log_control_latency = 0;
int64_t recv_unix_nano = 0;
int64_t persist_begin_unix_nano = 0;
int64_t persist_end_unix_nano = 0;
update_thread_heartbeat(&state->control_thread_heartbeat_epoch_sec); update_thread_heartbeat(&state->control_thread_heartbeat_epoch_sec);
protocol_message_init(&msg); protocol_message_init(&msg);
@@ -727,9 +959,27 @@ static void *control_thread_main(void *arg) {
continue; continue;
} }
if (should_log_control_latency(state, &msg)) { ack_sampled = should_send_control_ack(state, &msg);
latencylog_log_message_event(state->control_latency_logger, OMNI_NODE_ROLE_PEER, state->control_peer_id, EVENT_B_APP_RECV, &msg); log_control_latency = ack_sampled || should_log_control_latency(state, &msg);
latencylog_log_message_event(state->control_latency_logger, OMNI_NODE_ROLE_PEER, state->control_peer_id, EVENT_B_PERSIST_BEGIN, &msg); if (log_control_latency) {
recv_unix_nano = omni_now_unix_nano();
persist_begin_unix_nano = recv_unix_nano;
latencylog_log_message_event_at(
state->control_latency_logger,
OMNI_NODE_ROLE_PEER,
state->control_peer_id,
EVENT_B_APP_RECV,
recv_unix_nano,
&msg
);
latencylog_log_message_event_at(
state->control_latency_logger,
OMNI_NODE_ROLE_PEER,
state->control_peer_id,
EVENT_B_PERSIST_BEGIN,
persist_begin_unix_nano,
&msg
);
} }
if (unix_dgram_client_send(&state->unix_client, msg.body, msg.body_len) != 0) { if (unix_dgram_client_send(&state->unix_client, msg.body, msg.body_len) != 0) {
@@ -748,8 +998,19 @@ static void *control_thread_main(void *arg) {
state->control_stats.server_idle_ms = client_state.server_idle_ms; state->control_stats.server_idle_ms = client_state.server_idle_ms;
kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport); kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport);
pthread_mutex_unlock(&state->control_stats.mutex); pthread_mutex_unlock(&state->control_stats.mutex);
if (should_log_control_latency(state, &msg)) { if (log_control_latency) {
latencylog_log_message_event(state->control_latency_logger, OMNI_NODE_ROLE_PEER, state->control_peer_id, EVENT_B_PERSIST_END, &msg); persist_end_unix_nano = omni_now_unix_nano();
latencylog_log_message_event_at(
state->control_latency_logger,
OMNI_NODE_ROLE_PEER,
state->control_peer_id,
EVENT_B_PERSIST_END,
persist_end_unix_nano,
&msg
);
}
if (ack_sampled) {
maybe_send_control_ack(state, &msg, recv_unix_nano, persist_end_unix_nano, "sample_mod");
} }
protocol_message_clear(&msg); protocol_message_clear(&msg);
continue; continue;
@@ -771,8 +1032,19 @@ static void *control_thread_main(void *arg) {
state->control_stats.server_idle_ms = client_state.server_idle_ms; state->control_stats.server_idle_ms = client_state.server_idle_ms;
kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport); kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport);
pthread_mutex_unlock(&state->control_stats.mutex); pthread_mutex_unlock(&state->control_stats.mutex);
if (should_log_control_latency(state, &msg)) { if (log_control_latency) {
latencylog_log_message_event(state->control_latency_logger, OMNI_NODE_ROLE_PEER, state->control_peer_id, EVENT_B_PERSIST_END, &msg); persist_end_unix_nano = omni_now_unix_nano();
latencylog_log_message_event_at(
state->control_latency_logger,
OMNI_NODE_ROLE_PEER,
state->control_peer_id,
EVENT_B_PERSIST_END,
persist_end_unix_nano,
&msg
);
}
if (ack_sampled) {
maybe_send_control_ack(state, &msg, recv_unix_nano, persist_end_unix_nano, "sample_mod");
} }
protocol_message_clear(&msg); protocol_message_clear(&msg);
} }
@@ -781,6 +1053,7 @@ static void *control_thread_main(void *arg) {
state->control_stats.registered = 0; state->control_stats.registered = 0;
state->control_stats.server_idle_ms = 0; state->control_stats.server_idle_ms = 0;
pthread_mutex_unlock(&state->control_stats.mutex); pthread_mutex_unlock(&state->control_stats.mutex);
control_ack_manager_reset(state, 0);
kcp_client_close(client); kcp_client_close(client);
kcp_client_free(client); kcp_client_free(client);
if (!*state->stop_requested && !reconnect_immediately) { if (!*state->stop_requested && !reconnect_immediately) {
@@ -802,13 +1075,15 @@ static void print_stats(daemon_state_t *state) {
fprintf( fprintf(
stderr, stderr,
"[b_side_omnid] video registered=%d frames=%llu bytes=%llu drops=%llu resets=%llu backlog=%u reason=%s srtt=%dms | control registered=%d idle=%ums reconnects=%llu forwarded=%llu invalid=%llu unix_err=%llu srtt=%dms last_reconnect=%s\n", "[b_side_omnid] video registered=%d frames=%llu bytes=%llu drops=%llu resets=%llu backlog=%u cap2send=%ums avg=%.1fms reason=%s srtt=%dms | control registered=%d idle=%ums reconnects=%llu forwarded=%llu invalid=%llu unix_err=%llu srtt=%dms last_reconnect=%s\n",
video_stats.connected, video_stats.connected,
(unsigned long long) video_stats.frames_sent, (unsigned long long) video_stats.frames_sent,
(unsigned long long) video_stats.bytes_sent, (unsigned long long) video_stats.bytes_sent,
(unsigned long long) video_stats.backpressure_drops, (unsigned long long) video_stats.backpressure_drops,
(unsigned long long) video_stats.backlog_resets, (unsigned long long) video_stats.backlog_resets,
video_stats.last_backlog_segments, video_stats.last_backlog_segments,
video_stats.last_capture_to_send_ms,
video_stats.avg_capture_to_send_ms,
video_stats.last_backlog_reason[0] == '\0' ? "-" : video_stats.last_backlog_reason, video_stats.last_backlog_reason[0] == '\0' ? "-" : video_stats.last_backlog_reason,
video_stats.transport.srtt_ms, video_stats.transport.srtt_ms,
control_stats.registered, control_stats.registered,
@@ -839,6 +1114,8 @@ int main(void) {
state.control_bind_device = env_first_nonempty("OMNI_CONTROL_BIND_DEVICE", "OMNISOCKET_BIND_DEVICE", ""); state.control_bind_device = env_first_nonempty("OMNI_CONTROL_BIND_DEVICE", "OMNISOCKET_BIND_DEVICE", "");
state.control_peer_id = env_or_default("OMNI_CONTROL_PEER_ID", CONTROL_DEFAULT_PEER_ID); state.control_peer_id = env_or_default("OMNI_CONTROL_PEER_ID", CONTROL_DEFAULT_PEER_ID);
state.control_expected_sender = env_or_default("OMNI_CONTROL_EXPECTED_SENDER", CONTROL_DEFAULT_EXPECTED_SENDER); state.control_expected_sender = env_or_default("OMNI_CONTROL_EXPECTED_SENDER", CONTROL_DEFAULT_EXPECTED_SENDER);
state.control_ack_peer_id = env_or_default("OMNI_CONTROL_ACK_PEER_ID", CONTROL_ACK_DEFAULT_PEER_ID);
state.control_ack_target_peer = env_or_default("OMNI_CONTROL_ACK_TARGET_PEER", CONTROL_ACK_DEFAULT_TARGET_PEER);
state.control_unix_socket = env_or_default("OMNI_CONTROL_UNIX_SOCKET_PATH", CONTROL_DEFAULT_UNIX_SOCKET); state.control_unix_socket = env_or_default("OMNI_CONTROL_UNIX_SOCKET_PATH", CONTROL_DEFAULT_UNIX_SOCKET);
state.runtime_dir = env_or_default("BLITZ_RUNTIME_DIR", DEFAULT_RUNTIME_DIR); state.runtime_dir = env_or_default("BLITZ_RUNTIME_DIR", DEFAULT_RUNTIME_DIR);
state.heartbeat_timeout_sec = env_int_or_default( state.heartbeat_timeout_sec = env_int_or_default(
@@ -847,9 +1124,11 @@ int main(void) {
); );
state.stats_interval_ms = env_int_or_default("BLITZ_KCP_STATS_INTERVAL_MS", DEFAULT_KCP_STATS_INTERVAL_MS); state.stats_interval_ms = env_int_or_default("BLITZ_KCP_STATS_INTERVAL_MS", DEFAULT_KCP_STATS_INTERVAL_MS);
state.control_latency_sample_mod = env_u64_or_default("BLITZ_CONTROL_LATENCY_LOG_SAMPLE_MOD", DEFAULT_CONTROL_LATENCY_SAMPLE_MOD); state.control_latency_sample_mod = env_u64_or_default("BLITZ_CONTROL_LATENCY_LOG_SAMPLE_MOD", DEFAULT_CONTROL_LATENCY_SAMPLE_MOD);
state.control_ack_sample_mod = env_u64_or_default("BLITZ_CONTROL_ACK_SAMPLE_MOD", DEFAULT_CONTROL_ACK_SAMPLE_MOD);
state.video_config.progress_callback = video_pipeline_heartbeat_progress; state.video_config.progress_callback = video_pipeline_heartbeat_progress;
state.video_config.progress_context = &state.video_thread_heartbeat_epoch_sec; state.video_config.progress_context = &state.video_thread_heartbeat_epoch_sec;
state.video_config.stats_logger = NULL; state.video_config.stats_logger = NULL;
state.video_config.stage_logger = NULL;
state.video_config.stats_interval_ms = state.stats_interval_ms; state.video_config.stats_interval_ms = state.stats_interval_ms;
state.control_server_idle_reconnect_ms = env_int_or_default( state.control_server_idle_reconnect_ms = env_int_or_default(
"OMNI_CONTROL_SERVER_IDLE_RECONNECT_MS", "OMNI_CONTROL_SERVER_IDLE_RECONNECT_MS",
@@ -889,8 +1168,15 @@ int main(void) {
video_pipeline_stats_destroy(&state.video_stats); video_pipeline_stats_destroy(&state.video_stats);
return 1; return 1;
} }
if (control_ack_manager_init(&state) != 0) {
perror("control_ack_manager_init");
control_bridge_stats_destroy(&state.control_stats);
video_pipeline_stats_destroy(&state.video_stats);
return 1;
}
if (unix_dgram_client_init(&state.unix_client, state.control_unix_socket) != 0) { if (unix_dgram_client_init(&state.unix_client, state.control_unix_socket) != 0) {
perror("unix_dgram_client_init"); perror("unix_dgram_client_init");
control_ack_manager_destroy(&state);
control_bridge_stats_destroy(&state.control_stats); control_bridge_stats_destroy(&state.control_stats);
video_pipeline_stats_destroy(&state.video_stats); video_pipeline_stats_destroy(&state.video_stats);
return 1; return 1;
@@ -905,6 +1191,7 @@ int main(void) {
if (install_signal_handler(SIGINT) != 0 || install_signal_handler(SIGTERM) != 0) { if (install_signal_handler(SIGINT) != 0 || install_signal_handler(SIGTERM) != 0) {
perror("install_signal_handler"); perror("install_signal_handler");
unix_dgram_client_close(&state.unix_client); unix_dgram_client_close(&state.unix_client);
control_ack_manager_destroy(&state);
control_bridge_stats_destroy(&state.control_stats); control_bridge_stats_destroy(&state.control_stats);
video_pipeline_stats_destroy(&state.video_stats); video_pipeline_stats_destroy(&state.video_stats);
return 1; return 1;
@@ -913,7 +1200,10 @@ int main(void) {
{ {
const char *stats_log_path = getenv("BLITZ_KCP_STATS_LOG_PATH"); const char *stats_log_path = getenv("BLITZ_KCP_STATS_LOG_PATH");
const char *latency_log_path = getenv("BLITZ_CONTROL_LATENCY_LOG_PATH"); const char *latency_log_path = getenv("BLITZ_CONTROL_LATENCY_LOG_PATH");
const char *video_stage_log_path = getenv("BLITZ_VIDEO_STAGE_LOG_PATH");
int latency_enabled = env_int_or_default("BLITZ_CONTROL_LATENCY_LOG_ENABLED", 1); int latency_enabled = env_int_or_default("BLITZ_CONTROL_LATENCY_LOG_ENABLED", 1);
int video_stage_log_enabled = env_int_or_default("BLITZ_VIDEO_STAGE_LOG_ENABLED", 1);
uint64_t video_stage_log_sample_mod = env_u64_or_default("BLITZ_VIDEO_STAGE_LOG_SAMPLE_MOD", 10);
if (stats_log_path != NULL && stats_log_path[0] != '\0') { if (stats_log_path != NULL && stats_log_path[0] != '\0') {
state.stats_logger = kcp_session_stats_open_jsonl(stats_log_path); state.stats_logger = kcp_session_stats_open_jsonl(stats_log_path);
@@ -927,16 +1217,33 @@ int main(void) {
fprintf(stderr, "[b_side_omnid] warning: failed to open control latency log %s\n", latency_log_path); fprintf(stderr, "[b_side_omnid] warning: failed to open control latency log %s\n", latency_log_path);
} }
} }
if (video_stage_log_enabled && video_stage_log_path != NULL && video_stage_log_path[0] != '\0') {
state.video_stage_logger = video_stage_logger_open_jsonl(video_stage_log_path, video_stage_log_sample_mod);
if (state.video_stage_logger == NULL) {
fprintf(stderr, "[b_side_omnid] warning: failed to open video stage log %s\n", video_stage_log_path);
}
}
state.video_config.stats_logger = state.stats_logger; state.video_config.stats_logger = state.stats_logger;
state.video_config.stage_logger = state.video_stage_logger;
state.video_config.stats_interval_ms = state.stats_interval_ms; state.video_config.stats_interval_ms = state.stats_interval_ms;
} }
if (control_ack_enabled(&state)) {
if (pthread_create(&state.control_ack_thread, NULL, control_ack_thread_main, &state) != 0) {
fprintf(stderr, "[b_side_omnid] warning: failed to start async control ACK manager, ACK sampling disabled\n");
} else {
state.control_ack_thread_started = 1;
}
}
if (pthread_create(&video_thread, NULL, video_thread_main, &state) != 0) { if (pthread_create(&video_thread, NULL, video_thread_main, &state) != 0) {
perror("pthread_create(video_thread)"); perror("pthread_create(video_thread)");
unix_dgram_client_close(&state.unix_client); unix_dgram_client_close(&state.unix_client);
control_ack_manager_destroy(&state);
control_bridge_stats_destroy(&state.control_stats); control_bridge_stats_destroy(&state.control_stats);
video_pipeline_stats_destroy(&state.video_stats); video_pipeline_stats_destroy(&state.video_stats);
latencylog_close(state.control_latency_logger); latencylog_close(state.control_latency_logger);
video_stage_logger_close(state.video_stage_logger);
kcp_session_stats_close(state.stats_logger); kcp_session_stats_close(state.stats_logger);
return 1; return 1;
} }
@@ -945,9 +1252,11 @@ int main(void) {
g_stop_requested = 1; g_stop_requested = 1;
pthread_join(video_thread, NULL); pthread_join(video_thread, NULL);
unix_dgram_client_close(&state.unix_client); unix_dgram_client_close(&state.unix_client);
control_ack_manager_destroy(&state);
control_bridge_stats_destroy(&state.control_stats); control_bridge_stats_destroy(&state.control_stats);
video_pipeline_stats_destroy(&state.video_stats); video_pipeline_stats_destroy(&state.video_stats);
latencylog_close(state.control_latency_logger); latencylog_close(state.control_latency_logger);
video_stage_logger_close(state.video_stage_logger);
kcp_session_stats_close(state.stats_logger); kcp_session_stats_close(state.stats_logger);
return 1; return 1;
} }
@@ -964,9 +1273,11 @@ int main(void) {
pthread_join(video_thread, NULL); pthread_join(video_thread, NULL);
pthread_join(control_thread, NULL); pthread_join(control_thread, NULL);
unix_dgram_client_close(&state.unix_client); unix_dgram_client_close(&state.unix_client);
control_ack_manager_destroy(&state);
control_bridge_stats_destroy(&state.control_stats); control_bridge_stats_destroy(&state.control_stats);
video_pipeline_stats_destroy(&state.video_stats); video_pipeline_stats_destroy(&state.video_stats);
latencylog_close(state.control_latency_logger); latencylog_close(state.control_latency_logger);
video_stage_logger_close(state.video_stage_logger);
kcp_session_stats_close(state.stats_logger); kcp_session_stats_close(state.stats_logger);
return 0; return 0;
} }

View File

@@ -24,8 +24,12 @@ typedef struct kcp_session_stats_record {
uint32_t rto_ms; uint32_t rto_ms;
int has_srtt_ms; int has_srtt_ms;
int32_t srtt_ms; int32_t srtt_ms;
int has_min_srtt_ms;
int32_t min_srtt_ms;
int has_srttvar_ms; int has_srttvar_ms;
int32_t srttvar_ms; int32_t srttvar_ms;
int has_last_feedback_age_ms;
uint32_t last_feedback_age_ms;
int has_snd_wnd; int has_snd_wnd;
uint32_t snd_wnd; uint32_t snd_wnd;
int has_rmt_wnd; int has_rmt_wnd;

View File

@@ -28,6 +28,7 @@ kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, co
const char *kcp_client_id(const kcp_client_t *client); const char *kcp_client_id(const kcp_client_t *client);
int kcp_client_send_text(kcp_client_t *client, const char *to, const char *text); int kcp_client_send_text(kcp_client_t *client, const char *to, const char *text);
int kcp_client_send_binary(kcp_client_t *client, const char *to, const void *data, size_t data_len); int kcp_client_send_binary(kcp_client_t *client, const char *to, const void *data, size_t data_len);
int kcp_client_send_binary_with_id(kcp_client_t *client, const char *to, const void *data, size_t data_len, uint64_t *out_id);
int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char *path); int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char *path);
int kcp_client_receive_timed(kcp_client_t *client, message_t *out_msg, int timeout_ms); int kcp_client_receive_timed(kcp_client_t *client, message_t *out_msg, int timeout_ms);
int kcp_client_receive(kcp_client_t *client, message_t *out_msg); int kcp_client_receive(kcp_client_t *client, message_t *out_msg);

View File

@@ -56,7 +56,9 @@ typedef struct kcp_runtime_stats {
uint32_t conv; uint32_t conv;
uint32_t rto_ms; uint32_t rto_ms;
int32_t srtt_ms; int32_t srtt_ms;
int32_t min_srtt_ms;
int32_t srttvar_ms; int32_t srttvar_ms;
uint32_t last_feedback_age_ms;
uint32_t snd_wnd; uint32_t snd_wnd;
uint32_t rmt_wnd; uint32_t rmt_wnd;
uint32_t inflight; uint32_t inflight;

View File

@@ -6,22 +6,34 @@
#include <stdint.h> #include <stdint.h>
#include "gps_buffer.h" #include "gps_buffer.h"
#include "omni_common.h"
#include "peer_kcp_client.h" #include "peer_kcp_client.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#if defined(__GNUC__)
typedef struct __attribute__((packed)) video_pipeline_packet_metadata {
#else
typedef struct video_pipeline_packet_metadata { typedef struct video_pipeline_packet_metadata {
#endif
uint64_t timestamp_ms; uint64_t timestamp_ms;
double latitude; double latitude;
double longitude; double longitude;
uint32_t capture_to_send_ms;
} video_pipeline_packet_metadata_t; } video_pipeline_packet_metadata_t;
typedef struct video_stage_logger {
omni_file_logger_t file_logger;
int enabled;
uint64_t sample_mod;
} video_stage_logger_t;
typedef void (*video_pipeline_progress_fn)(void *context); typedef void (*video_pipeline_progress_fn)(void *context);
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(video_pipeline_packet_metadata_t) == 24, "video trailer metadata must be 24 bytes"); _Static_assert(sizeof(video_pipeline_packet_metadata_t) == 28, "video trailer metadata must be 28 bytes");
#endif #endif
typedef struct video_pipeline_config { typedef struct video_pipeline_config {
@@ -43,6 +55,7 @@ typedef struct video_pipeline_config {
int hard_backpressure_hold_ms; int hard_backpressure_hold_ms;
int frame_stall_reconnect_ms; int frame_stall_reconnect_ms;
kcp_session_stats_logger_t *stats_logger; kcp_session_stats_logger_t *stats_logger;
video_stage_logger_t *stage_logger;
int stats_interval_ms; int stats_interval_ms;
video_pipeline_progress_fn progress_callback; video_pipeline_progress_fn progress_callback;
void *progress_context; void *progress_context;
@@ -57,6 +70,8 @@ typedef struct video_pipeline_stats {
uint64_t backlog_resets; uint64_t backlog_resets;
uint64_t last_frame_bytes; uint64_t last_frame_bytes;
uint32_t last_backlog_segments; uint32_t last_backlog_segments;
uint32_t last_capture_to_send_ms;
double avg_capture_to_send_ms;
int connected; int connected;
char last_error[256]; char last_error[256];
char last_backlog_reason[128]; char last_backlog_reason[128];
@@ -70,6 +85,8 @@ void video_pipeline_config_load_env(video_pipeline_config_t *config);
int video_pipeline_stats_init(video_pipeline_stats_t *stats); int video_pipeline_stats_init(video_pipeline_stats_t *stats);
void video_pipeline_stats_destroy(video_pipeline_stats_t *stats); void video_pipeline_stats_destroy(video_pipeline_stats_t *stats);
void video_pipeline_stats_snapshot(video_pipeline_stats_t *stats, video_pipeline_stats_t *out_stats); void video_pipeline_stats_snapshot(video_pipeline_stats_t *stats, video_pipeline_stats_t *out_stats);
video_stage_logger_t *video_stage_logger_open_jsonl(const char *path, uint64_t sample_mod);
void video_stage_logger_close(video_stage_logger_t *logger);
int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_stats_t *stats, volatile sig_atomic_t *stop_requested); int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_stats_t *stats, volatile sig_atomic_t *stop_requested);
#ifdef __cplusplus #ifdef __cplusplus

View File

@@ -119,7 +119,9 @@ static PyObject *build_kcp_stats_dict(const omnisocket_session_kcp_stats_t *stat
SET_KCP_STAT("conv", PyLong_FromUnsignedLong(stats->conv)); SET_KCP_STAT("conv", PyLong_FromUnsignedLong(stats->conv));
SET_KCP_STAT("rto_ms", PyLong_FromUnsignedLong(stats->rto_ms)); SET_KCP_STAT("rto_ms", PyLong_FromUnsignedLong(stats->rto_ms));
SET_KCP_STAT("srtt_ms", PyLong_FromLong(stats->srtt_ms)); SET_KCP_STAT("srtt_ms", PyLong_FromLong(stats->srtt_ms));
SET_KCP_STAT("min_srtt_ms", PyLong_FromLong(stats->min_srtt_ms));
SET_KCP_STAT("srttvar_ms", PyLong_FromLong(stats->srttvar_ms)); SET_KCP_STAT("srttvar_ms", PyLong_FromLong(stats->srttvar_ms));
SET_KCP_STAT("last_feedback_age_ms", PyLong_FromUnsignedLong(stats->last_feedback_age_ms));
SET_KCP_STAT("snd_wnd", PyLong_FromUnsignedLong(stats->snd_wnd)); SET_KCP_STAT("snd_wnd", PyLong_FromUnsignedLong(stats->snd_wnd));
SET_KCP_STAT("rmt_wnd", PyLong_FromUnsignedLong(stats->rmt_wnd)); SET_KCP_STAT("rmt_wnd", PyLong_FromUnsignedLong(stats->rmt_wnd));
SET_KCP_STAT("inflight", PyLong_FromUnsignedLong(stats->inflight)); SET_KCP_STAT("inflight", PyLong_FromUnsignedLong(stats->inflight));
@@ -279,6 +281,29 @@ static PyObject *PyOmniSession_send(PyOmniSession *self, PyObject *args, PyObjec
Py_RETURN_NONE; Py_RETURN_NONE;
} }
static PyObject *PyOmniSession_send_with_id(PyOmniSession *self, PyObject *args, PyObject *kwargs) {
const char *to;
Py_buffer payload;
int rc;
uint64_t message_id = 0;
static char *kwlist[] = {"to", "data", NULL};
memset(&payload, 0, sizeof(payload));
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sy*", kwlist, &to, &payload)) {
return NULL;
}
Py_BEGIN_ALLOW_THREADS
rc = omnisocket_session_send_with_id(&self->session, to, payload.buf, (size_t) payload.len, &message_id);
Py_END_ALLOW_THREADS
PyBuffer_Release(&payload);
if (rc != 0) {
return PyErr_SetFromErrno(PyExc_OSError);
}
return PyLong_FromUnsignedLongLong((unsigned long long) message_id);
}
static PyObject *PyOmniSession_recv(PyOmniSession *self, PyObject *args, PyObject *kwargs) { static PyObject *PyOmniSession_recv(PyOmniSession *self, PyObject *args, PyObject *kwargs) {
int timeout_ms = -1; int timeout_ms = -1;
int rc; int rc;
@@ -379,6 +404,7 @@ static PyMethodDef PyOmniSession_methods[] = {
{"connect", (PyCFunction) PyOmniSession_connect, METH_VARARGS | METH_KEYWORDS, NULL}, {"connect", (PyCFunction) PyOmniSession_connect, METH_VARARGS | METH_KEYWORDS, NULL},
{"close", (PyCFunction) PyOmniSession_close, METH_NOARGS, NULL}, {"close", (PyCFunction) PyOmniSession_close, METH_NOARGS, NULL},
{"send", (PyCFunction) PyOmniSession_send, METH_VARARGS | METH_KEYWORDS, NULL}, {"send", (PyCFunction) PyOmniSession_send, METH_VARARGS | METH_KEYWORDS, NULL},
{"send_with_id", (PyCFunction) PyOmniSession_send_with_id, METH_VARARGS | METH_KEYWORDS, NULL},
{"recv", (PyCFunction) PyOmniSession_recv, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_doc}, {"recv", (PyCFunction) PyOmniSession_recv, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_doc},
{"recv_into", (PyCFunction) PyOmniSession_recv_into, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_into_doc}, {"recv_into", (PyCFunction) PyOmniSession_recv_into, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_into_doc},
{"stats", (PyCFunction) PyOmniSession_stats, METH_NOARGS, NULL}, {"stats", (PyCFunction) PyOmniSession_stats, METH_NOARGS, NULL},

View File

@@ -167,6 +167,16 @@ int omnisocket_session_close(omnisocket_session_t *session) {
} }
int omnisocket_session_send(omnisocket_session_t *session, const char *to, const void *data, size_t data_len) { int omnisocket_session_send(omnisocket_session_t *session, const char *to, const void *data, size_t data_len) {
return omnisocket_session_send_with_id(session, to, data, data_len, NULL);
}
int omnisocket_session_send_with_id(
omnisocket_session_t *session,
const char *to,
const void *data,
size_t data_len,
uint64_t *out_message_id
) {
kcp_client_t *client; kcp_client_t *client;
int rc; int rc;
@@ -178,7 +188,7 @@ int omnisocket_session_send(omnisocket_session_t *session, const char *to, const
if (omnisocket_session_begin_client_op(session, &client) != 0) { if (omnisocket_session_begin_client_op(session, &client) != 0) {
return -1; return -1;
} }
rc = kcp_client_send_binary(client, to, data, data_len); rc = kcp_client_send_binary_with_id(client, to, data, data_len, out_message_id);
pthread_mutex_lock(&session->mutex); pthread_mutex_lock(&session->mutex);
if (rc == 0) { if (rc == 0) {
session->stats.send_calls += 1; session->stats.send_calls += 1;
@@ -297,7 +307,9 @@ void omnisocket_session_kcp_stats_snapshot(omnisocket_session_t *session, omniso
out_stats->conv = runtime_stats.conv; out_stats->conv = runtime_stats.conv;
out_stats->rto_ms = runtime_stats.rto_ms; out_stats->rto_ms = runtime_stats.rto_ms;
out_stats->srtt_ms = runtime_stats.srtt_ms; out_stats->srtt_ms = runtime_stats.srtt_ms;
out_stats->min_srtt_ms = runtime_stats.min_srtt_ms;
out_stats->srttvar_ms = runtime_stats.srttvar_ms; out_stats->srttvar_ms = runtime_stats.srttvar_ms;
out_stats->last_feedback_age_ms = runtime_stats.last_feedback_age_ms;
out_stats->snd_wnd = runtime_stats.snd_wnd; out_stats->snd_wnd = runtime_stats.snd_wnd;
out_stats->rmt_wnd = runtime_stats.rmt_wnd; out_stats->rmt_wnd = runtime_stats.rmt_wnd;
out_stats->inflight = runtime_stats.inflight; out_stats->inflight = runtime_stats.inflight;

View File

@@ -22,7 +22,9 @@ typedef struct omnisocket_session_kcp_stats {
uint32_t conv; uint32_t conv;
uint32_t rto_ms; uint32_t rto_ms;
int32_t srtt_ms; int32_t srtt_ms;
int32_t min_srtt_ms;
int32_t srttvar_ms; int32_t srttvar_ms;
uint32_t last_feedback_age_ms;
uint32_t snd_wnd; uint32_t snd_wnd;
uint32_t rmt_wnd; uint32_t rmt_wnd;
uint32_t inflight; uint32_t inflight;
@@ -72,6 +74,13 @@ int omnisocket_session_connect(
); );
int omnisocket_session_close(omnisocket_session_t *session); int omnisocket_session_close(omnisocket_session_t *session);
int omnisocket_session_send(omnisocket_session_t *session, const char *to, const void *data, size_t data_len); int omnisocket_session_send(omnisocket_session_t *session, const char *to, const void *data, size_t data_len);
int omnisocket_session_send_with_id(
omnisocket_session_t *session,
const char *to,
const void *data,
size_t data_len,
uint64_t *out_message_id
);
int omnisocket_session_recv(omnisocket_session_t *session, message_t *out_msg, int timeout_ms); int omnisocket_session_recv(omnisocket_session_t *session, message_t *out_msg, int timeout_ms);
int omnisocket_session_recv_into( int omnisocket_session_recv_into(
omnisocket_session_t *session, omnisocket_session_t *session,

View File

@@ -72,6 +72,12 @@ class OmniTransport:
def send(self, *, to: str, data: bytes) -> None: def send(self, *, to: str, data: bytes) -> None:
self._session.send(to=to, data=data) self._session.send(to=to, data=data)
def send_with_id(self, *, to: str, data: bytes) -> int:
if not hasattr(self._session, 'send_with_id'):
self._session.send(to=to, data=data)
raise RuntimeError('send_with_id is not available on this omnisocket build')
return int(self._session.send_with_id(to=to, data=data))
def recv(self, *, timeout_ms: int = -1): def recv(self, *, timeout_ms: int = -1):
return self._session.recv(timeout_ms=timeout_ms) return self._session.recv(timeout_ms=timeout_ms)

View File

@@ -32,6 +32,9 @@ BLITZ_OMNID_THREAD_HEARTBEAT_TIMEOUT_SEC="15"
BLITZ_KCP_STATS_INTERVAL_MS="1000" BLITZ_KCP_STATS_INTERVAL_MS="1000"
BLITZ_CONTROL_LATENCY_LOG_ENABLED="1" BLITZ_CONTROL_LATENCY_LOG_ENABLED="1"
BLITZ_CONTROL_LATENCY_LOG_SAMPLE_MOD="100" BLITZ_CONTROL_LATENCY_LOG_SAMPLE_MOD="100"
BLITZ_CONTROL_ACK_SAMPLE_MOD="10"
BLITZ_VIDEO_STAGE_LOG_ENABLED="1"
BLITZ_VIDEO_STAGE_LOG_SAMPLE_MOD="10"
BLITZ_5G_LINK_LOG_INTERVAL_SEC="5" BLITZ_5G_LINK_LOG_INTERVAL_SEC="5"
BLITZ_JSONL_FLUSH_INTERVAL_MS="1000" BLITZ_JSONL_FLUSH_INTERVAL_MS="1000"
BLITZ_JSONL_FLUSH_BYTES="262144" BLITZ_JSONL_FLUSH_BYTES="262144"
@@ -53,3 +56,5 @@ OMNI_CAMERA_DEVICE="/dev/v4l/by-path/platform-a80aa10000.usb-usb-0:3.2:1.4-video
# Boot units run b_side_omnid as root directly, so nested sudo must stay off. # Boot units run b_side_omnid as root directly, so nested sudo must stay off.
B_SIDE_OMNID_USE_SUDO="0" B_SIDE_OMNID_USE_SUDO="0"
OMNI_CONTROL_ACK_PEER_ID="peer-b-ctrl-ack"
OMNI_CONTROL_ACK_TARGET_PEER="peer-a-ctrl-ack"

View File

@@ -0,0 +1,286 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
from datetime import datetime, timezone
import html
import json
from pathlib import Path
from typing import Any
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Aggregate run logs into control/video latency estimate outputs.")
parser.add_argument("--run-dir", required=True, help="Run directory containing JSONL logs.")
parser.add_argument("--output-dir", help="Output directory. Defaults to --run-dir.")
return parser.parse_args()
def iter_jsonl(path: Path) -> list[dict[str, Any]]:
records: list[dict[str, Any]] = []
if not path.exists():
return records
with path.open("r", encoding="utf-8") as handle:
for raw_line in handle:
line = raw_line.strip()
if not line:
continue
try:
payload = json.loads(line)
except json.JSONDecodeError:
continue
if isinstance(payload, dict):
records.append(payload)
return records
def load_glob_jsonl(run_dir: Path, pattern: str) -> list[dict[str, Any]]:
records: list[dict[str, Any]] = []
for path in sorted(run_dir.glob(pattern)):
records.extend(iter_jsonl(path))
return records
def write_jsonl(path: Path, records: list[dict[str, Any]]) -> None:
with path.open("w", encoding="utf-8") as handle:
for record in records:
handle.write(json.dumps(record, ensure_ascii=False, separators=(",", ":")))
handle.write("\n")
def parse_unix_ms(value: Any) -> int | None:
if value is None:
return None
if isinstance(value, (int, float)):
return int(value)
text = str(value).strip()
if not text:
return None
if text.endswith("Z"):
text = f"{text[:-1]}+00:00"
try:
return int(datetime.fromisoformat(text).astimezone(timezone.utc).timestamp() * 1000)
except ValueError:
return None
def flatten_net_epoch(samples: list[dict[str, Any]]) -> list[dict[str, Any]]:
flattened: list[dict[str, Any]] = []
for sample in samples:
links = sample.get("links") or {}
a_to_d = (links.get("a_to_d") or {}).get("sessions") or {}
d_to_b = (links.get("d_to_b") or {}).get("sessions") or {}
a_control = (a_to_d.get("control") or {}).get("kcp") or {}
d_control = (d_to_b.get("control") or {}).get("kcp") or {}
a_video = (a_to_d.get("video") or {}).get("kcp") or {}
d_video = (d_to_b.get("video") or {}).get("kcp") or {}
flattened.append(
{
"updated_at": sample.get("updated_at"),
"a_to_d_control_srtt_ms": a_control.get("srtt_ms"),
"a_to_d_control_min_srtt_ms": a_control.get("min_srtt_ms"),
"d_to_b_control_srtt_ms": d_control.get("srtt_ms"),
"d_to_b_control_min_srtt_ms": d_control.get("min_srtt_ms"),
"a_to_d_video_srtt_ms": a_video.get("srtt_ms"),
"a_to_d_video_min_srtt_ms": a_video.get("min_srtt_ms"),
"d_to_b_video_srtt_ms": d_video.get("srtt_ms"),
"d_to_b_video_min_srtt_ms": d_video.get("min_srtt_ms"),
"a_to_d_control_feedback_age_ms": a_control.get("last_feedback_age_ms"),
"d_to_b_control_feedback_age_ms": d_control.get("last_feedback_age_ms"),
"a_to_d_video_feedback_age_ms": a_video.get("last_feedback_age_ms"),
"d_to_b_video_feedback_age_ms": d_video.get("last_feedback_age_ms"),
"a_to_d_control_retrans_delta": ((a_to_d.get("control") or {}).get("trend") or {}).get("retrans_delta"),
"d_to_b_control_retrans_delta": ((d_to_b.get("control") or {}).get("trend") or {}).get("retrans_delta"),
"a_to_d_video_retrans_delta": ((a_to_d.get("video") or {}).get("trend") or {}).get("retrans_delta"),
"d_to_b_video_retrans_delta": ((d_to_b.get("video") or {}).get("trend") or {}).get("retrans_delta"),
"a_to_d_video_window_pressure_pct": a_video.get("window_pressure_pct"),
"d_to_b_video_window_pressure_pct": d_video.get("window_pressure_pct"),
"robot_health": sample.get("robot_health"),
}
)
return flattened
def aggregate_control_estimates(
network_samples: list[dict[str, Any]],
control_events: list[dict[str, Any]],
control_acks: list[dict[str, Any]],
) -> list[dict[str, Any]]:
if control_acks:
return control_acks
fallback: list[dict[str, Any]] = []
for sample in network_samples:
estimate = sample.get("latency_estimate") or {}
fallback.append(
{
"updated_at": sample.get("updated_at"),
"estimate_method": "srtt_fallback",
"control_loop_rtt_ms": estimate.get("control_loop_rtt_ms"),
"control_to_persist_est_ms": estimate.get("control_to_persist_est_ms"),
"control_oneway_srtt_est_ms": estimate.get("control_oneway_srtt_est_ms"),
"control_oneway_bestcase_est_ms": estimate.get("control_oneway_bestcase_est_ms"),
"source_event_count": len(control_events),
}
)
return fallback
def aggregate_video_estimates(
network_samples: list[dict[str, Any]],
frame_recv_records: list[dict[str, Any]],
display_probe_records: list[dict[str, Any]],
) -> list[dict[str, Any]]:
network_timeline = sorted(
(
(updated_at_ms, sample.get("latency_estimate") or {})
for sample in network_samples
for updated_at_ms in [parse_unix_ms(sample.get("updated_at"))]
if updated_at_ms is not None
),
key=lambda item: item[0],
)
probes_by_seq = {
int(record["frame_seq"]): record
for record in display_probe_records
if record.get("frame_seq") is not None
}
estimates: list[dict[str, Any]] = []
timeline_index = 0
for record in frame_recv_records:
frame_seq = record.get("frame_seq")
if frame_seq is None:
continue
probe = probes_by_seq.get(int(frame_seq))
backend_received_unix_ns = record.get("backend_received_unix_ns")
backend_received_unix_ms = None
try:
if backend_received_unix_ns is not None:
backend_received_unix_ms = int(int(backend_received_unix_ns) / 1_000_000)
except (TypeError, ValueError):
backend_received_unix_ms = None
latency_estimate: dict[str, Any] = {}
if backend_received_unix_ms is not None and network_timeline:
while timeline_index + 1 < len(network_timeline) and network_timeline[timeline_index + 1][0] <= backend_received_unix_ms:
timeline_index += 1
if network_timeline[timeline_index][0] <= backend_received_unix_ms:
latency_estimate = network_timeline[timeline_index][1]
network_oneway = latency_estimate.get("video_network_oneway_est_ms")
capture_to_send = record.get("b_side_capture_to_send_ms")
partial_est = None
if capture_to_send is not None or network_oneway is not None:
partial_est = round(float(capture_to_send or 0.0) + float(network_oneway or 0.0), 3)
a_recv_to_paint_ms = None
if probe is not None and probe.get("backend_received_unix_ns") is not None and probe.get("paint_unix_ms") is not None:
a_recv_to_paint_ms = round(
float(probe["paint_unix_ms"]) - (int(probe["backend_received_unix_ns"]) / 1_000_000.0),
3,
)
video_e2e_est_ms = round(partial_est + a_recv_to_paint_ms, 3) if partial_est is not None and a_recv_to_paint_ms is not None else None
estimates.append(
{
"frame_seq": frame_seq,
"backend_received_unix_ns": record.get("backend_received_unix_ns"),
"frame_hash": record.get("frame_hash"),
"estimate_method": "capture_to_send+srtt/2+recv_to_paint" if video_e2e_est_ms is not None else "capture_to_send+srtt/2",
"video_network_oneway_est_ms": network_oneway,
"b_side_capture_to_send_ms": capture_to_send,
"a_recv_to_paint_ms": a_recv_to_paint_ms,
"video_partial_est_ms": partial_est,
"video_e2e_est_ms": video_e2e_est_ms,
"sequence_gap": record.get("sequence_gap"),
"repeat_flag": record.get("repeat_flag"),
"sender_clock_delta_ms_raw": record.get("sender_clock_delta_ms_raw"),
}
)
return estimates
def write_html_summary(
path: Path,
*,
net_epochs: list[dict[str, Any]],
control_estimates: list[dict[str, Any]],
video_estimates: list[dict[str, Any]],
) -> None:
latest_control = control_estimates[-1] if control_estimates else {}
latest_video = video_estimates[-1] if video_estimates else {}
latest_net = net_epochs[-1] if net_epochs else {}
html_text = f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Latency Estimates</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 24px; background: #0b1020; color: #eef2ff; }}
.grid {{ display: grid; grid-template-columns: repeat(3, minmax(0, 1fr)); gap: 16px; }}
.card {{ border: 1px solid #334155; border-radius: 8px; padding: 16px; background: #111827; }}
h1, h2 {{ margin-top: 0; }}
p {{ margin: 6px 0; line-height: 1.5; }}
code {{ color: #93c5fd; }}
</style>
</head>
<body>
<h1>Latency Estimates</h1>
<div class="grid">
<section class="card">
<h2>Control</h2>
<p><strong>loop RTT:</strong> {html.escape(str(latest_control.get("control_loop_rtt_ms")))}</p>
<p><strong>to persist:</strong> {html.escape(str(latest_control.get("control_to_persist_est_ms")))}</p>
<p><strong>method:</strong> {html.escape(str(latest_control.get("estimate_method")))}</p>
<p><strong>samples:</strong> {len(control_estimates)}</p>
</section>
<section class="card">
<h2>Video</h2>
<p><strong>network one-way:</strong> {html.escape(str(latest_video.get("video_network_oneway_est_ms")))}</p>
<p><strong>partial:</strong> {html.escape(str(latest_video.get("video_partial_est_ms")))}</p>
<p><strong>end-to-end:</strong> {html.escape(str(latest_video.get("video_e2e_est_ms")))}</p>
<p><strong>samples:</strong> {len(video_estimates)}</p>
</section>
<section class="card">
<h2>Net Epoch</h2>
<p><strong>a→d control srtt:</strong> {html.escape(str(latest_net.get("a_to_d_control_srtt_ms")))}</p>
<p><strong>d→b control srtt:</strong> {html.escape(str(latest_net.get("d_to_b_control_srtt_ms")))}</p>
<p><strong>a→d video srtt:</strong> {html.escape(str(latest_net.get("a_to_d_video_srtt_ms")))}</p>
<p><strong>d→b video srtt:</strong> {html.escape(str(latest_net.get("d_to_b_video_srtt_ms")))}</p>
</section>
</div>
</body>
</html>
"""
path.write_text(html_text, encoding="utf-8")
def main() -> int:
args = parse_args()
run_dir = Path(args.run_dir).resolve()
output_dir = Path(args.output_dir).resolve() if args.output_dir else run_dir
output_dir.mkdir(parents=True, exist_ok=True)
network_samples = load_glob_jsonl(run_dir, "a-network-summary.*.jsonl")
control_events = load_glob_jsonl(run_dir, "a-control-events.*.jsonl")
control_acks = load_glob_jsonl(run_dir, "a-control-acks.*.jsonl")
frame_recv_records = load_glob_jsonl(run_dir, "a-video-frame-recv.*.jsonl")
display_probe_records = load_glob_jsonl(run_dir, "a-video-display-probe.*.jsonl")
net_epochs = flatten_net_epoch(network_samples)
control_estimates = aggregate_control_estimates(network_samples, control_events, control_acks)
video_estimates = aggregate_video_estimates(network_samples, frame_recv_records, display_probe_records)
write_jsonl(output_dir / "net-epoch-summary.jsonl", net_epochs)
write_jsonl(output_dir / "control-latency-estimates.jsonl", control_estimates)
write_jsonl(output_dir / "video-latency-estimates.jsonl", video_estimates)
write_html_summary(
output_dir / "latency-estimates.html",
net_epochs=net_epochs,
control_estimates=control_estimates,
video_estimates=video_estimates,
)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -124,7 +124,7 @@ export OMNISOCKET_TELEMETRY_INTERVAL_MS="${OMNISOCKET_TELEMETRY_INTERVAL_MS:-100
export OMNISOCKET_TELEMETRY_STALE_AFTER_MS="${OMNISOCKET_TELEMETRY_STALE_AFTER_MS:-3000}" export OMNISOCKET_TELEMETRY_STALE_AFTER_MS="${OMNISOCKET_TELEMETRY_STALE_AFTER_MS:-3000}"
export OMNI_NETWORK_SUMMARY_LOG_ENABLED="${OMNI_NETWORK_SUMMARY_LOG_ENABLED:-1}" export OMNI_NETWORK_SUMMARY_LOG_ENABLED="${OMNI_NETWORK_SUMMARY_LOG_ENABLED:-1}"
export OMNI_NETWORK_SUMMARY_LOG_PATH="${OMNI_NETWORK_SUMMARY_LOG_PATH:-${OMNISOCKETGO_ROOT}/logs/a-network-summary.jsonl}" export OMNI_NETWORK_SUMMARY_LOG_PATH="${OMNI_NETWORK_SUMMARY_LOG_PATH:-${OMNISOCKETGO_ROOT}/logs/a-network-summary.jsonl}"
export OMNI_NETWORK_SUMMARY_LOG_INTERVAL_MS="${OMNI_NETWORK_SUMMARY_LOG_INTERVAL_MS:-2000}" export OMNI_NETWORK_SUMMARY_LOG_INTERVAL_MS="${OMNI_NETWORK_SUMMARY_LOG_INTERVAL_MS:-1000}"
export OMNI_NETWORK_SUMMARY_LOG_REQUEST_TIMEOUT_SEC="${OMNI_NETWORK_SUMMARY_LOG_REQUEST_TIMEOUT_SEC:-3}" export OMNI_NETWORK_SUMMARY_LOG_REQUEST_TIMEOUT_SEC="${OMNI_NETWORK_SUMMARY_LOG_REQUEST_TIMEOUT_SEC:-3}"
export CONTROL_SIDE_OMNISOCKET_SERVER_ADDR="${CONTROL_SIDE_OMNISOCKET_SERVER_ADDR:-}" export CONTROL_SIDE_OMNISOCKET_SERVER_ADDR="${CONTROL_SIDE_OMNISOCKET_SERVER_ADDR:-}"
export CONTROL_SIDE_OMNISOCKET_RELAY_VIA="${CONTROL_SIDE_OMNISOCKET_RELAY_VIA:-}" export CONTROL_SIDE_OMNISOCKET_RELAY_VIA="${CONTROL_SIDE_OMNISOCKET_RELAY_VIA:-}"
@@ -152,6 +152,8 @@ export OMNI_VIDEO_RELAY_VIA="${OMNI_VIDEO_RELAY_VIA:-${ROBOT_SIDE_OMNISOCKET_REL
export OMNI_CONTROL_SERVER_ADDR="${OMNI_CONTROL_SERVER_ADDR:-${ROBOT_SIDE_OMNISOCKET_SERVER_ADDR:-}}" export OMNI_CONTROL_SERVER_ADDR="${OMNI_CONTROL_SERVER_ADDR:-${ROBOT_SIDE_OMNISOCKET_SERVER_ADDR:-}}"
export OMNI_CONTROL_RELAY_VIA="${OMNI_CONTROL_RELAY_VIA:-${ROBOT_SIDE_OMNISOCKET_RELAY_VIA:-}}" export OMNI_CONTROL_RELAY_VIA="${OMNI_CONTROL_RELAY_VIA:-${ROBOT_SIDE_OMNISOCKET_RELAY_VIA:-}}"
export OMNI_CONTROL_UNIX_SOCKET_PATH="${OMNI_CONTROL_UNIX_SOCKET_PATH:-${ROBOT_RECEIVER_LOCAL_SOCKET_PATH}}" export OMNI_CONTROL_UNIX_SOCKET_PATH="${OMNI_CONTROL_UNIX_SOCKET_PATH:-${ROBOT_RECEIVER_LOCAL_SOCKET_PATH}}"
export OMNI_CONTROL_ACK_PEER_ID="${OMNI_CONTROL_ACK_PEER_ID:-peer-b-ctrl-ack}"
export OMNI_CONTROL_ACK_TARGET_PEER="${OMNI_CONTROL_ACK_TARGET_PEER:-peer-a-ctrl-ack}"
export B_SIDE_OMNID_USE_SUDO="${B_SIDE_OMNID_USE_SUDO:-1}" export B_SIDE_OMNID_USE_SUDO="${B_SIDE_OMNID_USE_SUDO:-1}"
export BLITZ_RUNTIME_DIR="${BLITZ_RUNTIME_DIR:-${OMNISOCKETGO_ROOT}/logs/runtime}" export BLITZ_RUNTIME_DIR="${BLITZ_RUNTIME_DIR:-${OMNISOCKETGO_ROOT}/logs/runtime}"
export BLITZ_RUN_ROOT="${BLITZ_RUN_ROOT:-${OMNISOCKETGO_ROOT}/logs}" export BLITZ_RUN_ROOT="${BLITZ_RUN_ROOT:-${OMNISOCKETGO_ROOT}/logs}"
@@ -167,6 +169,9 @@ export BLITZ_TIME_SERVER_IP="${BLITZ_TIME_SERVER_IP:-}"
export BLITZ_KCP_STATS_INTERVAL_MS="${BLITZ_KCP_STATS_INTERVAL_MS:-1000}" export BLITZ_KCP_STATS_INTERVAL_MS="${BLITZ_KCP_STATS_INTERVAL_MS:-1000}"
export BLITZ_CONTROL_LATENCY_LOG_ENABLED="${BLITZ_CONTROL_LATENCY_LOG_ENABLED:-1}" export BLITZ_CONTROL_LATENCY_LOG_ENABLED="${BLITZ_CONTROL_LATENCY_LOG_ENABLED:-1}"
export BLITZ_CONTROL_LATENCY_LOG_SAMPLE_MOD="${BLITZ_CONTROL_LATENCY_LOG_SAMPLE_MOD:-100}" export BLITZ_CONTROL_LATENCY_LOG_SAMPLE_MOD="${BLITZ_CONTROL_LATENCY_LOG_SAMPLE_MOD:-100}"
export BLITZ_CONTROL_ACK_SAMPLE_MOD="${BLITZ_CONTROL_ACK_SAMPLE_MOD:-10}"
export BLITZ_VIDEO_STAGE_LOG_ENABLED="${BLITZ_VIDEO_STAGE_LOG_ENABLED:-1}"
export BLITZ_VIDEO_STAGE_LOG_SAMPLE_MOD="${BLITZ_VIDEO_STAGE_LOG_SAMPLE_MOD:-10}"
export BLITZ_5G_LINK_LOG_INTERVAL_SEC="${BLITZ_5G_LINK_LOG_INTERVAL_SEC:-5}" export BLITZ_5G_LINK_LOG_INTERVAL_SEC="${BLITZ_5G_LINK_LOG_INTERVAL_SEC:-5}"
export BLITZ_JSONL_FLUSH_INTERVAL_MS="${BLITZ_JSONL_FLUSH_INTERVAL_MS:-1000}" export BLITZ_JSONL_FLUSH_INTERVAL_MS="${BLITZ_JSONL_FLUSH_INTERVAL_MS:-1000}"
export BLITZ_JSONL_FLUSH_BYTES="${BLITZ_JSONL_FLUSH_BYTES:-262144}" export BLITZ_JSONL_FLUSH_BYTES="${BLITZ_JSONL_FLUSH_BYTES:-262144}"
@@ -294,12 +299,17 @@ blitz_dev_prepare_backend_logging_env() {
export OMNI_NETWORK_SUMMARY_LOG_PATH export OMNI_NETWORK_SUMMARY_LOG_PATH
OMNI_NETWORK_SUMMARY_LOG_PATH="$(blitz_dev_component_log_path "a-network-summary")" OMNI_NETWORK_SUMMARY_LOG_PATH="$(blitz_dev_component_log_path "a-network-summary")"
fi fi
export BLITZ_A_CONTROL_EVENTS_LOG_PATH="${BLITZ_A_CONTROL_EVENTS_LOG_PATH:-$(blitz_dev_component_log_path "a-control-events")}"
export BLITZ_A_CONTROL_ACKS_LOG_PATH="${BLITZ_A_CONTROL_ACKS_LOG_PATH:-$(blitz_dev_component_log_path "a-control-acks")}"
export BLITZ_A_VIDEO_FRAME_RECV_LOG_PATH="${BLITZ_A_VIDEO_FRAME_RECV_LOG_PATH:-$(blitz_dev_component_log_path "a-video-frame-recv")}"
export BLITZ_A_VIDEO_DISPLAY_PROBE_LOG_PATH="${BLITZ_A_VIDEO_DISPLAY_PROBE_LOG_PATH:-$(blitz_dev_component_log_path "a-video-display-probe")}"
} }
blitz_dev_prepare_bside_logging_env() { blitz_dev_prepare_bside_logging_env() {
blitz_dev_init_instance_context blitz_dev_init_instance_context
export BLITZ_KCP_STATS_LOG_PATH="${BLITZ_KCP_STATS_LOG_PATH:-$(blitz_dev_component_log_path "b-kcp-session-stats")}" export BLITZ_KCP_STATS_LOG_PATH="${BLITZ_KCP_STATS_LOG_PATH:-$(blitz_dev_component_log_path "b-kcp-session-stats")}"
export BLITZ_CONTROL_LATENCY_LOG_PATH="${BLITZ_CONTROL_LATENCY_LOG_PATH:-$(blitz_dev_component_log_path "b-control-latency")}" export BLITZ_CONTROL_LATENCY_LOG_PATH="${BLITZ_CONTROL_LATENCY_LOG_PATH:-$(blitz_dev_component_log_path "b-control-latency")}"
export BLITZ_VIDEO_STAGE_LOG_PATH="${BLITZ_VIDEO_STAGE_LOG_PATH:-$(blitz_dev_component_log_path "b-video-frame-stages")}"
} }
blitz_dev_prepare_5g_logging_env() { blitz_dev_prepare_5g_logging_env() {

View File

@@ -26,7 +26,7 @@ OMNISOCKET_TELEMETRY_INTERVAL_MS="1000"
OMNISOCKET_TELEMETRY_STALE_AFTER_MS="3000" OMNISOCKET_TELEMETRY_STALE_AFTER_MS="3000"
OMNI_NETWORK_SUMMARY_LOG_ENABLED="1" OMNI_NETWORK_SUMMARY_LOG_ENABLED="1"
OMNI_NETWORK_SUMMARY_LOG_PATH="${OMNISOCKETGO_ROOT}/logs/a-network-summary.jsonl" OMNI_NETWORK_SUMMARY_LOG_PATH="${OMNISOCKETGO_ROOT}/logs/a-network-summary.jsonl"
OMNI_NETWORK_SUMMARY_LOG_INTERVAL_MS="5000" OMNI_NETWORK_SUMMARY_LOG_INTERVAL_MS="1000"
OMNI_NETWORK_SUMMARY_LOG_REQUEST_TIMEOUT_SEC="3" OMNI_NETWORK_SUMMARY_LOG_REQUEST_TIMEOUT_SEC="3"
FRONTEND_HOST="0.0.0.0" FRONTEND_HOST="0.0.0.0"
@@ -63,6 +63,11 @@ OMNI_CONTROL_EXPECTED_SENDER="peer-a-ctrl"
OMNI_CONTROL_SERVER_ADDR="${ROBOT_SIDE_OMNISOCKET_SERVER_ADDR}" OMNI_CONTROL_SERVER_ADDR="${ROBOT_SIDE_OMNISOCKET_SERVER_ADDR}"
OMNI_CONTROL_RELAY_VIA="${ROBOT_SIDE_OMNISOCKET_RELAY_VIA}" OMNI_CONTROL_RELAY_VIA="${ROBOT_SIDE_OMNISOCKET_RELAY_VIA}"
OMNI_CONTROL_UNIX_SOCKET_PATH="${ROBOT_RECEIVER_LOCAL_SOCKET_PATH}" OMNI_CONTROL_UNIX_SOCKET_PATH="${ROBOT_RECEIVER_LOCAL_SOCKET_PATH}"
OMNI_CONTROL_ACK_PEER_ID="peer-b-ctrl-ack"
OMNI_CONTROL_ACK_TARGET_PEER="peer-a-ctrl-ack"
BLITZ_CONTROL_ACK_SAMPLE_MOD="10"
BLITZ_VIDEO_STAGE_LOG_ENABLED="1"
BLITZ_VIDEO_STAGE_LOG_SAMPLE_MOD="10"
OMNI_CONTROL_SERVER_IDLE_RECONNECT_MS="30000" OMNI_CONTROL_SERVER_IDLE_RECONNECT_MS="30000"
# A-side backend video freshness guard. Used by scripts/dev/start-backend.sh. # A-side backend video freshness guard. Used by scripts/dev/start-backend.sh.

View File

@@ -156,10 +156,18 @@ int kcp_session_stats_log(kcp_session_stats_logger_t *logger, const kcp_session_
kcp_session_stats_appendf(&line, &line_len, ",\"srtt_ms\":%d", record->srtt_ms) != 0) { kcp_session_stats_appendf(&line, &line_len, ",\"srtt_ms\":%d", record->srtt_ms) != 0) {
goto cleanup; goto cleanup;
} }
if (record->has_min_srtt_ms &&
kcp_session_stats_appendf(&line, &line_len, ",\"min_srtt_ms\":%d", record->min_srtt_ms) != 0) {
goto cleanup;
}
if (record->has_srttvar_ms && if (record->has_srttvar_ms &&
kcp_session_stats_appendf(&line, &line_len, ",\"srttvar_ms\":%d", record->srttvar_ms) != 0) { kcp_session_stats_appendf(&line, &line_len, ",\"srttvar_ms\":%d", record->srttvar_ms) != 0) {
goto cleanup; goto cleanup;
} }
if (record->has_last_feedback_age_ms &&
kcp_session_stats_appendf(&line, &line_len, ",\"last_feedback_age_ms\":%u", record->last_feedback_age_ms) != 0) {
goto cleanup;
}
if (record->has_snd_wnd && if (record->has_snd_wnd &&
kcp_session_stats_appendf(&line, &line_len, ",\"snd_wnd\":%u", record->snd_wnd) != 0) { kcp_session_stats_appendf(&line, &line_len, ",\"snd_wnd\":%u", record->snd_wnd) != 0) {
goto cleanup; goto cleanup;

View File

@@ -477,6 +477,16 @@ int kcp_client_send_text(kcp_client_t *client, const char *to, const char *text)
} }
int kcp_client_send_binary(kcp_client_t *client, const char *to, const void *data, size_t data_len) { int kcp_client_send_binary(kcp_client_t *client, const char *to, const void *data, size_t data_len) {
return kcp_client_send_binary_with_id(client, to, data, data_len, NULL);
}
int kcp_client_send_binary_with_id(
kcp_client_t *client,
const char *to,
const void *data,
size_t data_len,
uint64_t *out_id
) {
message_t msg; message_t msg;
uint64_t id; uint64_t id;
@@ -508,6 +518,9 @@ int kcp_client_send_binary(kcp_client_t *client, const char *to, const void *dat
protocol_message_clear(&msg); protocol_message_clear(&msg);
return -1; return -1;
} }
if (out_id != NULL) {
*out_id = id;
}
protocol_message_clear(&msg); protocol_message_clear(&msg);
return 0; return 0;
} }

View File

@@ -228,7 +228,9 @@ static int kcp_hub_add_runtime_stats_json(cJSON *object, const kcp_runtime_stats
cJSON_AddNumberToObject(object, "conv", (double) stats->conv) == NULL || cJSON_AddNumberToObject(object, "conv", (double) stats->conv) == NULL ||
cJSON_AddNumberToObject(object, "rto_ms", (double) stats->rto_ms) == NULL || cJSON_AddNumberToObject(object, "rto_ms", (double) stats->rto_ms) == NULL ||
cJSON_AddNumberToObject(object, "srtt_ms", (double) stats->srtt_ms) == NULL || cJSON_AddNumberToObject(object, "srtt_ms", (double) stats->srtt_ms) == NULL ||
cJSON_AddNumberToObject(object, "min_srtt_ms", (double) stats->min_srtt_ms) == NULL ||
cJSON_AddNumberToObject(object, "srttvar_ms", (double) stats->srttvar_ms) == NULL || cJSON_AddNumberToObject(object, "srttvar_ms", (double) stats->srttvar_ms) == NULL ||
cJSON_AddNumberToObject(object, "last_feedback_age_ms", (double) stats->last_feedback_age_ms) == NULL ||
cJSON_AddNumberToObject(object, "snd_wnd", (double) stats->snd_wnd) == NULL || cJSON_AddNumberToObject(object, "snd_wnd", (double) stats->snd_wnd) == NULL ||
cJSON_AddNumberToObject(object, "rmt_wnd", (double) stats->rmt_wnd) == NULL || cJSON_AddNumberToObject(object, "rmt_wnd", (double) stats->rmt_wnd) == NULL ||
cJSON_AddNumberToObject(object, "inflight", (double) stats->inflight) == NULL || cJSON_AddNumberToObject(object, "inflight", (double) stats->inflight) == NULL ||

View File

@@ -72,6 +72,8 @@ struct kcp_conn {
uint64_t pending_in_errs; uint64_t pending_in_errs;
uint64_t pending_kcp_in_errs; uint64_t pending_kcp_in_errs;
protocol_frame_decoder_t decoder; protocol_frame_decoder_t decoder;
int32_t min_srtt_ms;
uint32_t last_feedback_ms;
uint8_t scratch[KCP_RECV_CHUNK_SIZE]; uint8_t scratch[KCP_RECV_CHUNK_SIZE];
latency_logger_t *logger; latency_logger_t *logger;
char node_role[OMNI_MAX_NODE_ROLE]; char node_role[OMNI_MAX_NODE_ROLE];
@@ -307,6 +309,26 @@ static uint64_t kcp_counter_diff(uint64_t previous, uint64_t current) {
return current < previous ? 0 : current - previous; return current < previous ? 0 : current - previous;
} }
static void kcp_conn_update_min_srtt_locked(kcp_conn_t *conn) {
int32_t srtt_ms;
if (conn == NULL || conn->kcp == NULL) {
return;
}
srtt_ms = conn->kcp->rx_srtt;
if (srtt_ms > 0 && (conn->min_srtt_ms <= 0 || srtt_ms < conn->min_srtt_ms)) {
conn->min_srtt_ms = srtt_ms;
}
}
static void kcp_conn_note_feedback_locked(kcp_conn_t *conn) {
if (conn == NULL) {
return;
}
conn->last_feedback_ms = omni_now_millis32();
kcp_conn_update_min_srtt_locked(conn);
}
static int kcp_process_sampler_matches(const kcp_process_sampler_t *sampler, kcp_session_stats_logger_t *logger, const char *node_role, const char *node_id, int stats_interval_ms) { static int kcp_process_sampler_matches(const kcp_process_sampler_t *sampler, kcp_session_stats_logger_t *logger, const char *node_role, const char *node_id, int stats_interval_ms) {
if (sampler == NULL) { if (sampler == NULL) {
return 0; return 0;
@@ -1093,8 +1115,13 @@ static void kcp_log_session_snapshot(kcp_conn_t *conn, const char *reason) {
record.rto_ms = conn->kcp->rx_rto; record.rto_ms = conn->kcp->rx_rto;
record.has_srtt_ms = 1; record.has_srtt_ms = 1;
record.srtt_ms = conn->kcp->rx_srtt; record.srtt_ms = conn->kcp->rx_srtt;
kcp_conn_update_min_srtt_locked(conn);
record.has_min_srtt_ms = conn->min_srtt_ms > 0;
record.min_srtt_ms = conn->min_srtt_ms;
record.has_srttvar_ms = 1; record.has_srttvar_ms = 1;
record.srttvar_ms = conn->kcp->rx_rttval; record.srttvar_ms = conn->kcp->rx_rttval;
record.has_last_feedback_age_ms = conn->last_feedback_ms != 0;
record.last_feedback_age_ms = conn->last_feedback_ms == 0 ? 0 : (omni_now_millis32() - conn->last_feedback_ms);
record.has_snd_wnd = 1; record.has_snd_wnd = 1;
record.snd_wnd = conn->kcp->snd_wnd; record.snd_wnd = conn->kcp->snd_wnd;
record.has_rmt_wnd = 1; record.has_rmt_wnd = 1;
@@ -1268,6 +1295,7 @@ static void *kcp_client_recv_thread_main(void *arg) {
if (ikcp_input(conn->kcp, (const char *) buffer, n) != 0) { if (ikcp_input(conn->kcp, (const char *) buffer, n) != 0) {
kcp_conn_record_error(conn); kcp_conn_record_error(conn);
} else { } else {
kcp_conn_note_feedback_locked(conn);
kcp_conn_record_input(conn, (int) n, segment_count); kcp_conn_record_input(conn, (int) n, segment_count);
} }
pthread_mutex_unlock(&conn->kcp_mu); pthread_mutex_unlock(&conn->kcp_mu);
@@ -1630,6 +1658,7 @@ static void *kcp_listener_recv_thread_main(void *arg) {
if (ikcp_input(conn->kcp, (const char *) buffer, n) != 0) { if (ikcp_input(conn->kcp, (const char *) buffer, n) != 0) {
kcp_conn_record_error(conn); kcp_conn_record_error(conn);
} else { } else {
kcp_conn_note_feedback_locked(conn);
kcp_conn_record_input(conn, (int) n, segment_count); kcp_conn_record_input(conn, (int) n, segment_count);
} }
pthread_mutex_unlock(&conn->kcp_mu); pthread_mutex_unlock(&conn->kcp_mu);
@@ -1907,7 +1936,10 @@ void kcp_conn_runtime_stats_snapshot(kcp_conn_t *conn, kcp_runtime_stats_t *out_
out_stats->conv = conn->kcp->conv; out_stats->conv = conn->kcp->conv;
out_stats->rto_ms = conn->kcp->rx_rto; out_stats->rto_ms = conn->kcp->rx_rto;
out_stats->srtt_ms = conn->kcp->rx_srtt; out_stats->srtt_ms = conn->kcp->rx_srtt;
kcp_conn_update_min_srtt_locked(conn);
out_stats->min_srtt_ms = conn->min_srtt_ms;
out_stats->srttvar_ms = conn->kcp->rx_rttval; out_stats->srttvar_ms = conn->kcp->rx_rttval;
out_stats->last_feedback_age_ms = conn->last_feedback_ms == 0 ? 0 : (omni_now_millis32() - conn->last_feedback_ms);
out_stats->snd_wnd = conn->kcp->snd_wnd; out_stats->snd_wnd = conn->kcp->snd_wnd;
out_stats->rmt_wnd = conn->kcp->rmt_wnd; out_stats->rmt_wnd = conn->kcp->rmt_wnd;
out_stats->inflight = conn->kcp->snd_nxt - conn->kcp->snd_una; out_stats->inflight = conn->kcp->snd_nxt - conn->kcp->snd_una;

View File

@@ -46,6 +46,7 @@ typedef struct video_sender {
char target_peer[OMNI_MAX_PEER_ID]; char target_peer[OMNI_MAX_PEER_ID];
uint8_t *send_buffer; uint8_t *send_buffer;
size_t send_buffer_cap; size_t send_buffer_cap;
uint64_t next_frame_seq;
} video_sender_t; } video_sender_t;
static int video_pipeline_stop_requested(volatile sig_atomic_t *stop_requested) { static int video_pipeline_stop_requested(volatile sig_atomic_t *stop_requested) {
@@ -212,6 +213,7 @@ void video_pipeline_config_init(video_pipeline_config_t *config) {
config->hard_backpressure_hold_ms = VIDEO_HARD_BACKPRESSURE_HOLD_MS_DEFAULT; config->hard_backpressure_hold_ms = VIDEO_HARD_BACKPRESSURE_HOLD_MS_DEFAULT;
config->frame_stall_reconnect_ms = VIDEO_DEFAULT_FRAME_STALL_RECONNECT_MS; config->frame_stall_reconnect_ms = VIDEO_DEFAULT_FRAME_STALL_RECONNECT_MS;
config->stats_logger = NULL; config->stats_logger = NULL;
config->stage_logger = NULL;
config->stats_interval_ms = 1000; config->stats_interval_ms = 1000;
} }
@@ -272,6 +274,8 @@ void video_pipeline_stats_snapshot(video_pipeline_stats_t *stats, video_pipeline
out_stats->backlog_resets = stats->backlog_resets; out_stats->backlog_resets = stats->backlog_resets;
out_stats->last_frame_bytes = stats->last_frame_bytes; out_stats->last_frame_bytes = stats->last_frame_bytes;
out_stats->last_backlog_segments = stats->last_backlog_segments; out_stats->last_backlog_segments = stats->last_backlog_segments;
out_stats->last_capture_to_send_ms = stats->last_capture_to_send_ms;
out_stats->avg_capture_to_send_ms = stats->avg_capture_to_send_ms;
out_stats->connected = stats->connected; out_stats->connected = stats->connected;
snprintf(out_stats->last_error, sizeof(out_stats->last_error), "%s", stats->last_error); snprintf(out_stats->last_error, sizeof(out_stats->last_error), "%s", stats->last_error);
snprintf(out_stats->last_backlog_reason, sizeof(out_stats->last_backlog_reason), "%s", stats->last_backlog_reason); snprintf(out_stats->last_backlog_reason, sizeof(out_stats->last_backlog_reason), "%s", stats->last_backlog_reason);
@@ -644,10 +648,12 @@ static int video_sender_drain_pending_messages(video_sender_t *sender) {
static int video_sender_send_packet( static int video_sender_send_packet(
video_sender_t *sender, video_sender_t *sender,
const AVPacket *encoded_pkt, const AVPacket *encoded_pkt,
const video_pipeline_packet_metadata_t *metadata const video_pipeline_packet_metadata_t *metadata,
uint64_t *out_frame_seq
) { ) {
uint8_t *payload; uint8_t *payload;
size_t payload_len; size_t payload_len;
uint64_t frame_seq;
int rc; int rc;
if (sender == NULL || sender->client == NULL || encoded_pkt == NULL || metadata == NULL) { if (sender == NULL || sender->client == NULL || encoded_pkt == NULL || metadata == NULL) {
@@ -655,18 +661,31 @@ static int video_sender_send_packet(
return -1; return -1;
} }
payload_len = (size_t) encoded_pkt->size + sizeof(*metadata); frame_seq = sender->next_frame_seq + 1U;
payload_len = 8U + (size_t) encoded_pkt->size + sizeof(*metadata);
if (video_sender_ensure_buffer_capacity(sender, payload_len) != 0) { if (video_sender_ensure_buffer_capacity(sender, payload_len) != 0) {
return -1; return -1;
} }
payload = sender->send_buffer; payload = sender->send_buffer;
memcpy(payload, encoded_pkt->data, (size_t) encoded_pkt->size); payload[0] = (uint8_t) (frame_seq >> 56);
memcpy(payload + encoded_pkt->size, metadata, sizeof(*metadata)); payload[1] = (uint8_t) (frame_seq >> 48);
payload[2] = (uint8_t) (frame_seq >> 40);
payload[3] = (uint8_t) (frame_seq >> 32);
payload[4] = (uint8_t) (frame_seq >> 24);
payload[5] = (uint8_t) (frame_seq >> 16);
payload[6] = (uint8_t) (frame_seq >> 8);
payload[7] = (uint8_t) frame_seq;
memcpy(payload + 8U, encoded_pkt->data, (size_t) encoded_pkt->size);
memcpy(payload + 8U + (size_t) encoded_pkt->size, metadata, sizeof(*metadata));
rc = kcp_client_send_binary(sender->client, sender->target_peer, payload, payload_len); rc = kcp_client_send_binary(sender->client, sender->target_peer, payload, payload_len);
if (rc != 0) { if (rc != 0) {
return rc; return rc;
} }
sender->next_frame_seq = frame_seq;
if (out_frame_seq != NULL) {
*out_frame_seq = frame_seq;
}
rc = video_sender_drain_pending_messages(sender); rc = video_sender_drain_pending_messages(sender);
return rc; return rc;
} }
@@ -735,6 +754,109 @@ static void video_pipeline_note_backpressure(
pthread_mutex_unlock(&stats->mutex); pthread_mutex_unlock(&stats->mutex);
} }
static void video_pipeline_note_capture_to_send(video_pipeline_stats_t *stats, uint32_t capture_to_send_ms) {
if (stats == NULL) {
return;
}
pthread_mutex_lock(&stats->mutex);
stats->last_capture_to_send_ms = capture_to_send_ms;
if (stats->avg_capture_to_send_ms <= 0.0) {
stats->avg_capture_to_send_ms = (double) capture_to_send_ms;
} else {
stats->avg_capture_to_send_ms = stats->avg_capture_to_send_ms * 0.9 + (double) capture_to_send_ms * 0.1;
}
pthread_mutex_unlock(&stats->mutex);
}
static int video_stage_logger_should_log(const video_stage_logger_t *logger, uint64_t frame_seq) {
if (logger == NULL || !logger->enabled) {
return 0;
}
if (logger->sample_mod <= 1U) {
return 1;
}
return frame_seq % logger->sample_mod == 0U;
}
static void video_stage_logger_log_frame(
video_stage_logger_t *logger,
uint64_t frame_seq,
double capture_ms,
double decode_ms,
double scale_ms,
double encode_ms,
double send_ms,
double pipeline_total_ms,
size_t jpeg_bytes,
uint64_t kcp_out_seg_delta,
uint32_t backlog_segments,
double window_pressure_pct,
int32_t video_srtt_ms
) {
char *line;
if (!video_stage_logger_should_log(logger, frame_seq)) {
return;
}
line = omni_strdup_printf(
"{\"ts_unix_nano\":%" PRId64 ",\"frame_seq\":%" PRIu64 ",\"capture_ms\":%.3f,\"decode_ms\":%.3f,\"scale_ms\":%.3f,\"encode_ms\":%.3f,\"send_ms\":%.3f,\"pipeline_total_ms\":%.3f,\"jpeg_bytes\":%zu,\"kcp_out_seg_delta\":%" PRIu64 ",\"backlog_segments\":%u,\"window_pressure_pct\":%.3f,\"video_srtt_ms\":%d}",
omni_now_unix_nano(),
frame_seq,
capture_ms,
decode_ms,
scale_ms,
encode_ms,
send_ms,
pipeline_total_ms,
jpeg_bytes,
kcp_out_seg_delta,
backlog_segments,
window_pressure_pct,
video_srtt_ms
);
if (line == NULL) {
return;
}
(void) omni_file_logger_write_line(&logger->file_logger, line);
free(line);
}
video_stage_logger_t *video_stage_logger_open_jsonl(const char *path, uint64_t sample_mod) {
video_stage_logger_t *logger;
FILE *file;
if (path == NULL || path[0] == '\0') {
return NULL;
}
if (omni_ensure_parent_dir(path) != 0) {
return NULL;
}
file = fopen(path, "ab");
if (file == NULL) {
return NULL;
}
logger = (video_stage_logger_t *) calloc(1, sizeof(*logger));
if (logger == NULL) {
fclose(file);
return NULL;
}
omni_file_logger_init_path(&logger->file_logger, file, path, 0);
logger->enabled = 1;
logger->sample_mod = sample_mod == 0U ? 1U : sample_mod;
return logger;
}
void video_stage_logger_close(video_stage_logger_t *logger) {
if (logger == NULL) {
return;
}
if (logger->file_logger.file != NULL) {
fclose(logger->file_logger.file);
}
omni_file_logger_destroy(&logger->file_logger);
free(logger);
}
static int video_server_error_requires_reconnect(const char *message) { static int video_server_error_requires_reconnect(const char *message) {
if (message == NULL || message[0] == '\0') { if (message == NULL || message[0] == '\0') {
return 0; return 0;
@@ -932,7 +1054,9 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
AVFrame *scaled_frame = NULL; AVFrame *scaled_frame = NULL;
AVPacket *encoded_pkt = NULL; AVPacket *encoded_pkt = NULL;
kcp_runtime_stats_t transport_stats; kcp_runtime_stats_t transport_stats;
kcp_runtime_stats_t transport_after_send;
int select_rc; int select_rc;
int should_log_stage = 0;
double total_start_ms = 0.0; double total_start_ms = 0.0;
double capture_start_ms = 0.0; double capture_start_ms = 0.0;
double capture_end_ms = 0.0; double capture_end_ms = 0.0;
@@ -947,8 +1071,13 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
video_pipeline_packet_metadata_t packet_metadata; video_pipeline_packet_metadata_t packet_metadata;
char reconnect_reason[256]; char reconnect_reason[256];
int frame_number = frame_index + 1; int frame_number = frame_index + 1;
uint64_t frame_seq = 0;
uint64_t out_segs_before_send = 0;
uint64_t out_segs_after_send = 0;
uint32_t capture_to_send_ms = 0;
memset(&transport_stats, 0, sizeof(transport_stats)); memset(&transport_stats, 0, sizeof(transport_stats));
memset(&transport_after_send, 0, sizeof(transport_after_send));
memset(&packet_metadata, 0, sizeof(packet_metadata)); memset(&packet_metadata, 0, sizeof(packet_metadata));
reconnect_reason[0] = '\0'; reconnect_reason[0] = '\0';
video_pipeline_report_progress(config); video_pipeline_report_progress(config);
@@ -956,9 +1085,7 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
if (config->max_frames > 0 && frame_index >= config->max_frames) { if (config->max_frames > 0 && frame_index >= config->max_frames) {
break; break;
} }
if (config->enable_timing_logs) { total_start_ms = video_pipeline_now_ms();
total_start_ms = video_pipeline_now_ms();
}
FD_ZERO(&fds); FD_ZERO(&fds);
FD_SET(fd, &fds); FD_SET(fd, &fds);
@@ -972,9 +1099,7 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
video_pipeline_set_errno_error(stats, "failed waiting for camera frame"); video_pipeline_set_errno_error(stats, "failed waiting for camera frame");
goto cleanup; goto cleanup;
} }
if (config->enable_timing_logs) { capture_start_ms = video_pipeline_now_ms();
capture_start_ms = video_pipeline_now_ms();
}
memset(&buf, 0, sizeof(buf)); memset(&buf, 0, sizeof(buf));
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
@@ -983,10 +1108,8 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
video_pipeline_set_errno_error(stats, "failed to dequeue V4L2 buffer"); video_pipeline_set_errno_error(stats, "failed to dequeue V4L2 buffer");
goto cleanup; goto cleanup;
} }
if (config->enable_timing_logs) { capture_end_ms = video_pipeline_now_ms();
capture_end_ms = video_pipeline_now_ms(); decode_start_ms = capture_end_ms;
decode_start_ms = capture_end_ms;
}
if (decode_mjpeg_frame(decoder, (const uint8_t *) buffers[buf.index].start, (int) buf.bytesused, &decoded_frame) != 0) { if (decode_mjpeg_frame(decoder, (const uint8_t *) buffers[buf.index].start, (int) buf.bytesused, &decoded_frame) != 0) {
if (config->enable_timing_logs) { if (config->enable_timing_logs) {
@@ -995,10 +1118,8 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
(void) ioctl(fd, VIDIOC_QBUF, &buf); (void) ioctl(fd, VIDIOC_QBUF, &buf);
continue; continue;
} }
if (config->enable_timing_logs) { decode_end_ms = video_pipeline_now_ms();
decode_end_ms = video_pipeline_now_ms(); scale_start_ms = decode_end_ms;
scale_start_ms = decode_end_ms;
}
if ( if (
ensure_scale_context( ensure_scale_context(
&sws_ctx, &sws_ctx,
@@ -1018,10 +1139,8 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
(void) ioctl(fd, VIDIOC_QBUF, &buf); (void) ioctl(fd, VIDIOC_QBUF, &buf);
continue; continue;
} }
if (config->enable_timing_logs) { scale_end_ms = video_pipeline_now_ms();
scale_end_ms = video_pipeline_now_ms(); encode_start_ms = scale_end_ms;
encode_start_ms = scale_end_ms;
}
if (encode_frame(encoder, scaled_frame, &encoded_pkt) != 0) { if (encode_frame(encoder, scaled_frame, &encoded_pkt) != 0) {
if (config->enable_timing_logs) { if (config->enable_timing_logs) {
video_pipeline_print_timing_failure(frame_number, "encode"); video_pipeline_print_timing_failure(frame_number, "encode");
@@ -1031,10 +1150,8 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
(void) ioctl(fd, VIDIOC_QBUF, &buf); (void) ioctl(fd, VIDIOC_QBUF, &buf);
continue; continue;
} }
if (config->enable_timing_logs) { encode_end_ms = video_pipeline_now_ms();
encode_end_ms = video_pipeline_now_ms(); send_start_ms = encode_end_ms;
send_start_ms = encode_end_ms;
}
{ {
gps_video_sample_t gps_sample = get_latest_gps_for_video(); gps_video_sample_t gps_sample = get_latest_gps_for_video();
@@ -1186,7 +1303,13 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
continue; continue;
} }
if (video_sender_send_packet(&sender, encoded_pkt, &packet_metadata) != 0) { capture_to_send_ms = send_start_ms <= capture_start_ms
? 0U
: (uint32_t) (send_start_ms - capture_start_ms + 0.5);
packet_metadata.capture_to_send_ms = capture_to_send_ms;
out_segs_before_send = transport_stats.out_segs_total;
if (video_sender_send_packet(&sender, encoded_pkt, &packet_metadata, &frame_seq) != 0) {
pthread_mutex_lock(&stats->mutex); pthread_mutex_lock(&stats->mutex);
stats->send_errors += 1; stats->send_errors += 1;
pthread_mutex_unlock(&stats->mutex); pthread_mutex_unlock(&stats->mutex);
@@ -1200,19 +1323,43 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta
(void) ioctl(fd, VIDIOC_QBUF, &buf); (void) ioctl(fd, VIDIOC_QBUF, &buf);
goto cleanup; goto cleanup;
} }
if (config->enable_timing_logs) { send_end_ms = video_pipeline_now_ms();
send_end_ms = video_pipeline_now_ms(); should_log_stage = video_stage_logger_should_log(config->stage_logger, frame_seq);
if (should_log_stage) {
kcp_client_runtime_stats_snapshot(sender.client, &transport_after_send);
out_segs_after_send = transport_after_send.out_segs_total;
} else {
transport_after_send = transport_stats;
out_segs_after_send = out_segs_before_send;
} }
video_pipeline_note_capture_to_send(stats, capture_to_send_ms);
pthread_mutex_lock(&stats->mutex); pthread_mutex_lock(&stats->mutex);
stats->frames_sent += 1; stats->frames_sent += 1;
stats->bytes_sent += (uint64_t) encoded_pkt->size; stats->bytes_sent += (uint64_t) encoded_pkt->size;
stats->last_frame_bytes = (uint64_t) encoded_pkt->size; stats->last_frame_bytes = (uint64_t) encoded_pkt->size;
kcp_client_runtime_stats_snapshot(sender.client, &stats->transport); stats->transport = transport_after_send;
pthread_mutex_unlock(&stats->mutex); pthread_mutex_unlock(&stats->mutex);
have_sent_frame = 1; have_sent_frame = 1;
last_successful_send_ms = omni_now_millis32(); last_successful_send_ms = omni_now_millis32();
soft_drops_since_last_send = 0; soft_drops_since_last_send = 0;
if (should_log_stage) {
video_stage_logger_log_frame(
config->stage_logger,
frame_seq,
capture_end_ms - capture_start_ms,
decode_end_ms - decode_start_ms,
scale_end_ms - scale_start_ms,
encode_end_ms - encode_start_ms,
send_end_ms - send_start_ms,
send_end_ms - total_start_ms,
(size_t) encoded_pkt->size,
out_segs_after_send >= out_segs_before_send ? out_segs_after_send - out_segs_before_send : 0U,
video_sender_backlog_segments(&transport_after_send),
transport_after_send.window_pressure_pct,
transport_after_send.srtt_ms
);
}
if (config->enable_timing_logs) { if (config->enable_timing_logs) {
video_pipeline_print_timing_row( video_pipeline_print_timing_row(
frame_number, frame_number,