From 1c845ba51efe62b4af9408c0ec27f5bad12d274f Mon Sep 17 00:00:00 2001 From: Mock Date: Thu, 16 Apr 2026 11:32:48 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20C=E7=AB=AF=E6=97=A5=E5=BF=97=E9=87=87?= =?UTF-8?q?=E6=A0=B7=E3=80=81=E6=AF=8F=E6=9D=A1=20KCP=20=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E6=AF=8F=E7=A7=92=201=20=E6=AC=A1=20periodic=20snapshot?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/boot/robot-boot.env | 2 ++ scripts/dev/robot-remote.env | 2 ++ src/server_udp_relay.c | 49 ++++++++++++++++++++++++++++++++++++ src/transport_kcp.c | 20 --------------- 4 files changed, 53 insertions(+), 20 deletions(-) diff --git a/scripts/boot/robot-boot.env b/scripts/boot/robot-boot.env index 890798b..62ae87c 100644 --- a/scripts/boot/robot-boot.env +++ b/scripts/boot/robot-boot.env @@ -37,6 +37,8 @@ BLITZ_JSONL_FLUSH_INTERVAL_MS="1000" BLITZ_JSONL_FLUSH_BYTES="262144" BLITZ_JSONL_ROTATE_BYTES="134217728" BLITZ_JSONL_ROTATE_FILES="8" +# Log one normal relay packet out of every N packets. Drop events still log immediately. +OMNI_RELAY_PACKET_LOG_SAMPLE_EVERY="200" BLITZ_INCIDENT_COMMAND_TIMEOUT_SEC="5" BLITZ_INCIDENT_TOTAL_TIMEOUT_SEC="30" BLITZ_NETWORK_FAIL_THRESHOLD="3" diff --git a/scripts/dev/robot-remote.env b/scripts/dev/robot-remote.env index 54413f4..cc3ba57 100644 --- a/scripts/dev/robot-remote.env +++ b/scripts/dev/robot-remote.env @@ -10,6 +10,8 @@ CONTROL_SIDE_OMNISOCKET_RELAY_VIA="81.70.156.140:10909" ROBOT_SIDE_OMNISOCKET_SERVER_ADDR="81.70.156.140:10909" ROBOT_SIDE_OMNISOCKET_RELAY_VIA="106.55.173.235:10909" +# Log one normal relay packet out of every N packets. Drop events still log immediately. +OMNI_RELAY_PACKET_LOG_SAMPLE_EVERY="200" CONTROL_WS_ALLOWED_ORIGINS="http://127.0.0.1:5173,http://localhost:5173" VITE_API_BASE_URL="http://127.0.0.1:8001" diff --git a/src/server_udp_relay.c b/src/server_udp_relay.c index ad201fb..c562df1 100644 --- a/src/server_udp_relay.c +++ b/src/server_udp_relay.c @@ -1,10 +1,14 @@ #include "server_udp_relay.h" #include +#include +#include +#include #include #define UDP_RELAY_BUF_SIZE (64U * 1024U) #define UDP_RELAY_ROUTE_TIMEOUT_MS 30000U +#define UDP_RELAY_DEFAULT_PACKET_LOG_SAMPLE_EVERY 200U struct udp_relay { int downstream_fd; @@ -20,6 +24,8 @@ struct udp_relay { struct udp_relay_route *routes; pthread_mutex_t lock; pthread_mutex_t log_mu; + unsigned int packet_log_sample_every; + atomic_ullong packet_log_counter; pthread_mutex_t state_mu; pthread_cond_t state_cond; pthread_t downstream_thread; @@ -48,6 +54,44 @@ static uint32_t udp_relay_elapsed_ms(uint32_t now_ms, uint32_t then_ms) { return now_ms - then_ms; } +static unsigned int udp_relay_packet_log_sample_every(void) { + const char *raw = getenv("OMNI_RELAY_PACKET_LOG_SAMPLE_EVERY"); + unsigned long parsed; + char *endptr = NULL; + + if (raw == NULL || raw[0] == '\0') { + return UDP_RELAY_DEFAULT_PACKET_LOG_SAMPLE_EVERY; + } + parsed = strtoul(raw, &endptr, 10); + if (endptr == raw || *endptr != '\0') { + return UDP_RELAY_DEFAULT_PACKET_LOG_SAMPLE_EVERY; + } + return (unsigned int) parsed; +} + +static int udp_relay_event_should_always_log(const char *event_name) { + return event_name != NULL && strstr(event_name, "_drop_") != NULL; +} + +static int udp_relay_should_log_packet(udp_relay_t *relay, const char *event_name) { + unsigned long long seq; + + if (relay == NULL) { + return 0; + } + if (udp_relay_event_should_always_log(event_name)) { + return 1; + } + if (relay->packet_log_sample_every == 0U) { + return 0; + } + if (relay->packet_log_sample_every == 1U) { + return 1; + } + seq = atomic_fetch_add_explicit(&relay->packet_log_counter, 1U, memory_order_relaxed) + 1U; + return (seq % (unsigned long long) relay->packet_log_sample_every) == 0U; +} + static void udp_relay_parse_kcp_summary(const uint8_t *packet, size_t len, int *has_conv, uint32_t *conv, size_t *segment_count) { size_t offset = 0; size_t count = 0; @@ -99,6 +143,9 @@ static void udp_relay_print_packet(udp_relay_t *relay, const char *event_name, c if (relay == NULL) { return; } + if (!udp_relay_should_log_packet(relay, event_name)) { + return; + } if (remote_addr != NULL && remote_addr_len > 0) { omni_sockaddr_to_string((const struct sockaddr *) remote_addr, remote_addr_len, remote_addr_text, sizeof(remote_addr_text)); @@ -461,6 +508,8 @@ udp_relay_t *udp_relay_open(const char *listen_addr, const char *upstream_addr) } pthread_mutex_init(&relay->lock, NULL); pthread_mutex_init(&relay->log_mu, NULL); + relay->packet_log_sample_every = udp_relay_packet_log_sample_every(); + atomic_init(&relay->packet_log_counter, 0U); pthread_mutex_init(&relay->state_mu, NULL); pthread_cond_init(&relay->state_cond, NULL); return relay; diff --git a/src/transport_kcp.c b/src/transport_kcp.c index 8fd21e1..b4b93d1 100644 --- a/src/transport_kcp.c +++ b/src/transport_kcp.c @@ -729,20 +729,6 @@ static void kcp_process_sampler_release(kcp_process_sampler_t *sampler) { free(sampler); } -static void kcp_process_sampler_request_sample(kcp_process_sampler_t *sampler, const char *reason) { - if (sampler == NULL) { - return; - } - pthread_mutex_lock(&sampler->lock); - if (!sampler->stopped && !sampler->request_pending) { - sampler->request_pending = 1; - sampler->pending_request_id++; - snprintf(sampler->pending_reason, sizeof(sampler->pending_reason), "%s", reason == NULL ? "" : reason); - pthread_cond_broadcast(&sampler->cond); - } - pthread_mutex_unlock(&sampler->lock); -} - static void kcp_process_sampler_request_sample_and_wait(kcp_process_sampler_t *sampler, const char *reason) { uint64_t request_id; @@ -1771,8 +1757,6 @@ int kcp_conn_send(kcp_conn_t *conn, const message_t *msg) { return -1; } latencylog_log_message_event(conn->logger, conn->node_role, conn->node_id, EVENT_SEND_HANDOFF_BEGIN, msg); - kcp_log_session_snapshot(conn, "send_handoff_begin"); - kcp_process_sampler_request_sample(conn->process_sampler, "send_handoff_begin"); pthread_mutex_lock(&conn->kcp_mu); atomic_store(&conn->sock_state->last_send_errno, 0); conn->kcp->current = omni_now_millis32(); @@ -1791,8 +1775,6 @@ int kcp_conn_send(kcp_conn_t *conn, const message_t *msg) { free(frame); return -1; } - kcp_log_session_snapshot(conn, "send_handoff_end"); - kcp_process_sampler_request_sample(conn->process_sampler, "send_handoff_end"); latencylog_log_message_event(conn->logger, conn->node_role, conn->node_id, EVENT_SEND_HANDOFF_END, msg); free(frame); return 0; @@ -1835,8 +1817,6 @@ int kcp_conn_receive_timed(kcp_conn_t *conn, message_t *out_msg, int timeout_ms) return -1; } free(frame); - kcp_log_session_snapshot(conn, "receive"); - kcp_process_sampler_request_sample(conn->process_sampler, "receive"); return 0; } pthread_mutex_lock(&conn->kcp_mu);