feat:扩展了结构化日志输出

- 补充了传输层观测指标
  - 增加了 UDP 丢包相关统计
  - README 也同步更新了字段口径
This commit is contained in:
nnbcccscdscdsc
2026-03-17 20:53:43 +08:00
parent ed1cb20da8
commit 4b95d26f13
5 changed files with 823 additions and 12 deletions

View File

@@ -42,6 +42,11 @@ typedef struct PeerRecvFileState {
uint32_t total_chunks;
uint64_t total_bytes;
uint64_t bytes_written;
uint32_t received_chunks;
uint32_t observed_windows;
uint8_t *seq_seen;
uint32_t *recv_window_counts;
size_t recv_window_cap;
} PeerRecvFileState;
typedef struct PeerRuntime {
@@ -49,6 +54,7 @@ typedef struct PeerRuntime {
PeerTransportSession *session;
PeerMode mode;
OmniRole role;
OmniProtocol proto;
int running;
int direct_register_sent;
char client_id[OMNI_PEER_ID_SIZE];
@@ -175,11 +181,239 @@ static void peer_set_bound_peer(PeerRuntime *rt, const char *peer_id)
static void close_recv_file(PeerRuntime *rt)
{
if (!rt || !rt->rx_file.fp) {
if (!rt) {
return;
}
fclose(rt->rx_file.fp);
rt->rx_file.fp = NULL;
if (rt->rx_file.fp) {
fclose(rt->rx_file.fp);
rt->rx_file.fp = NULL;
}
free(rt->rx_file.seq_seen);
free(rt->rx_file.recv_window_counts);
memset(&rt->rx_file, 0, sizeof(rt->rx_file));
}
static int recv_file_prepare_tracking(PeerRecvFileState *state, uint32_t total_chunks)
{
if (!state) {
return OMNI_ERR_PARAM;
}
if (total_chunks == 0) {
return OMNI_OK;
}
state->seq_seen = (uint8_t *)calloc((size_t)total_chunks + 1u, sizeof(uint8_t));
if (!state->seq_seen) {
return OMNI_ERR_GENERIC;
}
return OMNI_OK;
}
static int recv_file_ensure_window_capacity(PeerRecvFileState *state, uint32_t window_id)
{
size_t need;
size_t new_cap;
uint32_t *new_counts;
if (!state) {
return OMNI_ERR_PARAM;
}
need = (size_t)window_id + 1u;
if (need <= state->recv_window_cap) {
return OMNI_OK;
}
new_cap = (state->recv_window_cap == 0) ? 4u : state->recv_window_cap;
while (new_cap < need) {
new_cap *= 2u;
}
new_counts = (uint32_t *)realloc(state->recv_window_counts,
new_cap * sizeof(uint32_t));
if (!new_counts) {
return OMNI_ERR_GENERIC;
}
memset(new_counts + state->recv_window_cap,
0,
(new_cap - state->recv_window_cap) * sizeof(uint32_t));
state->recv_window_counts = new_counts;
state->recv_window_cap = new_cap;
return OMNI_OK;
}
static void recv_file_note_chunk(PeerRecvFileState *state,
uint32_t seq,
uint32_t window_id)
{
if (!state || seq == 0) {
return;
}
if (recv_file_ensure_window_capacity(state, window_id) == OMNI_OK) {
if ((size_t)window_id < state->recv_window_cap) {
state->recv_window_counts[window_id]++;
}
if (window_id + 1u > state->observed_windows) {
state->observed_windows = window_id + 1u;
}
}
if (!state->seq_seen) {
state->received_chunks++;
return;
}
if (state->seq_seen && seq <= state->total_chunks && !state->seq_seen[seq]) {
state->seq_seen[seq] = 1;
state->received_chunks++;
}
}
static void append_csv_uint(char *buf, size_t buf_sz, uint32_t value)
{
size_t len;
if (!buf || buf_sz == 0) {
return;
}
len = strlen(buf);
if (len >= buf_sz - 1u) {
return;
}
snprintf(buf + len, buf_sz - len, "%s%u", len == 0 ? "" : ",", (unsigned)value);
}
static void append_range(char *buf, size_t buf_sz, uint32_t start, uint32_t end)
{
size_t len;
if (!buf || buf_sz == 0) {
return;
}
len = strlen(buf);
if (len >= buf_sz - 1u) {
return;
}
if (start == end) {
snprintf(buf + len, buf_sz - len, "%s%u", len == 0 ? "" : ",", (unsigned)start);
} else {
snprintf(buf + len, buf_sz - len, "%s%u-%u",
len == 0 ? "" : ",",
(unsigned)start,
(unsigned)end);
}
}
static void append_window_count(char *buf,
size_t buf_sz,
uint32_t window_id,
uint32_t count)
{
size_t len;
if (!buf || buf_sz == 0) {
return;
}
len = strlen(buf);
if (len >= buf_sz - 1u) {
return;
}
snprintf(buf + len, buf_sz - len, "%s%u:%u",
len == 0 ? "" : ",",
(unsigned)window_id,
(unsigned)count);
}
static void peer_record_udp_loss_summary(PeerRuntime *rt, uint32_t total_windows)
{
PeerRecvFileState *state;
uint64_t lost_chunks;
uint64_t burst_count = 0;
uint64_t burst_max_len = 0;
char ranges[OMNI_LOGGER_UDP_RANGES_SIZE];
char seq_sample[OMNI_LOGGER_UDP_SEQ_SAMPLE_SIZE];
char recv_window_dist[OMNI_LOGGER_UDP_WINDOW_DIST_SIZE];
uint32_t sample_count = 0;
if (!rt || rt->proto != OMNI_PROTO_UDP) {
return;
}
state = &rt->rx_file;
if (state->total_chunks == 0 || !state->seq_seen) {
return;
}
ranges[0] = '\0';
seq_sample[0] = '\0';
recv_window_dist[0] = '\0';
lost_chunks = 0;
for (uint32_t seq = 1; seq <= state->total_chunks; ++seq) {
if (state->seq_seen[seq]) {
continue;
}
{
uint32_t end = seq;
while (end + 1u <= state->total_chunks && !state->seq_seen[end + 1u]) {
end++;
}
lost_chunks += (uint64_t)(end - seq + 1u);
burst_count++;
if ((uint64_t)(end - seq + 1u) > burst_max_len) {
burst_max_len = (uint64_t)(end - seq + 1u);
}
append_range(ranges, sizeof(ranges), seq, end);
for (uint32_t missing = seq; missing <= end && sample_count < 16u; ++missing) {
append_csv_uint(seq_sample, sizeof(seq_sample), missing);
sample_count++;
}
seq = end;
}
}
for (uint32_t window_id = 0;
window_id < (uint32_t)state->recv_window_cap &&
window_id < total_windows;
++window_id) {
if (state->recv_window_counts[window_id] == 0) {
continue;
}
append_window_count(recv_window_dist,
sizeof(recv_window_dist),
window_id,
state->recv_window_counts[window_id]);
}
logger_on_udp_loss_summary(state->total_chunks,
state->received_chunks,
lost_chunks,
burst_count,
burst_max_len,
ranges,
seq_sample,
recv_window_dist);
}
static void peer_finalize_recv_observability(PeerRuntime *rt, uint32_t total_windows_hint)
{
uint32_t total_windows;
if (!rt || rt->proto != OMNI_PROTO_UDP) {
return;
}
if (rt->rx_file.transfer_id == 0 || rt->rx_file.total_chunks == 0) {
return;
}
total_windows = total_windows_hint;
if (total_windows == 0) {
total_windows = rt->rx_file.observed_windows;
}
if (total_windows == 0) {
total_windows = 1;
}
peer_record_udp_loss_summary(rt, total_windows);
}
static uint64_t compute_file_size(FILE *fp)
@@ -389,6 +623,8 @@ static int peer_send_file(PeerRuntime *rt,
uint32_t total_chunks;
uint32_t transfer_id;
uint32_t total_windows = 1;
uint64_t transfer_start_ms;
uint32_t max_window_id = 0;
int rc = OMNI_OK;
if (!file_path) {
@@ -419,10 +655,13 @@ static int peer_send_file(PeerRuntime *rt,
goto out;
}
logger_reset_transfer_observability();
logger_set_transfer_total(total_bytes);
logger_set_progress(0);
transfer_start_ms = omni_now_ms();
for (uint32_t seq = 1; rt->running && !g_stop; ++seq) {
uint64_t processing_t0 = omni_now_ms();
size_t nread = fread(chunk, 1, chunk_size, fp);
if (nread == 0) {
@@ -437,18 +676,24 @@ static int peer_send_file(PeerRuntime *rt,
if (nread > 0) {
TransferChunkMeta meta;
uint64_t chunk_origin_ts_ms = omni_now_ms();
uint32_t window_id = (uint32_t)((chunk_origin_ts_ms - transfer_start_ms) / 1000u);
omni_transfer_chunk_meta_encode(&meta,
transfer_id,
seq,
total_chunks,
0,
window_id,
total_bytes,
offset,
(uint32_t)nread,
omni_now_ms());
chunk_origin_ts_ms);
memcpy(payload, &meta, TRANSFER_CHUNK_META_SIZE);
memcpy(payload + TRANSFER_CHUNK_META_SIZE, chunk, nread);
if (window_id > max_window_id) {
max_window_id = window_id;
}
logger_on_processing_latency((double)(omni_now_ms() - processing_t0));
rc = peer_send_inner(rt,
effective_dst,
MSG_TYPE_FILE_CHUNK,
@@ -465,6 +710,7 @@ static int peer_send_file(PeerRuntime *rt,
if (rc == OMNI_OK && rt->running && !g_stop) {
TransferEndMeta end_meta;
total_windows = max_window_id + 1u;
omni_transfer_end_meta_encode(&end_meta,
transfer_id,
total_chunks,
@@ -587,6 +833,7 @@ static int peer_open_recv_file(PeerRuntime *rt, const TransferChunkMeta *meta)
}
if (!rt->rx_file.fp || rt->rx_file.transfer_id != meta->transfer_id) {
peer_finalize_recv_observability(rt, 0);
close_recv_file(rt);
rt->rx_file.fp = fopen(rt->output_path, "wb+");
if (!rt->rx_file.fp) {
@@ -596,6 +843,13 @@ static int peer_open_recv_file(PeerRuntime *rt, const TransferChunkMeta *meta)
rt->rx_file.total_chunks = meta->total_chunks;
rt->rx_file.total_bytes = meta->total_bytes;
rt->rx_file.bytes_written = 0;
if (recv_file_prepare_tracking(&rt->rx_file, meta->total_chunks) != OMNI_OK) {
logger_log("WARN", "peer",
"recv_tracking_alloc_failed transfer_id=%u total_chunks=%u",
(unsigned)meta->transfer_id,
(unsigned)meta->total_chunks);
}
logger_reset_transfer_observability();
logger_set_transfer_total(meta->total_bytes);
logger_set_progress(0);
}
@@ -608,6 +862,8 @@ static void handle_file_chunk_message(PeerRuntime *rt,
uint32_t payload_len)
{
TransferChunkMeta meta;
uint64_t processing_t0 = omni_now_ms();
uint64_t now_ms;
if (payload_len < TRANSFER_CHUNK_META_SIZE) {
logger_log("WARN", "peer", "short_file_chunk_payload len=%u", (unsigned)payload_len);
@@ -632,6 +888,8 @@ static void handle_file_chunk_message(PeerRuntime *rt,
return;
}
recv_file_note_chunk(&rt->rx_file, meta.seq, meta.window_id);
if (fseeko(rt->rx_file.fp, (off_t)meta.offset_bytes, SEEK_SET) != 0) {
logger_log("ERROR", "peer", "fseeko_failed offset=%llu errno=%d",
(unsigned long long)meta.offset_bytes,
@@ -645,6 +903,11 @@ static void handle_file_chunk_message(PeerRuntime *rt,
rt->rx_file.bytes_written += meta.chunk_bytes;
logger_set_progress(rt->rx_file.bytes_written);
now_ms = omni_now_ms();
logger_on_processing_latency((double)(now_ms - processing_t0));
if (meta.origin_ts_ms > 0 && now_ms >= meta.origin_ts_ms) {
logger_on_end_to_end_latency((double)(now_ms - meta.origin_ts_ms));
}
}
static void handle_file_end_message(PeerRuntime *rt,
@@ -669,6 +932,9 @@ static void handle_file_end_message(PeerRuntime *rt,
total_chunks = rt->rx_file.total_chunks ? rt->rx_file.total_chunks : end_meta.total_chunks;
total_bytes = rt->rx_file.total_bytes ? rt->rx_file.total_bytes : end_meta.total_bytes;
}
peer_finalize_recv_observability(rt,
end_meta.total_windows > 0 ? end_meta.total_windows
: rt->rx_file.observed_windows);
fprintf(stdout,
"[file %s -> %s] transfer_id=%u bytes_written=%llu total_bytes=%llu total_chunks=%u output=%s\n",
@@ -955,6 +1221,7 @@ static void handle_transport_event(PeerRuntime *rt,
rt->session = NULL;
rt->direct_register_sent = 0;
peer_set_bound_peer(rt, "");
peer_finalize_recv_observability(rt, 0);
close_recv_file(rt);
rt->rx_file.transfer_id = 0;
rt->rx_file.total_chunks = 0;
@@ -1114,6 +1381,7 @@ int main(int argc, char **argv)
memset(&rt, 0, sizeof(rt));
memset(&startup, 0, sizeof(startup));
rt.mode = mode;
rt.proto = proto;
rt.role = (mode == PEER_MODE_DIRECT && listen_port > 0) ? OMNI_ROLE_SERVER : OMNI_ROLE_CLIENT;
log_mode = (mode == PEER_MODE_DIRECT) ? "direct" : "hub";
log_role = (rt.role == OMNI_ROLE_SERVER) ? "server" : "client";
@@ -1220,6 +1488,7 @@ int main(int argc, char **argv)
}
}
peer_finalize_recv_observability(&rt, 0);
close_recv_file(&rt);
peer_transport_close(rt.transport);
logger_print_performance_log("final");

View File

@@ -29,6 +29,25 @@ static char g_ctx_mode[16];
static char g_ctx_role[16];
static char g_ctx_self_id[OMNI_PEER_ID_SIZE];
static void metric_reset(OmniMetricSummary *metric);
static void reset_transfer_observability_locked(void)
{
g_stats.total_work_bytes = 0;
g_stats.progress_bytes = 0;
metric_reset(&g_stats.processing_delay_ms);
metric_reset(&g_stats.end_to_end_delay_ms);
g_stats.udp_expected_chunks = 0;
g_stats.udp_received_chunks = 0;
g_stats.udp_lost_chunks = 0;
g_stats.udp_loss_burst_count = 0;
g_stats.udp_loss_burst_max_len = 0;
g_stats.udp_loss_rate_pct = 0.0;
g_stats.udp_loss_ranges[0] = '\0';
g_stats.udp_loss_seq_sample[0] = '\0';
g_stats.udp_recv_window_dist[0] = '\0';
}
/* 对 common.h 的时间接口做一层薄包装,统一 logger 模块内部的调用入口。 */
static uint64_t now_ms(void)
{
@@ -240,6 +259,7 @@ void logger_init(void)
metric_reset(&g_stats.send_buffer_pct);
metric_reset(&g_stats.recv_buffer_pct);
metric_reset(&g_stats.cwnd);
reset_transfer_observability_locked();
copy_context_field(g_ctx_app, sizeof(g_ctx_app), NULL, "unknown");
copy_context_field(g_ctx_proto, sizeof(g_ctx_proto), NULL, "unknown");
copy_context_field(g_ctx_mode, sizeof(g_ctx_mode), NULL, "unknown");
@@ -555,6 +575,43 @@ void logger_set_progress(uint64_t progress_bytes)
pthread_mutex_unlock(&g_mu);
}
void logger_reset_transfer_observability(void)
{
pthread_mutex_lock(&g_mu);
reset_transfer_observability_locked();
pthread_mutex_unlock(&g_mu);
}
void logger_on_udp_loss_summary(uint64_t expected_chunks,
uint64_t received_chunks,
uint64_t lost_chunks,
uint64_t burst_count,
uint64_t burst_max_len,
const char *ranges,
const char *seq_sample,
const char *recv_window_dist)
{
pthread_mutex_lock(&g_mu);
g_stats.udp_expected_chunks = expected_chunks;
g_stats.udp_received_chunks = received_chunks;
g_stats.udp_lost_chunks = lost_chunks;
g_stats.udp_loss_burst_count = burst_count;
g_stats.udp_loss_burst_max_len = burst_max_len;
g_stats.udp_loss_rate_pct = (expected_chunks > 0)
? ((double)lost_chunks * 100.0) / (double)expected_chunks
: 0.0;
omni_copy_fixed_ascii(g_stats.udp_loss_ranges,
sizeof(g_stats.udp_loss_ranges),
ranges ? ranges : "");
omni_copy_fixed_ascii(g_stats.udp_loss_seq_sample,
sizeof(g_stats.udp_loss_seq_sample),
seq_sample ? seq_sample : "");
omni_copy_fixed_ascii(g_stats.udp_recv_window_dist,
sizeof(g_stats.udp_recv_window_dist),
recv_window_dist ? recv_window_dist : "");
pthread_mutex_unlock(&g_mu);
}
/* 对外提供一个“当前总吞吐”的便捷计算接口。 */
double logger_calculate_throughput(void)
{
@@ -606,6 +663,9 @@ void logger_print_performance_log(const char *tag)
char ctx_mode[sizeof(g_ctx_mode)];
char ctx_role[sizeof(g_ctx_role)];
char ctx_self_id[sizeof(g_ctx_self_id)];
char udp_loss_ranges[sizeof(snapshot.udp_loss_ranges)];
char udp_loss_seq_sample[sizeof(snapshot.udp_loss_seq_sample)];
char udp_recv_window_dist[sizeof(snapshot.udp_recv_window_dist)];
pthread_mutex_lock(&g_mu);
now = now_ms();
@@ -626,8 +686,15 @@ void logger_print_performance_log(const char *tag)
omni_copy_fixed_ascii(ctx_mode, sizeof(ctx_mode), g_ctx_mode);
omni_copy_fixed_ascii(ctx_role, sizeof(ctx_role), g_ctx_role);
omni_copy_fixed_ascii(ctx_self_id, sizeof(ctx_self_id), g_ctx_self_id);
omni_copy_fixed_ascii(udp_loss_ranges, sizeof(udp_loss_ranges), snapshot.udp_loss_ranges);
omni_copy_fixed_ascii(udp_loss_seq_sample, sizeof(udp_loss_seq_sample), snapshot.udp_loss_seq_sample);
omni_copy_fixed_ascii(udp_recv_window_dist, sizeof(udp_recv_window_dist), snapshot.udp_recv_window_dist);
pthread_mutex_unlock(&g_mu);
udp_loss_ranges[sizeof(udp_loss_ranges) - 1u] = '\0';
udp_loss_seq_sample[sizeof(udp_loss_seq_sample) - 1u] = '\0';
udp_recv_window_dist[sizeof(udp_recv_window_dist) - 1u] = '\0';
progress_pct = 0.0;
if (snapshot.total_work_bytes > 0) {
progress_pct = ((double)snapshot.progress_bytes * 100.0) /
@@ -651,7 +718,7 @@ void logger_print_performance_log(const char *tag)
"processing_avg_ms=%.3f queue_avg_ms=%.3f transmission_avg_ms=%.3f propagation_avg_ms=%.3f end_to_end_avg_ms=%.3f "
"send_buffer_pct=%.2f recv_buffer_pct=%.2f cwnd=%.2f "
"last_rtt_ms=%llu min_rtt_ms=%llu max_rtt_ms=%llu "
"tcp_retrans=%llu tcp_data_segs_out=%llu tcp_data_bytes_sent=%llu tcp_retrans_bytes=%llu "
"tcp_retrans=%llu udp_retrans=%llu tcp_data_segs_out=%llu tcp_data_bytes_sent=%llu tcp_retrans_bytes=%llu "
"kcp_retrans=%llu kcp_data_segs_out=%llu kcp_data_bytes_sent=%llu kcp_retrans_bytes=%llu\n",
ctx_app[0] ? ctx_app : "unknown",
ctx_proto[0] ? ctx_proto : "unknown",
@@ -693,6 +760,7 @@ void logger_print_performance_log(const char *tag)
(unsigned long long)((snapshot.min_rtt_ms == UINT64_MAX) ? 0 : snapshot.min_rtt_ms),
(unsigned long long)snapshot.max_rtt_ms,
(unsigned long long)snapshot.tcp_retrans,
(unsigned long long)snapshot.udp_retrans,
(unsigned long long)snapshot.tcp_data_segs_out,
(unsigned long long)snapshot.tcp_data_bytes_sent,
(unsigned long long)snapshot.tcp_retrans_bytes,
@@ -700,10 +768,39 @@ void logger_print_performance_log(const char *tag)
(unsigned long long)snapshot.kcp_data_segs_out,
(unsigned long long)snapshot.kcp_data_bytes_sent,
(unsigned long long)snapshot.kcp_retrans_bytes);
if (snapshot.udp_expected_chunks > 0 || snapshot.udp_lost_chunks > 0) {
fprintf(fp,
"ts=%llu level=INFO component=perf_udp_loss app=%s proto=%s mode=%s role=%s self_id=%s "
"udp_expected_chunks=%llu udp_received_chunks=%llu udp_lost_chunks=%llu "
"udp_loss_rate_pct=%.2f udp_loss_burst_count=%llu udp_loss_burst_max_len=%llu "
"udp_loss_ranges=%s udp_loss_seq_sample=%s udp_recv_window_dist=%s\n",
(unsigned long long)now,
ctx_app[0] ? ctx_app : "unknown",
ctx_proto[0] ? ctx_proto : "unknown",
ctx_mode[0] ? ctx_mode : "unknown",
ctx_role[0] ? ctx_role : "unknown",
ctx_self_id[0] ? ctx_self_id : "-",
(unsigned long long)snapshot.udp_expected_chunks,
(unsigned long long)snapshot.udp_received_chunks,
(unsigned long long)snapshot.udp_lost_chunks,
snapshot.udp_loss_rate_pct,
(unsigned long long)snapshot.udp_loss_burst_count,
(unsigned long long)snapshot.udp_loss_burst_max_len,
udp_loss_ranges[0] ? udp_loss_ranges : "-",
udp_loss_seq_sample[0] ? udp_loss_seq_sample : "-",
udp_recv_window_dist[0] ? udp_recv_window_dist : "-");
}
}
if (g_json_fp) {
/* JSONL 输出便于后续脚本或可视化工具离线分析。 */
char esc_udp_ranges[sizeof(snapshot.udp_loss_ranges) * 2u];
char esc_udp_seq_sample[sizeof(snapshot.udp_loss_seq_sample) * 2u];
char esc_udp_window_dist[sizeof(snapshot.udp_recv_window_dist) * 2u];
json_escape(udp_loss_ranges, esc_udp_ranges, sizeof(esc_udp_ranges));
json_escape(udp_loss_seq_sample, esc_udp_seq_sample, sizeof(esc_udp_seq_sample));
json_escape(udp_recv_window_dist, esc_udp_window_dist, sizeof(esc_udp_window_dist));
fprintf(g_json_fp,
"{\"ts_ms\":%llu,"
"\"level\":\"INFO\","
@@ -764,13 +861,23 @@ void logger_print_performance_log(const char *tag)
"\"min_rtt_ms\":%llu,"
"\"max_rtt_ms\":%llu,"
"\"tcp_retrans\":%llu,"
"\"udp_retrans\":%llu,"
"\"tcp_data_segs_out\":%llu,"
"\"tcp_data_bytes_sent\":%llu,"
"\"tcp_retrans_bytes\":%llu,"
"\"kcp_retrans\":%llu,"
"\"kcp_data_segs_out\":%llu,"
"\"kcp_data_bytes_sent\":%llu,"
"\"kcp_retrans_bytes\":%llu}\n",
"\"kcp_retrans_bytes\":%llu,"
"\"udp_expected_chunks\":%llu,"
"\"udp_received_chunks\":%llu,"
"\"udp_lost_chunks\":%llu,"
"\"udp_loss_rate_pct\":%.6f,"
"\"udp_loss_burst_count\":%llu,"
"\"udp_loss_burst_max_len\":%llu,"
"\"udp_loss_ranges\":\"%s\","
"\"udp_loss_seq_sample\":\"%s\","
"\"udp_recv_window_dist\":\"%s\"}\n",
(unsigned long long)now,
ctx_app[0] ? ctx_app : "unknown",
ctx_proto[0] ? ctx_proto : "unknown",
@@ -828,13 +935,23 @@ void logger_print_performance_log(const char *tag)
(unsigned long long)((snapshot.min_rtt_ms == UINT64_MAX) ? 0 : snapshot.min_rtt_ms),
(unsigned long long)snapshot.max_rtt_ms,
(unsigned long long)snapshot.tcp_retrans,
(unsigned long long)snapshot.udp_retrans,
(unsigned long long)snapshot.tcp_data_segs_out,
(unsigned long long)snapshot.tcp_data_bytes_sent,
(unsigned long long)snapshot.tcp_retrans_bytes,
(unsigned long long)snapshot.kcp_retrans,
(unsigned long long)snapshot.kcp_data_segs_out,
(unsigned long long)snapshot.kcp_data_bytes_sent,
(unsigned long long)snapshot.kcp_retrans_bytes);
(unsigned long long)snapshot.kcp_retrans_bytes,
(unsigned long long)snapshot.udp_expected_chunks,
(unsigned long long)snapshot.udp_received_chunks,
(unsigned long long)snapshot.udp_lost_chunks,
snapshot.udp_loss_rate_pct,
(unsigned long long)snapshot.udp_loss_burst_count,
(unsigned long long)snapshot.udp_loss_burst_max_len,
esc_udp_ranges,
esc_udp_seq_sample,
esc_udp_window_dist);
fflush(g_json_fp);
}
}

View File

@@ -7,9 +7,11 @@
#include <errno.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
@@ -29,6 +31,8 @@ struct PeerTransportSession {
socklen_t addr_len;
ikcpcb *kcp;
uint32_t kcp_conv;
uint32_t *kcp_seg_xmit_seen;
size_t kcp_seg_xmit_cap;
char remote_ip[64];
uint16_t remote_port;
struct PeerTransportSession *next;
@@ -44,6 +48,66 @@ struct PeerTransport {
uint8_t *rx_frame;
};
#ifdef __linux__
struct OmniLinuxTcpInfo {
uint8_t tcpi_state;
uint8_t tcpi_ca_state;
uint8_t tcpi_retransmits;
uint8_t tcpi_probes;
uint8_t tcpi_backoff;
uint8_t tcpi_options;
uint8_t tcpi_snd_wscale : 4, tcpi_rcv_wscale : 4;
uint8_t tcpi_delivery_rate_app_limited : 1, tcpi_fastopen_client_fail : 2;
uint32_t tcpi_rto;
uint32_t tcpi_ato;
uint32_t tcpi_snd_mss;
uint32_t tcpi_rcv_mss;
uint32_t tcpi_unacked;
uint32_t tcpi_sacked;
uint32_t tcpi_lost;
uint32_t tcpi_retrans;
uint32_t tcpi_fackets;
uint32_t tcpi_last_data_sent;
uint32_t tcpi_last_ack_sent;
uint32_t tcpi_last_data_recv;
uint32_t tcpi_last_ack_recv;
uint32_t tcpi_pmtu;
uint32_t tcpi_rcv_ssthresh;
uint32_t tcpi_rtt;
uint32_t tcpi_rttvar;
uint32_t tcpi_snd_ssthresh;
uint32_t tcpi_snd_cwnd;
uint32_t tcpi_advmss;
uint32_t tcpi_reordering;
uint32_t tcpi_rcv_rtt;
uint32_t tcpi_rcv_space;
uint32_t tcpi_total_retrans;
uint64_t tcpi_pacing_rate;
uint64_t tcpi_max_pacing_rate;
uint64_t tcpi_bytes_acked;
uint64_t tcpi_bytes_received;
uint32_t tcpi_segs_out;
uint32_t tcpi_segs_in;
uint32_t tcpi_notsent_bytes;
uint32_t tcpi_min_rtt;
uint32_t tcpi_data_segs_in;
uint32_t tcpi_data_segs_out;
uint64_t tcpi_delivery_rate;
uint64_t tcpi_busy_time;
uint64_t tcpi_rwnd_limited;
uint64_t tcpi_sndbuf_limited;
uint32_t tcpi_delivered;
uint32_t tcpi_delivered_ce;
uint64_t tcpi_bytes_sent;
uint64_t tcpi_bytes_retrans;
};
static int tcp_info_has_field(socklen_t len, size_t field_end)
{
return (size_t)len >= field_end;
}
#endif
static void peer_transport_note_send(size_t bytes)
{
logger_on_send(bytes);
@@ -58,6 +122,203 @@ static void peer_transport_note_recv(size_t bytes)
logger_maybe_print_performance_log("peer_transport_recv");
}
static void sample_socket_buffers(int fd)
{
int sndbuf = 0;
int rcvbuf = 0;
socklen_t optlen = sizeof(int);
int outq = 0;
int inq = 0;
double send_pct = 0.0;
double recv_pct = 0.0;
if (fd < 0) {
return;
}
if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, &optlen) == 0 && sndbuf > 0) {
#ifdef TIOCOUTQ
if (ioctl(fd, TIOCOUTQ, &outq) == 0 && outq >= 0) {
send_pct = ((double)outq * 100.0) / (double)sndbuf;
logger_on_send_queue_bytes((size_t)outq);
}
#endif
}
optlen = sizeof(int);
if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, &optlen) == 0 && rcvbuf > 0) {
if (ioctl(fd, FIONREAD, &inq) == 0 && inq >= 0) {
recv_pct = ((double)inq * 100.0) / (double)rcvbuf;
logger_on_recv_queue_bytes((size_t)inq);
}
}
logger_on_buffer_status(send_pct, recv_pct);
}
static void sample_tcp_info(int fd)
{
#ifdef __linux__
struct OmniLinuxTcpInfo ti;
uint64_t total_retrans = 0;
uint64_t data_segs_out = 0;
uint64_t bytes_sent = 0;
uint64_t bytes_retrans = 0;
socklen_t len = sizeof(ti);
if (fd < 0) {
return;
}
memset(&ti, 0, sizeof(ti));
if (getsockopt(fd, IPPROTO_TCP, TCP_INFO, &ti, &len) != 0) {
return;
}
if (tcp_info_has_field(len,
offsetof(struct OmniLinuxTcpInfo, tcpi_total_retrans) +
sizeof(ti.tcpi_total_retrans))) {
total_retrans = (uint64_t)ti.tcpi_total_retrans;
} else {
total_retrans = (uint64_t)ti.tcpi_retrans;
}
if (tcp_info_has_field(len,
offsetof(struct OmniLinuxTcpInfo, tcpi_data_segs_out) +
sizeof(ti.tcpi_data_segs_out))) {
data_segs_out = (uint64_t)ti.tcpi_data_segs_out;
}
if (tcp_info_has_field(len,
offsetof(struct OmniLinuxTcpInfo, tcpi_bytes_sent) +
sizeof(ti.tcpi_bytes_sent))) {
bytes_sent = (uint64_t)ti.tcpi_bytes_sent;
}
if (tcp_info_has_field(len,
offsetof(struct OmniLinuxTcpInfo, tcpi_bytes_retrans) +
sizeof(ti.tcpi_bytes_retrans))) {
bytes_retrans = (uint64_t)ti.tcpi_bytes_retrans;
}
logger_on_rtt((uint64_t)(ti.tcpi_rtt / 1000u));
logger_on_tcp_transport(total_retrans, data_segs_out, bytes_sent, bytes_retrans);
logger_on_cwnd((double)ti.tcpi_snd_cwnd);
sample_socket_buffers(fd);
#else
(void)fd;
#endif
}
static int kcp_ensure_seg_track_capacity(PeerTransportSession *session, uint32_t sn)
{
size_t need;
size_t new_cap;
uint32_t *new_seen;
if (!session) {
return OMNI_ERR_PARAM;
}
need = (size_t)sn + 1u;
if (need <= session->kcp_seg_xmit_cap) {
return OMNI_OK;
}
new_cap = (session->kcp_seg_xmit_cap == 0) ? 64u : session->kcp_seg_xmit_cap;
while (new_cap < need) {
new_cap *= 2u;
}
new_seen = (uint32_t *)realloc(session->kcp_seg_xmit_seen,
new_cap * sizeof(uint32_t));
if (!new_seen) {
return OMNI_ERR_GENERIC;
}
memset(new_seen + session->kcp_seg_xmit_cap,
0,
(new_cap - session->kcp_seg_xmit_cap) * sizeof(uint32_t));
session->kcp_seg_xmit_seen = new_seen;
session->kcp_seg_xmit_cap = new_cap;
return OMNI_OK;
}
static void sample_kcp_session(PeerTransportSession *session)
{
struct IQUEUEHEAD *p;
double send_pct = 0.0;
double recv_pct = 0.0;
if (!session || !session->kcp) {
return;
}
for (p = session->kcp->snd_buf.next; p != &session->kcp->snd_buf; p = p->next) {
struct IKCPSEG *segment = iqueue_entry(p, struct IKCPSEG, node);
uint32_t prev_xmit;
if (!segment || segment->len == 0) {
continue;
}
if (kcp_ensure_seg_track_capacity(session, segment->sn) != OMNI_OK) {
return;
}
prev_xmit = session->kcp_seg_xmit_seen[segment->sn];
if (prev_xmit == 0 && segment->xmit > 0) {
logger_on_kcp_tx(1u, (uint64_t)segment->len);
prev_xmit = 1u;
}
if (segment->xmit > prev_xmit) {
uint32_t delta = segment->xmit - prev_xmit;
logger_on_kcp_retrans((uint64_t)delta,
(uint64_t)delta * (uint64_t)segment->len);
}
session->kcp_seg_xmit_seen[segment->sn] = segment->xmit;
}
if (session->kcp->rx_srtt > 0) {
logger_on_rtt((uint64_t)session->kcp->rx_srtt);
}
logger_on_cwnd((double)session->kcp->cwnd);
if (session->kcp->snd_wnd > 0) {
send_pct = ((double)ikcp_waitsnd(session->kcp) * 100.0) /
(double)session->kcp->snd_wnd;
logger_on_send_queue_bytes((size_t)ikcp_waitsnd(session->kcp) *
(size_t)session->kcp->mss);
}
if (session->kcp->rcv_wnd > 0) {
recv_pct = ((double)session->kcp->nrcv_que * 100.0) /
(double)session->kcp->rcv_wnd;
logger_on_recv_queue_bytes((size_t)session->kcp->nrcv_que *
(size_t)session->kcp->mss);
}
logger_on_buffer_status(send_pct, recv_pct);
}
static void peer_transport_sample_after_send(PeerTransport *transport,
PeerTransportSession *session)
{
if (!transport || !session) {
return;
}
switch (transport->proto) {
case OMNI_PROTO_TCP:
sample_tcp_info(session->fd);
break;
case OMNI_PROTO_UDP:
sample_socket_buffers(transport->fd);
break;
case OMNI_PROTO_KCP:
sample_kcp_session(session);
break;
default:
break;
}
}
static void peer_transport_sample_after_recv(PeerTransport *transport,
PeerTransportSession *session)
{
peer_transport_sample_after_send(transport, session);
}
const char *peer_transport_proto_name(OmniProtocol proto)
{
switch (proto) {
@@ -183,6 +444,7 @@ static void session_free(PeerTransportSession *session)
if (session->kcp) {
ikcp_release(session->kcp);
}
free(session->kcp_seg_xmit_seen);
free(session);
}
@@ -845,6 +1107,10 @@ int peer_transport_send(PeerTransport *transport,
ssize_t n = -1;
int rc = OMNI_OK;
int raw_rc = 0;
uint64_t call_t0;
uint64_t proto_t0;
uint64_t proto_t1;
uint64_t call_t1;
if (!transport) {
return OMNI_ERR_PARAM;
@@ -856,11 +1122,13 @@ int peer_transport_send(PeerTransport *transport,
return OMNI_ERR_IO;
}
call_t0 = omni_now_ms();
frame = alloc_frame(type, payload, payload_len, &frame_len);
if (!frame) {
return OMNI_ERR_GENERIC;
}
proto_t0 = omni_now_ms();
switch (transport->proto) {
case OMNI_PROTO_TCP:
n = write_n(session->fd, frame, frame_len);
@@ -891,9 +1159,14 @@ int peer_transport_send(PeerTransport *transport,
rc = OMNI_ERR_PARAM;
break;
}
proto_t1 = omni_now_ms();
call_t1 = proto_t1;
logger_on_proto_send_latency(proto_t1 - proto_t0);
logger_on_send_call_latency(call_t1 - call_t0);
if (rc == OMNI_OK) {
peer_transport_note_send(frame_len);
peer_transport_sample_after_send(transport, session);
} else {
logger_log("ERROR", "peer_transport",
"send_failed proto=%s type=%u remote=%s:%u raw_rc=%d errno=%d",
@@ -1088,21 +1361,39 @@ int peer_transport_next_event(PeerTransport *transport,
size_t payload_cap,
int timeout_ms)
{
int rc;
uint64_t t0;
uint64_t t1;
if (!transport || !event || !payload_buf) {
return OMNI_ERR_PARAM;
}
event_reset(event);
t0 = omni_now_ms();
switch (transport->proto) {
case OMNI_PROTO_TCP:
return tcp_next_event(transport, event, payload_buf, payload_cap, timeout_ms);
rc = tcp_next_event(transport, event, payload_buf, payload_cap, timeout_ms);
break;
case OMNI_PROTO_UDP:
return udp_next_event(transport, event, payload_buf, payload_cap, timeout_ms);
rc = udp_next_event(transport, event, payload_buf, payload_cap, timeout_ms);
break;
case OMNI_PROTO_KCP:
return kcp_next_event(transport, event, payload_buf, payload_cap, timeout_ms);
rc = kcp_next_event(transport, event, payload_buf, payload_cap, timeout_ms);
break;
default:
return OMNI_ERR_PARAM;
rc = OMNI_ERR_PARAM;
break;
}
t1 = omni_now_ms();
if (rc > 0) {
logger_on_proto_recv_latency(t1 - t0);
logger_on_recv_call_latency(t1 - t0);
if (event->session) {
peer_transport_sample_after_recv(transport, event->session);
}
}
return rc;
}
void peer_transport_close_session(PeerTransport *transport,