fix: C端日志采样、每条 KCP 连接每秒 1 次 periodic snapshot

This commit is contained in:
2026-04-16 11:32:48 +08:00
parent d64329214d
commit 1c845ba51e
4 changed files with 53 additions and 20 deletions

View File

@@ -37,6 +37,8 @@ BLITZ_JSONL_FLUSH_INTERVAL_MS="1000"
BLITZ_JSONL_FLUSH_BYTES="262144"
BLITZ_JSONL_ROTATE_BYTES="134217728"
BLITZ_JSONL_ROTATE_FILES="8"
# Log one normal relay packet out of every N packets. Drop events still log immediately.
OMNI_RELAY_PACKET_LOG_SAMPLE_EVERY="200"
BLITZ_INCIDENT_COMMAND_TIMEOUT_SEC="5"
BLITZ_INCIDENT_TOTAL_TIMEOUT_SEC="30"
BLITZ_NETWORK_FAIL_THRESHOLD="3"

View File

@@ -10,6 +10,8 @@ CONTROL_SIDE_OMNISOCKET_RELAY_VIA="81.70.156.140:10909"
ROBOT_SIDE_OMNISOCKET_SERVER_ADDR="81.70.156.140:10909"
ROBOT_SIDE_OMNISOCKET_RELAY_VIA="106.55.173.235:10909"
# Log one normal relay packet out of every N packets. Drop events still log immediately.
OMNI_RELAY_PACKET_LOG_SAMPLE_EVERY="200"
CONTROL_WS_ALLOWED_ORIGINS="http://127.0.0.1:5173,http://localhost:5173"
VITE_API_BASE_URL="http://127.0.0.1:8001"

View File

@@ -1,10 +1,14 @@
#include "server_udp_relay.h"
#include <arpa/inet.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define UDP_RELAY_BUF_SIZE (64U * 1024U)
#define UDP_RELAY_ROUTE_TIMEOUT_MS 30000U
#define UDP_RELAY_DEFAULT_PACKET_LOG_SAMPLE_EVERY 200U
struct udp_relay {
int downstream_fd;
@@ -20,6 +24,8 @@ struct udp_relay {
struct udp_relay_route *routes;
pthread_mutex_t lock;
pthread_mutex_t log_mu;
unsigned int packet_log_sample_every;
atomic_ullong packet_log_counter;
pthread_mutex_t state_mu;
pthread_cond_t state_cond;
pthread_t downstream_thread;
@@ -48,6 +54,44 @@ static uint32_t udp_relay_elapsed_ms(uint32_t now_ms, uint32_t then_ms) {
return now_ms - then_ms;
}
static unsigned int udp_relay_packet_log_sample_every(void) {
const char *raw = getenv("OMNI_RELAY_PACKET_LOG_SAMPLE_EVERY");
unsigned long parsed;
char *endptr = NULL;
if (raw == NULL || raw[0] == '\0') {
return UDP_RELAY_DEFAULT_PACKET_LOG_SAMPLE_EVERY;
}
parsed = strtoul(raw, &endptr, 10);
if (endptr == raw || *endptr != '\0') {
return UDP_RELAY_DEFAULT_PACKET_LOG_SAMPLE_EVERY;
}
return (unsigned int) parsed;
}
static int udp_relay_event_should_always_log(const char *event_name) {
return event_name != NULL && strstr(event_name, "_drop_") != NULL;
}
static int udp_relay_should_log_packet(udp_relay_t *relay, const char *event_name) {
unsigned long long seq;
if (relay == NULL) {
return 0;
}
if (udp_relay_event_should_always_log(event_name)) {
return 1;
}
if (relay->packet_log_sample_every == 0U) {
return 0;
}
if (relay->packet_log_sample_every == 1U) {
return 1;
}
seq = atomic_fetch_add_explicit(&relay->packet_log_counter, 1U, memory_order_relaxed) + 1U;
return (seq % (unsigned long long) relay->packet_log_sample_every) == 0U;
}
static void udp_relay_parse_kcp_summary(const uint8_t *packet, size_t len, int *has_conv, uint32_t *conv, size_t *segment_count) {
size_t offset = 0;
size_t count = 0;
@@ -99,6 +143,9 @@ static void udp_relay_print_packet(udp_relay_t *relay, const char *event_name, c
if (relay == NULL) {
return;
}
if (!udp_relay_should_log_packet(relay, event_name)) {
return;
}
if (remote_addr != NULL && remote_addr_len > 0) {
omni_sockaddr_to_string((const struct sockaddr *) remote_addr, remote_addr_len, remote_addr_text, sizeof(remote_addr_text));
@@ -461,6 +508,8 @@ udp_relay_t *udp_relay_open(const char *listen_addr, const char *upstream_addr)
}
pthread_mutex_init(&relay->lock, NULL);
pthread_mutex_init(&relay->log_mu, NULL);
relay->packet_log_sample_every = udp_relay_packet_log_sample_every();
atomic_init(&relay->packet_log_counter, 0U);
pthread_mutex_init(&relay->state_mu, NULL);
pthread_cond_init(&relay->state_cond, NULL);
return relay;

View File

@@ -729,20 +729,6 @@ static void kcp_process_sampler_release(kcp_process_sampler_t *sampler) {
free(sampler);
}
static void kcp_process_sampler_request_sample(kcp_process_sampler_t *sampler, const char *reason) {
if (sampler == NULL) {
return;
}
pthread_mutex_lock(&sampler->lock);
if (!sampler->stopped && !sampler->request_pending) {
sampler->request_pending = 1;
sampler->pending_request_id++;
snprintf(sampler->pending_reason, sizeof(sampler->pending_reason), "%s", reason == NULL ? "" : reason);
pthread_cond_broadcast(&sampler->cond);
}
pthread_mutex_unlock(&sampler->lock);
}
static void kcp_process_sampler_request_sample_and_wait(kcp_process_sampler_t *sampler, const char *reason) {
uint64_t request_id;
@@ -1771,8 +1757,6 @@ int kcp_conn_send(kcp_conn_t *conn, const message_t *msg) {
return -1;
}
latencylog_log_message_event(conn->logger, conn->node_role, conn->node_id, EVENT_SEND_HANDOFF_BEGIN, msg);
kcp_log_session_snapshot(conn, "send_handoff_begin");
kcp_process_sampler_request_sample(conn->process_sampler, "send_handoff_begin");
pthread_mutex_lock(&conn->kcp_mu);
atomic_store(&conn->sock_state->last_send_errno, 0);
conn->kcp->current = omni_now_millis32();
@@ -1791,8 +1775,6 @@ int kcp_conn_send(kcp_conn_t *conn, const message_t *msg) {
free(frame);
return -1;
}
kcp_log_session_snapshot(conn, "send_handoff_end");
kcp_process_sampler_request_sample(conn->process_sampler, "send_handoff_end");
latencylog_log_message_event(conn->logger, conn->node_role, conn->node_id, EVENT_SEND_HANDOFF_END, msg);
free(frame);
return 0;
@@ -1835,8 +1817,6 @@ int kcp_conn_receive_timed(kcp_conn_t *conn, message_t *out_msg, int timeout_ms)
return -1;
}
free(frame);
kcp_log_session_snapshot(conn, "receive");
kcp_process_sampler_request_sample(conn->process_sampler, "receive");
return 0;
}
pthread_mutex_lock(&conn->kcp_mu);