feat: 增加链路统计信息,两个链路分别显示在前端,D向A汇报D与B的信息

This commit is contained in:
Mock
2026-04-09 13:38:10 +08:00
parent e72f7f3fd9
commit 11e67282c7
19 changed files with 573 additions and 40 deletions

View File

@@ -62,6 +62,7 @@ struct kcp_conn {
int stats_thread_started;
kcp_conn_options_t options;
int update_interval_ms;
atomic_uint_fast64_t total_out_segs;
uint64_t pending_bytes_sent;
uint64_t pending_bytes_received;
uint64_t pending_in_pkts;
@@ -129,6 +130,10 @@ struct kcp_process_sampler {
uint64_t prev_out_segs;
uint64_t prev_in_errs;
uint64_t prev_kcp_in_errs;
uint64_t prev_retrans_segs;
uint64_t prev_fast_retrans_segs;
uint64_t prev_lost_segs;
uint64_t prev_repeat_segs;
atomic_uint_fast64_t bytes_sent;
atomic_uint_fast64_t bytes_received;
atomic_uint_fast64_t in_pkts;
@@ -185,6 +190,20 @@ void kcp_conn_options_set_video_defaults(kcp_conn_options_t *options) {
options->mtu = KCP_VIDEO_MTU;
}
void kcp_conn_options_set_telemetry_defaults(kcp_conn_options_t *options) {
if (options == NULL) {
return;
}
memset(options, 0, sizeof(*options));
options->nodelay = KCP_TELEMETRY_NODELAY;
options->interval_ms = KCP_TELEMETRY_INTERVAL_MS;
options->resend = KCP_TELEMETRY_RESEND;
options->nc = KCP_TELEMETRY_NC;
options->sndwnd = KCP_TELEMETRY_SND_WND;
options->rcvwnd = KCP_TELEMETRY_RCV_WND;
options->mtu = KCP_TELEMETRY_MTU;
}
static int kcp_conn_validate_options(const kcp_conn_options_t *options) {
if (options == NULL) {
errno = EINVAL;
@@ -328,6 +347,7 @@ static void kcp_conn_record_send(kcp_conn_t *conn, int packet_bytes, size_t segm
if (conn == NULL) {
return;
}
atomic_fetch_add_explicit(&conn->total_out_segs, (uint64_t) segments, memory_order_relaxed);
if (conn->process_sampler != NULL) {
kcp_process_sampler_record_send(conn->process_sampler, packet_bytes, segments);
return;
@@ -420,7 +440,14 @@ static void kcp_process_sampler_remove_conn(kcp_process_sampler_t *sampler, kcp_
}
}
static void kcp_process_sampler_collect_gauges(kcp_process_sampler_t *sampler, uint64_t *snd_queue, uint64_t *rcv_queue, uint64_t *snd_buffer) {
static void kcp_process_sampler_collect_gauges(kcp_process_sampler_t *sampler,
uint64_t *snd_queue,
uint64_t *rcv_queue,
uint64_t *snd_buffer,
uint64_t *retrans_segs,
uint64_t *fast_retrans_segs,
uint64_t *lost_segs,
uint64_t *repeat_segs) {
kcp_conn_t *conn;
if (snd_queue != NULL) {
@@ -432,6 +459,18 @@ static void kcp_process_sampler_collect_gauges(kcp_process_sampler_t *sampler, u
if (snd_buffer != NULL) {
*snd_buffer = 0;
}
if (retrans_segs != NULL) {
*retrans_segs = 0;
}
if (fast_retrans_segs != NULL) {
*fast_retrans_segs = 0;
}
if (lost_segs != NULL) {
*lost_segs = 0;
}
if (repeat_segs != NULL) {
*repeat_segs = 0;
}
if (sampler == NULL) {
return;
}
@@ -449,6 +488,18 @@ static void kcp_process_sampler_collect_gauges(kcp_process_sampler_t *sampler, u
if (snd_buffer != NULL) {
*snd_buffer += conn->kcp->nsnd_buf;
}
if (lost_segs != NULL) {
*lost_segs += conn->kcp->timeout_retrans_total;
}
if (fast_retrans_segs != NULL) {
*fast_retrans_segs += conn->kcp->fast_retrans_total;
}
if (retrans_segs != NULL) {
*retrans_segs += conn->kcp->timeout_retrans_total + conn->kcp->fast_retrans_total;
}
if (repeat_segs != NULL) {
*repeat_segs += conn->kcp->duplicate_recv_total;
}
}
pthread_mutex_unlock(&conn->kcp_mu);
}
@@ -468,6 +519,10 @@ static void kcp_process_sampler_log_snapshot(kcp_process_sampler_t *sampler, con
uint64_t snd_queue = 0;
uint64_t rcv_queue = 0;
uint64_t snd_buffer = 0;
uint64_t retrans_segs = 0;
uint64_t fast_retrans_segs = 0;
uint64_t lost_segs = 0;
uint64_t repeat_segs = 0;
if (sampler == NULL || sampler->logger == NULL) {
return;
@@ -481,7 +536,15 @@ static void kcp_process_sampler_log_snapshot(kcp_process_sampler_t *sampler, con
out_segs = atomic_load_explicit(&sampler->out_segs, memory_order_relaxed);
in_errs = atomic_load_explicit(&sampler->in_errs, memory_order_relaxed);
kcp_in_errs = atomic_load_explicit(&sampler->kcp_in_errs, memory_order_relaxed);
kcp_process_sampler_collect_gauges(sampler, &snd_queue, &rcv_queue, &snd_buffer);
kcp_process_sampler_collect_gauges(
sampler,
&snd_queue,
&rcv_queue,
&snd_buffer,
&retrans_segs,
&fast_retrans_segs,
&lost_segs,
&repeat_segs);
memset(&record, 0, sizeof(record));
snprintf(record.record_type, sizeof(record.record_type), "%s", KCP_SESSION_STATS_RECORD_PROCESS_SAMPLE);
@@ -502,6 +565,14 @@ static void kcp_process_sampler_log_snapshot(kcp_process_sampler_t *sampler, con
record.in_segs = kcp_counter_diff(sampler->prev_in_segs, in_segs);
record.has_out_segs = 1;
record.out_segs = kcp_counter_diff(sampler->prev_out_segs, out_segs);
record.has_retrans_segs = 1;
record.retrans_segs = kcp_counter_diff(sampler->prev_retrans_segs, retrans_segs);
record.has_fast_retrans_segs = 1;
record.fast_retrans_segs = kcp_counter_diff(sampler->prev_fast_retrans_segs, fast_retrans_segs);
record.has_lost_segs = 1;
record.lost_segs = kcp_counter_diff(sampler->prev_lost_segs, lost_segs);
record.has_repeat_segs = 1;
record.repeat_segs = kcp_counter_diff(sampler->prev_repeat_segs, repeat_segs);
record.has_in_errs = 1;
record.in_errs = kcp_counter_diff(sampler->prev_in_errs, in_errs);
record.has_kcp_in_errs = 1;
@@ -521,6 +592,10 @@ static void kcp_process_sampler_log_snapshot(kcp_process_sampler_t *sampler, con
sampler->prev_out_pkts = out_pkts;
sampler->prev_in_segs = in_segs;
sampler->prev_out_segs = out_segs;
sampler->prev_retrans_segs = retrans_segs;
sampler->prev_fast_retrans_segs = fast_retrans_segs;
sampler->prev_lost_segs = lost_segs;
sampler->prev_repeat_segs = repeat_segs;
sampler->prev_in_errs = in_errs;
sampler->prev_kcp_in_errs = kcp_in_errs;
@@ -1006,6 +1081,11 @@ static void kcp_log_session_snapshot(kcp_conn_t *conn, const char *reason) {
socklen_t local_len = sizeof(local_addr);
char local_text[OMNI_MAX_ADDR_TEXT];
char remote_text[OMNI_MAX_ADDR_TEXT];
uint32_t inflight = 0;
uint32_t window_limit = 0;
uint64_t out_segs_total = 0;
uint64_t fast_retrans_total = 0;
uint64_t lost_total = 0;
if (conn == NULL || conn->stats_logger == NULL || conn->sock_state == NULL || conn->kcp == NULL) {
return;
}
@@ -1029,7 +1109,38 @@ static void kcp_log_session_snapshot(kcp_conn_t *conn, const char *reason) {
record.srtt_ms = conn->kcp->rx_srtt;
record.has_srttvar_ms = 1;
record.srttvar_ms = conn->kcp->rx_rttval;
record.has_snd_wnd = 1;
record.snd_wnd = conn->kcp->snd_wnd;
record.has_rmt_wnd = 1;
record.rmt_wnd = conn->kcp->rmt_wnd;
inflight = conn->kcp->snd_nxt - conn->kcp->snd_una;
window_limit = conn->kcp->snd_wnd < conn->kcp->rmt_wnd ? conn->kcp->snd_wnd : conn->kcp->rmt_wnd;
record.has_inflight = 1;
record.inflight = inflight;
record.has_window_limit = 1;
record.window_limit = window_limit;
record.has_window_pressure_pct = 1;
record.window_pressure_pct = window_limit == 0 ? 0.0 : ((double) inflight * 100.0) / (double) window_limit;
record.has_ring_buffer_snd_queue = 1;
record.ring_buffer_snd_queue = conn->kcp->nsnd_que;
record.has_ring_buffer_rcv_queue = 1;
record.ring_buffer_rcv_queue = conn->kcp->nrcv_que;
record.has_ring_buffer_snd_buffer = 1;
record.ring_buffer_snd_buffer = conn->kcp->nsnd_buf;
lost_total = conn->kcp->timeout_retrans_total;
fast_retrans_total = conn->kcp->fast_retrans_total;
record.has_retrans_segs = 1;
record.retrans_segs = lost_total + fast_retrans_total;
record.has_fast_retrans_segs = 1;
record.fast_retrans_segs = fast_retrans_total;
record.has_lost_segs = 1;
record.lost_segs = lost_total;
record.has_repeat_segs = 1;
record.repeat_segs = conn->kcp->duplicate_recv_total;
pthread_mutex_unlock(&conn->kcp_mu);
out_segs_total = atomic_load_explicit(&conn->total_out_segs, memory_order_relaxed);
record.has_out_segs = 1;
record.out_segs = out_segs_total;
(void) kcp_session_stats_log(conn->stats_logger, &record);
}
@@ -1788,6 +1899,18 @@ int kcp_conn_local_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, s
return 0;
}
int kcp_conn_remote_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, socklen_t *addr_len) {
if (conn == NULL || addr == NULL || addr_len == NULL) {
errno = EINVAL;
return -1;
}
if (conn->remote_addr_len == 0) {
errno = ENOTCONN;
return -1;
}
return omni_clone_sockaddr((const struct sockaddr *) &conn->remote_addr, conn->remote_addr_len, addr, addr_len);
}
void kcp_conn_runtime_stats_snapshot(kcp_conn_t *conn, kcp_runtime_stats_t *out_stats) {
if (out_stats == NULL) {
return;
@@ -1805,9 +1928,21 @@ void kcp_conn_runtime_stats_snapshot(kcp_conn_t *conn, kcp_runtime_stats_t *out_
out_stats->rto_ms = conn->kcp->rx_rto;
out_stats->srtt_ms = conn->kcp->rx_srtt;
out_stats->srttvar_ms = conn->kcp->rx_rttval;
out_stats->snd_wnd = conn->kcp->snd_wnd;
out_stats->rmt_wnd = conn->kcp->rmt_wnd;
out_stats->inflight = conn->kcp->snd_nxt - conn->kcp->snd_una;
out_stats->window_limit = conn->kcp->snd_wnd < conn->kcp->rmt_wnd ? conn->kcp->snd_wnd : conn->kcp->rmt_wnd;
out_stats->window_pressure_pct = out_stats->window_limit == 0
? 0.0
: ((double) out_stats->inflight * 100.0) / (double) out_stats->window_limit;
out_stats->snd_queue = conn->kcp->nsnd_que;
out_stats->rcv_queue = conn->kcp->nrcv_que;
out_stats->snd_buffer = conn->kcp->nsnd_buf;
out_stats->out_segs_total = atomic_load_explicit(&conn->total_out_segs, memory_order_relaxed);
out_stats->fast_retrans_total = conn->kcp->fast_retrans_total;
out_stats->lost_total = conn->kcp->timeout_retrans_total;
out_stats->retrans_total = out_stats->lost_total + out_stats->fast_retrans_total;
out_stats->repeat_total = conn->kcp->duplicate_recv_total;
out_stats->xmit_total = conn->kcp->xmit;
} else {
out_stats->connected = 0;