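diff --git a/README.md b/README.md
index fb03543..22f7ed1 100644
--- a/README.md
+++ b/README.md
@@ -211,3 +211,115 @@ put /path/to/file.bin
 
 - This scenario already implements bridged forwarding along `A -> C -> D -> B` and `B -> D -> C -> A`
 - However, the current `bridge` is still a single-downstream, single-identity model, not a complete multi-node bridging network
+## Logs and metrics
+
+### Output locations
+
+- Terminal text logs: written to `stderr` by default, in `key=value` format
+- Structured logs: appended by default to `omni_logs.jsonl` in the current working directory
+- Performance snapshots are all written as JSONL records with `component="perf"`
+- The terminal additionally prints `component=perf_udp_loss` text lines; the same set of UDP loss fields is already merged into the corresponding `component="perf"` JSONL record
+- Periodic performance snapshots are emitted roughly every `1s`; one more record with `tag="final"` is emitted when the process exits
+
+### Current documentation conventions
+
+The statements below follow the current implementation; do not read them according to the old documentation:
+
+- `processing / queue / transmission / propagation / end_to_end` are all "local observations or estimates under the current implementation", not precise measurements of the physical link in the strict sense.
+- `processing_*` currently denotes local application-layer processing time, mainly from paths such as file chunk encapsulation and writing to disk on the receiver; it is not "a complete cost profile of a particular hardware CPU".
+- `queue_*` and `transmission_*` are estimates derived from the current throughput and buffer/queue state. When the process has been idle for a long time, the sample is small, or the current rate is low, these two values can be noticeably too large.
+- `propagation_*` is currently estimated as `min_rtt_ms / 2`; if the current protocol has no RTT samples, these fields are `0`.
+- `end_to_end_*` currently has values only on the "peer that finally receives the file"; it is derived from `origin_ts_ms` in the sender's chunks. Senders, Hub, and Bridge generally report `0`.
+- UDP loss statistics have values only on the `UDP file-receiving peer`; the UDP sender, Hub, and Bridge do not produce this summary.
+- `udp_retrans`: application-layer UDP retransmission statistics are not implemented yet, so this field is currently always `0`.
+- `TCP/KCP` currently record retransmission count / retransmitted bytes / cumulative segments sent; they do not record "retransmission frequency" as a separate field.
+- `send_buffer_pct_*` / `recv_buffer_pct_*` are occupancy-style metrics, but they are not guaranteed to stay strictly within `0-100`; in particular, when the KCP pending queue exceeds the window, the value can in theory exceed `100`.
+- `send_call_* / recv_call_* / proto_* / processing_*` are sometimes `0`; the usual reason is not missing instrumentation but that the current timer resolution is milliseconds and many local operations take less than `1ms`.
+
+### Basic and throughput fields
+
+| Meaning | JSONL field | Unit | Current semantics | When it has a value / why it may be 0 |
+| --- | --- | --- | --- | --- |
+| Timestamp | `ts_ms` | ms | Monotonic timestamp at which this log record was written | Always present |
+| Log classification | `level` / `component` / `tag` | - | `component="perf"` denotes a performance snapshot; common `tag` values are `peer_transport_send`, `peer_transport_recv`, `final` | Always present |
+| Identity context | `app` / `proto` / `mode` / `role` / `self_id` | - | Program name, protocol, mode, role, logical ID | An empty `self_id` on `hub` is normal |
+| Uptime | `elapsed_ms` | ms | Time from process start to this snapshot | Always present |
+| Cumulative bytes sent | `bytes_sent` | bytes | Total protocol-frame bytes sent by this process | `0` when nothing has been sent |
+| Cumulative bytes received | `bytes_recv` | bytes | Total protocol-frame bytes received by this process | `0` when nothing has been received |
+| Send count | `send_count` | count | Cumulative number of frames sent by this process | `0` when nothing has been sent |
+| Receive count | `recv_count` | count | Cumulative number of frames received by this process | `0` when nothing has been received |
+| Instantaneous send bandwidth | `tx_current_mbps` | Mbps | Send rate within the most recent statistics window | `0` when the current window has no traffic |
+| Instantaneous receive bandwidth | `rx_current_mbps` | Mbps | Receive rate within the most recent statistics window | `0` when the current window has no traffic |
+| Average send bandwidth | `tx_avg_mbps` | Mbps | Average send rate from process start until now | `0` when nothing has ever been sent |
+| Average receive bandwidth | `rx_avg_mbps` | Mbps | Average receive rate from process start until now | `0` when nothing has ever been received |
+| Transfer progress in bytes | `progress_bytes` | bytes | Bytes completed in the current file transfer | `0` when there is no file transfer |
+| Total work | `total_work_bytes` | bytes | Total size of the current file | `0` when there is no file transfer |
+| Transfer progress percentage | `progress_pct` | % | `progress_bytes / total_work_bytes * 100` | `0` when there is no file transfer |
+
+### Call timing and latency fields
+
+| Meaning | JSONL field | Unit | Current semantics | When it has a value / why it may be 0 |
+| --- | --- | --- | --- | --- |
+| Application-layer send call | `send_call_last_ms` / `send_call_min_ms` / `send_call_max_ms` / `send_call_avg_ms` | ms | Duration of the `peer_transport_send()` call | May be `0` when nothing is sent or every send takes less than `1ms` |
+| Application-layer receive call | `recv_call_last_ms` / `recv_call_min_ms` / `recv_call_max_ms` / `recv_call_avg_ms` | ms | Duration of the `peer_transport_next_event()` receive call | May be `0` when nothing is received or every receive takes less than `1ms` |
+| Protocol-layer send time | `proto_send_avg_ms` | ms | EWMA of the actual TCP/UDP/KCP send path duration | Often `0` when sends are fast and take less than `1ms` |
+| Protocol-layer receive time | `proto_recv_avg_ms` | ms | EWMA of the actual TCP/UDP/KCP recv path duration | Often `0` when receives are fast and take less than `1ms` |
+| Local processing time | `processing_avg_ms` / `processing_min_ms` / `processing_max_ms` | ms | Local processing time within this process, such as chunk encapsulation and disk writes | Sampled only on the file send/receive paths; may be `0` when the operation is too fast |
+| Queueing delay estimate | `queue_avg_ms` / `queue_min_ms` / `queue_max_ms` | ms | Estimated from the send/receive queue byte counts and the current rate | `0` when there are no queue samples; may be too large after a long idle period |
+| Transmission delay estimate | `transmission_avg_ms` / `transmission_min_ms` / `transmission_max_ms` | ms | Estimate, based on the current rate, of "how long it takes to push these bytes onto the link" | `0` when there are no traffic samples; may be too large with small samples or low rates |
+| Propagation delay estimate | `propagation_avg_ms` / `propagation_min_ms` / `propagation_max_ms` | ms | Estimate based on `min_rtt_ms / 2` | `0` when the current protocol has no RTT samples |
+| End-to-end latency | `end_to_end_avg_ms` / `end_to_end_min_ms` / `end_to_end_max_ms` | ms | Difference between the sender chunk's `origin_ts_ms` and the moment the receiver processes it | Has values only on the `peer` that finally receives the file; mostly `0` on sender / Hub / Bridge |
+
+### Reliability fields
+
+| Meaning | JSONL field | Unit | Current semantics | When it has a value / why it may be 0 |
+| --- | --- | --- | --- | --- |
+| TCP retransmission count | `tcp_retrans` | count | Cumulative retransmission count from the kernel's `TCP_INFO` | TCP only; `0` when nothing was retransmitted or the protocol is not TCP |
+| TCP data segments | `tcp_data_segs_out` | count | Cumulative TCP data segments sent | TCP only; `0` for non-TCP |
+| TCP bytes sent | `tcp_data_bytes_sent` | bytes | Cumulative TCP data bytes sent | TCP only; `0` for non-TCP |
+| TCP retransmitted bytes | `tcp_retrans_bytes` | bytes | Cumulative TCP retransmitted bytes | TCP only; `0` when nothing was retransmitted or the protocol is not TCP |
+| UDP retransmission count | `udp_retrans` | count | Reserved field, not implemented yet | Currently always `0` |
+| KCP retransmission count | `kcp_retrans` | count | Cumulative number of segments retransmitted inside KCP | KCP only; `0` when nothing was retransmitted or the protocol is not KCP |
+| KCP data segments | `kcp_data_segs_out` | count | Cumulative KCP segments sent | KCP only; `0` for non-KCP |
+| KCP bytes sent | `kcp_data_bytes_sent` | bytes | Cumulative KCP segment bytes sent | KCP only; `0` for non-KCP |
+| KCP retransmitted bytes | `kcp_retrans_bytes` | bytes | Cumulative KCP retransmitted bytes | KCP only; `0` when nothing was retransmitted or the protocol is not KCP |
+| UDP expected chunks | `udp_expected_chunks` | count | Total number of chunks the UDP file receiver expects to receive | Only on the UDP file-receiving `peer`; `0` for other roles |
+| UDP received chunks | `udp_received_chunks` | count | Number of unique chunks the UDP file receiver actually received | Only on the UDP file-receiving `peer`; `0` for other roles |
+| UDP lost chunks | `udp_lost_chunks` | count | Number of missing chunks inferred from `seq` | `0` when everything was received; also `0` when not the UDP receiving side |
+| UDP loss rate | `udp_loss_rate_pct` | % | `udp_lost_chunks / udp_expected_chunks * 100` | `0` when everything was received; also `0` when not the UDP receiving side |
+| UDP loss burst count | `udp_loss_burst_count` | count | Number of loss ranges | `0` when there is no loss |
+| UDP longest loss burst | `udp_loss_burst_max_len` | count | Maximum length of a single loss range | `0` when there is no loss |
+| UDP loss range summary | `udp_loss_ranges` | csv string | For example `4-6,9,12-13` | Empty string when there is no loss |
+| UDP lost sequence sample | `udp_loss_seq_sample` | csv string | Records at most a subset of the missing `seq` values | Empty string when there is no loss |
+| UDP receive window distribution | `udp_recv_window_dist` | csv string | `window_id:count`, for example `0:3,1:28` | Only on the UDP receiving side; empty string when there are no samples |
+
+### Resource and algorithm fields
+
+| Meaning | JSONL field | Unit | Current semantics | When it has a value / why it may be 0 |
+| --- | --- | --- | --- | --- |
+| Send buffer occupancy | `send_buffer_pct_last` / `send_buffer_pct_avg` / `send_buffer_pct_max` | %-style value | Occupancy samples of the socket or KCP send queue | `0` when there are no buffer samples; may exceed `100` when KCP is congested |
+| Receive buffer occupancy | `recv_buffer_pct_last` / `recv_buffer_pct_avg` / `recv_buffer_pct_max` | %-style value | Occupancy samples of the socket or KCP receive queue | `0` when there are no buffer samples |
+| Congestion window | `cwnd_last` / `cwnd_avg` / `cwnd_max` | protocol window size | Samples of the current TCP/KCP congestion window | UDP has no congestion window, so `0` |
+| RTT | `last_rtt_ms` / `min_rtt_ms` / `max_rtt_ms` | ms | RTT samples from TCP `TCP_INFO` or KCP `rx_srtt` | UDP currently has no RTT probing, so `0` |
+
+### Measured samples
+
+The values below come from local loopback tests of the current implementation in this repository. They only illustrate that "the fields already land in JSONL, and what they are currently named"; they are not fixed performance figures.
+
+| Sample | Key fields | Measured values |
+| --- | --- | --- |
+| `TCP` point-to-point direct-transfer sender | `progress_pct` / `cwnd_last` / `last_rtt_ms` / `tcp_data_bytes_sent` | `100` / `10` / `1` / `287984` |
+| `UDP` Hub-relayed receiver | `progress_pct` / `udp_expected_chunks` / `udp_received_chunks` / `udp_lost_chunks` / `udp_recv_window_dist` | `100` / `3` / `3` / `0` / `0:3` |
+| `KCP` Bridge node | `cwnd_last` / `last_rtt_ms` / `kcp_data_bytes_sent` / `queue_avg_ms` | `2` / `7` / `265` / `33991.342083` |
+| `KCP` final receiver | `progress_pct` / `end_to_end_avg_ms` / `cwnd_last` | `100` / `121.333333` / `2` |
+
+### Why some fields "exist but have no value"
+
+| Case | Typical fields | Explanation |
+| --- | --- | --- |
+| Protocol not applicable | `tcp_*` under UDP/KCP; `kcp_*` under TCP/UDP; `cwnd_*` under UDP | Fields are kept uniformly so that one JSONL script can process every record; protocols they do not apply to write `0` |
+| Not implemented yet | `udp_retrans` | There is no application-layer UDP retransmission at present, so this field is only reserved |
+| Not produced by the current role | `udp_expected_chunks`, `udp_loss_*`, `end_to_end_*` | UDP loss statistics have values only on the UDP peer that finally receives the file; `end_to_end_*` likewise has values mainly on the final receiver |
+| No RTT sampled | `last_rtt_ms`, `propagation_*` | UDP currently has no RTT probe; TCP/KCP have values only after a sample for that protocol has been obtained |
+| Timer resolution too coarse | `send_call_*`, `proto_send_avg_ms`, `processing_*` | Many paths are currently timed in milliseconds, and under local loopback many operations take less than `1ms`, so `0` is shown |
+| No task at present | `progress_*` | When a peer has only registered and no file transfer is running, these fields are naturally `0` |
diff --git a/include/logger.h b/include/logger.h
index ad34356..05cd1ac 100644
--- a/include/logger.h
+++ b/include/logger.h
@@ -17,6 +17,10 @@ typedef struct OmniMetricSummary {
     double sum;
 } OmniMetricSummary;
 
+#define OMNI_LOGGER_UDP_RANGES_SIZE 512u
+#define OMNI_LOGGER_UDP_SEQ_SAMPLE_SIZE 512u
+#define OMNI_LOGGER_UDP_WINDOW_DIST_SIZE 512u
+
 /* Global statistics are collected through this struct */
 typedef struct OmniStats {
     uint64_t start_ms; /* Start time (milliseconds) */
@@ -44,6 +48,15 @@ typedef struct OmniStats {
     uint64_t kcp_data_segs_out; /* Cumulative KCP data segments sent (including retransmissions) */
     uint64_t kcp_data_bytes_sent; /* Cumulative KCP data bytes sent (including retransmissions) */
     uint64_t kcp_retrans_bytes; /* Cumulative KCP retransmitted data bytes */
+    uint64_t udp_expected_chunks; /* Expected chunk count on the UDP file-receiving side */
+    uint64_t udp_received_chunks; /* Chunks actually received on the UDP file-receiving side */
+    uint64_t udp_lost_chunks; /* Chunks inferred lost on the UDP file-receiving side */
+    uint64_t udp_loss_burst_count; /* Number of UDP loss ranges */
+    uint64_t udp_loss_burst_max_len; /* Longest run of consecutive UDP losses */
+    double udp_loss_rate_pct; /* UDP loss rate (percent) */
+    char udp_loss_ranges[OMNI_LOGGER_UDP_RANGES_SIZE]; /* Summary of UDP loss ranges */
+    char udp_loss_seq_sample[OMNI_LOGGER_UDP_SEQ_SAMPLE_SIZE]; /* Sample of lost UDP sequence numbers */
+    char udp_recv_window_dist[OMNI_LOGGER_UDP_WINDOW_DIST_SIZE]; /* UDP receive-window distribution */
 
     /* Latency/timing statistics (unit: milliseconds) */
     double send_call_avg_ms; /* Average omni_send duration (EWMA) */
@@ -126,6 +139,15 @@ void logger_on_cwnd(double cwnd);
 /* Record the total task size and the current progress. */
 void logger_set_transfer_total(uint64_t total_bytes);
 void logger_set_progress(uint64_t progress_bytes);
+void logger_reset_transfer_observability(void);
+void logger_on_udp_loss_summary(uint64_t expected_chunks,
+                                uint64_t received_chunks,
+                                uint64_t lost_chunks,
+                                uint64_t burst_count,
+                                uint64_t burst_max_len,
+                                const char *ranges,
+                                const char *seq_sample,
+                                const char *recv_window_dist);
 
 /* Compute the current throughput (returns bytes per second) */
 double logger_calculate_throughput(void);
diff --git a/src/apps/peer_main.c b/src/apps/peer_main.c
index 804dbeb..98f293b 100644
--- a/src/apps/peer_main.c
+++ b/src/apps/peer_main.c
@@ -42,6 +42,11 @@ typedef struct PeerRecvFileState {
     uint32_t total_chunks;
     uint64_t total_bytes;
     uint64_t bytes_written;
+    uint32_t received_chunks;
+    uint32_t observed_windows;
+    uint8_t *seq_seen;
+    uint32_t *recv_window_counts;
+    size_t recv_window_cap;
 } PeerRecvFileState;
 
 typedef struct PeerRuntime {
@@ -49,6 +54,7 @@ typedef struct PeerRuntime {
     PeerTransportSession *session;
     PeerMode mode;
     OmniRole role;
+    OmniProtocol proto;
     int running;
     int direct_register_sent;
     char client_id[OMNI_PEER_ID_SIZE];
@@ -175,11 +181,239 @@ static void peer_set_bound_peer(PeerRuntime *rt, const char *peer_id)
 
 static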
void close_recv_file(PeerRuntime *rt) { - if (!rt || !rt->rx_file.fp) { + if (!rt) { return; } - fclose(rt->rx_file.fp); - rt->rx_file.fp = NULL; + if (rt->rx_file.fp) { + fclose(rt->rx_file.fp); + rt->rx_file.fp = NULL; + } + free(rt->rx_file.seq_seen); + free(rt->rx_file.recv_window_counts); + memset(&rt->rx_file, 0, sizeof(rt->rx_file)); +} + +static int recv_file_prepare_tracking(PeerRecvFileState *state, uint32_t total_chunks) +{ + if (!state) { + return OMNI_ERR_PARAM; + } + if (total_chunks == 0) { + return OMNI_OK; + } + state->seq_seen = (uint8_t *)calloc((size_t)total_chunks + 1u, sizeof(uint8_t)); + if (!state->seq_seen) { + return OMNI_ERR_GENERIC; + } + return OMNI_OK; +} + +static int recv_file_ensure_window_capacity(PeerRecvFileState *state, uint32_t window_id) +{ + size_t need; + size_t new_cap; + uint32_t *new_counts; + + if (!state) { + return OMNI_ERR_PARAM; + } + need = (size_t)window_id + 1u; + if (need <= state->recv_window_cap) { + return OMNI_OK; + } + + new_cap = (state->recv_window_cap == 0) ? 
4u : state->recv_window_cap; + while (new_cap < need) { + new_cap *= 2u; + } + + new_counts = (uint32_t *)realloc(state->recv_window_counts, + new_cap * sizeof(uint32_t)); + if (!new_counts) { + return OMNI_ERR_GENERIC; + } + memset(new_counts + state->recv_window_cap, + 0, + (new_cap - state->recv_window_cap) * sizeof(uint32_t)); + state->recv_window_counts = new_counts; + state->recv_window_cap = new_cap; + return OMNI_OK; +} + +static void recv_file_note_chunk(PeerRecvFileState *state, + uint32_t seq, + uint32_t window_id) +{ + if (!state || seq == 0) { + return; + } + + if (recv_file_ensure_window_capacity(state, window_id) == OMNI_OK) { + if ((size_t)window_id < state->recv_window_cap) { + state->recv_window_counts[window_id]++; + } + if (window_id + 1u > state->observed_windows) { + state->observed_windows = window_id + 1u; + } + } + + if (!state->seq_seen) { + state->received_chunks++; + return; + } + if (state->seq_seen && seq <= state->total_chunks && !state->seq_seen[seq]) { + state->seq_seen[seq] = 1; + state->received_chunks++; + } +} + +static void append_csv_uint(char *buf, size_t buf_sz, uint32_t value) +{ + size_t len; + + if (!buf || buf_sz == 0) { + return; + } + len = strlen(buf); + if (len >= buf_sz - 1u) { + return; + } + snprintf(buf + len, buf_sz - len, "%s%u", len == 0 ? "" : ",", (unsigned)value); +} + +static void append_range(char *buf, size_t buf_sz, uint32_t start, uint32_t end) +{ + size_t len; + + if (!buf || buf_sz == 0) { + return; + } + len = strlen(buf); + if (len >= buf_sz - 1u) { + return; + } + if (start == end) { + snprintf(buf + len, buf_sz - len, "%s%u", len == 0 ? "" : ",", (unsigned)start); + } else { + snprintf(buf + len, buf_sz - len, "%s%u-%u", + len == 0 ? 
"" : ",", + (unsigned)start, + (unsigned)end); + } +} + +static void append_window_count(char *buf, + size_t buf_sz, + uint32_t window_id, + uint32_t count) +{ + size_t len; + + if (!buf || buf_sz == 0) { + return; + } + len = strlen(buf); + if (len >= buf_sz - 1u) { + return; + } + snprintf(buf + len, buf_sz - len, "%s%u:%u", + len == 0 ? "" : ",", + (unsigned)window_id, + (unsigned)count); +} + +static void peer_record_udp_loss_summary(PeerRuntime *rt, uint32_t total_windows) +{ + PeerRecvFileState *state; + uint64_t lost_chunks; + uint64_t burst_count = 0; + uint64_t burst_max_len = 0; + char ranges[OMNI_LOGGER_UDP_RANGES_SIZE]; + char seq_sample[OMNI_LOGGER_UDP_SEQ_SAMPLE_SIZE]; + char recv_window_dist[OMNI_LOGGER_UDP_WINDOW_DIST_SIZE]; + uint32_t sample_count = 0; + + if (!rt || rt->proto != OMNI_PROTO_UDP) { + return; + } + state = &rt->rx_file; + if (state->total_chunks == 0 || !state->seq_seen) { + return; + } + + ranges[0] = '\0'; + seq_sample[0] = '\0'; + recv_window_dist[0] = '\0'; + lost_chunks = 0; + + for (uint32_t seq = 1; seq <= state->total_chunks; ++seq) { + if (state->seq_seen[seq]) { + continue; + } + + { + uint32_t end = seq; + + while (end + 1u <= state->total_chunks && !state->seq_seen[end + 1u]) { + end++; + } + lost_chunks += (uint64_t)(end - seq + 1u); + burst_count++; + if ((uint64_t)(end - seq + 1u) > burst_max_len) { + burst_max_len = (uint64_t)(end - seq + 1u); + } + append_range(ranges, sizeof(ranges), seq, end); + for (uint32_t missing = seq; missing <= end && sample_count < 16u; ++missing) { + append_csv_uint(seq_sample, sizeof(seq_sample), missing); + sample_count++; + } + seq = end; + } + } + + for (uint32_t window_id = 0; + window_id < (uint32_t)state->recv_window_cap && + window_id < total_windows; + ++window_id) { + if (state->recv_window_counts[window_id] == 0) { + continue; + } + append_window_count(recv_window_dist, + sizeof(recv_window_dist), + window_id, + state->recv_window_counts[window_id]); + } + + 
logger_on_udp_loss_summary(state->total_chunks, + state->received_chunks, + lost_chunks, + burst_count, + burst_max_len, + ranges, + seq_sample, + recv_window_dist); +} + +static void peer_finalize_recv_observability(PeerRuntime *rt, uint32_t total_windows_hint) +{ + uint32_t total_windows; + + if (!rt || rt->proto != OMNI_PROTO_UDP) { + return; + } + if (rt->rx_file.transfer_id == 0 || rt->rx_file.total_chunks == 0) { + return; + } + + total_windows = total_windows_hint; + if (total_windows == 0) { + total_windows = rt->rx_file.observed_windows; + } + if (total_windows == 0) { + total_windows = 1; + } + + peer_record_udp_loss_summary(rt, total_windows); } static uint64_t compute_file_size(FILE *fp) @@ -389,6 +623,8 @@ static int peer_send_file(PeerRuntime *rt, uint32_t total_chunks; uint32_t transfer_id; uint32_t total_windows = 1; + uint64_t transfer_start_ms; + uint32_t max_window_id = 0; int rc = OMNI_OK; if (!file_path) { @@ -419,10 +655,13 @@ static int peer_send_file(PeerRuntime *rt, goto out; } + logger_reset_transfer_observability(); logger_set_transfer_total(total_bytes); logger_set_progress(0); + transfer_start_ms = omni_now_ms(); for (uint32_t seq = 1; rt->running && !g_stop; ++seq) { + uint64_t processing_t0 = omni_now_ms(); size_t nread = fread(chunk, 1, chunk_size, fp); if (nread == 0) { @@ -437,18 +676,24 @@ static int peer_send_file(PeerRuntime *rt, if (nread > 0) { TransferChunkMeta meta; + uint64_t chunk_origin_ts_ms = omni_now_ms(); + uint32_t window_id = (uint32_t)((chunk_origin_ts_ms - transfer_start_ms) / 1000u); omni_transfer_chunk_meta_encode(&meta, transfer_id, seq, total_chunks, - 0, + window_id, total_bytes, offset, (uint32_t)nread, - omni_now_ms()); + chunk_origin_ts_ms); memcpy(payload, &meta, TRANSFER_CHUNK_META_SIZE); memcpy(payload + TRANSFER_CHUNK_META_SIZE, chunk, nread); + if (window_id > max_window_id) { + max_window_id = window_id; + } + logger_on_processing_latency((double)(omni_now_ms() - processing_t0)); rc = 
peer_send_inner(rt, effective_dst, MSG_TYPE_FILE_CHUNK, @@ -465,6 +710,7 @@ static int peer_send_file(PeerRuntime *rt, if (rc == OMNI_OK && rt->running && !g_stop) { TransferEndMeta end_meta; + total_windows = max_window_id + 1u; omni_transfer_end_meta_encode(&end_meta, transfer_id, total_chunks, @@ -587,6 +833,7 @@ static int peer_open_recv_file(PeerRuntime *rt, const TransferChunkMeta *meta) } if (!rt->rx_file.fp || rt->rx_file.transfer_id != meta->transfer_id) { + peer_finalize_recv_observability(rt, 0); close_recv_file(rt); rt->rx_file.fp = fopen(rt->output_path, "wb+"); if (!rt->rx_file.fp) { @@ -596,6 +843,13 @@ static int peer_open_recv_file(PeerRuntime *rt, const TransferChunkMeta *meta) rt->rx_file.total_chunks = meta->total_chunks; rt->rx_file.total_bytes = meta->total_bytes; rt->rx_file.bytes_written = 0; + if (recv_file_prepare_tracking(&rt->rx_file, meta->total_chunks) != OMNI_OK) { + logger_log("WARN", "peer", + "recv_tracking_alloc_failed transfer_id=%u total_chunks=%u", + (unsigned)meta->transfer_id, + (unsigned)meta->total_chunks); + } + logger_reset_transfer_observability(); logger_set_transfer_total(meta->total_bytes); logger_set_progress(0); } @@ -608,6 +862,8 @@ static void handle_file_chunk_message(PeerRuntime *rt, uint32_t payload_len) { TransferChunkMeta meta; + uint64_t processing_t0 = omni_now_ms(); + uint64_t now_ms; if (payload_len < TRANSFER_CHUNK_META_SIZE) { logger_log("WARN", "peer", "short_file_chunk_payload len=%u", (unsigned)payload_len); @@ -632,6 +888,8 @@ static void handle_file_chunk_message(PeerRuntime *rt, return; } + recv_file_note_chunk(&rt->rx_file, meta.seq, meta.window_id); + if (fseeko(rt->rx_file.fp, (off_t)meta.offset_bytes, SEEK_SET) != 0) { logger_log("ERROR", "peer", "fseeko_failed offset=%llu errno=%d", (unsigned long long)meta.offset_bytes, @@ -645,6 +903,11 @@ static void handle_file_chunk_message(PeerRuntime *rt, rt->rx_file.bytes_written += meta.chunk_bytes; logger_set_progress(rt->rx_file.bytes_written); + 
    now_ms = omni_now_ms();
+    logger_on_processing_latency((double)(now_ms - processing_t0));
+    if (meta.origin_ts_ms > 0 && now_ms >= meta.origin_ts_ms) {
+        logger_on_end_to_end_latency((double)(now_ms - meta.origin_ts_ms));
+    }
 }
 
 static void handle_file_end_message(PeerRuntime *rt,
@@ -669,6 +932,9 @@ static void handle_file_end_message(PeerRuntime *rt,
         total_chunks = rt->rx_file.total_chunks ? rt->rx_file.total_chunks : end_meta.total_chunks;
         total_bytes = rt->rx_file.total_bytes ? rt->rx_file.total_bytes : end_meta.total_bytes;
     }
+    peer_finalize_recv_observability(rt,
+                                     end_meta.total_windows > 0 ? end_meta.total_windows
+                                                                : rt->rx_file.observed_windows);
 
     fprintf(stdout,
             "[file %s -> %s] transfer_id=%u bytes_written=%llu total_bytes=%llu total_chunks=%u output=%s\n",
@@ -955,6 +1221,7 @@ static void handle_transport_event(PeerRuntime *rt,
         rt->session = NULL;
         rt->direct_register_sent = 0;
         peer_set_bound_peer(rt, "");
+        peer_finalize_recv_observability(rt, 0);
         close_recv_file(rt);
         rt->rx_file.transfer_id = 0;
         rt->rx_file.total_chunks = 0;
@@ -1114,6 +1381,7 @@ int main(int argc, char **argv)
     memset(&rt, 0, sizeof(rt));
     memset(&startup, 0, sizeof(startup));
     rt.mode = mode;
+    rt.proto = proto;
     rt.role = (mode == PEER_MODE_DIRECT && listen_port > 0) ? OMNI_ROLE_SERVER : OMNI_ROLE_CLIENT;
     log_mode = (mode == PEER_MODE_DIRECT) ? "direct" : "hub";
     log_role = (rt.role == OMNI_ROLE_SERVER) ? "server" : "client";
@@ -1220,6 +1488,7 @@ int main(int argc, char **argv)
         }
     }
 
+    peer_finalize_recv_observability(&rt, 0);
     close_recv_file(&rt);
     peer_transport_close(rt.transport);
     logger_print_performance_log("final");
diff --git a/src/core/logger.c b/src/core/logger.c
index 5e7bac5..ee79d5f 100644
--- a/src/core/logger.c
+++ b/src/core/logger.c
@@ -29,6 +29,25 @@ static char g_ctx_mode[16];
 static char g_ctx_role[16];
 static char g_ctx_self_id[OMNI_PEER_ID_SIZE];
 
+static void metric_reset(OmniMetricSummary *metric);
+
+static void reset_transfer_observability_locked(void)
+{
+    g_stats.total_work_bytes = 0;
+    g_stats.progress_bytes = 0;
+    metric_reset(&g_stats.processing_delay_ms);
+    metric_reset(&g_stats.end_to_end_delay_ms);
+    g_stats.udp_expected_chunks = 0;
+    g_stats.udp_received_chunks = 0;
+    g_stats.udp_lost_chunks = 0;
+    g_stats.udp_loss_burst_count = 0;
+    g_stats.udp_loss_burst_max_len = 0;
+    g_stats.udp_loss_rate_pct = 0.0;
+    g_stats.udp_loss_ranges[0] = '\0';
+    g_stats.udp_loss_seq_sample[0] = '\0';
+    g_stats.udp_recv_window_dist[0] = '\0';
+}
+
 /* Thin wrapper around the time API in common.h, giving the logger module a single internal entry point. */
 static uint64_t now_ms(void)
 {
@@ -240,6 +259,7 @@ void logger_init(void)
     metric_reset(&g_stats.send_buffer_pct);
     metric_reset(&g_stats.recv_buffer_pct);
     metric_reset(&g_stats.cwnd);
+    reset_transfer_observability_locked();
     copy_context_field(g_ctx_app, sizeof(g_ctx_app), NULL, "unknown");
     copy_context_field(g_ctx_proto, sizeof(g_ctx_proto), NULL, "unknown");
     copy_context_field(g_ctx_mode, sizeof(g_ctx_mode), NULL, "unknown");
@@ -555,6 +575,43 @@ void logger_set_progress(uint64_t progress_bytes)
     pthread_mutex_unlock(&g_mu);
 }
 
+void logger_reset_transfer_observability(void)
+{
+    pthread_mutex_lock(&g_mu);
+    reset_transfer_observability_locked();
+    pthread_mutex_unlock(&g_mu);
+}
+
+void logger_on_udp_loss_summary(uint64_t expected_chunks,
+                                uint64_t received_chunks,
+                                uint64_t lost_chunks,
+                                uint64_t burst_count,
+                                uint64_t burst_max_len,
+                                const char *ranges,
+                                const char *seq_sample,
+                                const char *recv_window_dist)
+{
+    pthread_mutex_lock(&g_mu);
+    g_stats.udp_expected_chunks = expected_chunks;
+    g_stats.udp_received_chunks = received_chunks;
+    g_stats.udp_lost_chunks = lost_chunks;
+    g_stats.udp_loss_burst_count = burst_count;
+    g_stats.udp_loss_burst_max_len = burst_max_len;
+    g_stats.udp_loss_rate_pct = (expected_chunks > 0)
+                                    ? ((double)lost_chunks * 100.0) / (double)expected_chunks
+                                    : 0.0;
+    omni_copy_fixed_ascii(g_stats.udp_loss_ranges,
+                          sizeof(g_stats.udp_loss_ranges),
+                          ranges ? ranges : "");
+    omni_copy_fixed_ascii(g_stats.udp_loss_seq_sample,
+                          sizeof(g_stats.udp_loss_seq_sample),
+                          seq_sample ? seq_sample : "");
+    omni_copy_fixed_ascii(g_stats.udp_recv_window_dist,
+                          sizeof(g_stats.udp_recv_window_dist),
+                          recv_window_dist ? recv_window_dist : "");
+    pthread_mutex_unlock(&g_mu);
+}
+
 /* Public convenience function for computing the "current total throughput". */
 double logger_calculate_throughput(void)
 {
@@ -606,6 +663,9 @@ void logger_print_performance_log(const char *tag)
     char ctx_mode[sizeof(g_ctx_mode)];
     char ctx_role[sizeof(g_ctx_role)];
     char ctx_self_id[sizeof(g_ctx_self_id)];
+    char udp_loss_ranges[sizeof(snapshot.udp_loss_ranges)];
+    char udp_loss_seq_sample[sizeof(snapshot.udp_loss_seq_sample)];
+    char udp_recv_window_dist[sizeof(snapshot.udp_recv_window_dist)];
 
     pthread_mutex_lock(&g_mu);
     now = now_ms();
@@ -626,8 +686,15 @@ void logger_print_performance_log(const char *tag)
     omni_copy_fixed_ascii(ctx_mode, sizeof(ctx_mode), g_ctx_mode);
     omni_copy_fixed_ascii(ctx_role, sizeof(ctx_role), g_ctx_role);
     omni_copy_fixed_ascii(ctx_self_id, sizeof(ctx_self_id), g_ctx_self_id);
+    omni_copy_fixed_ascii(udp_loss_ranges, sizeof(udp_loss_ranges), snapshot.udp_loss_ranges);
+    omni_copy_fixed_ascii(udp_loss_seq_sample, sizeof(udp_loss_seq_sample), snapshot.udp_loss_seq_sample);
+    omni_copy_fixed_ascii(udp_recv_window_dist, sizeof(udp_recv_window_dist), snapshot.udp_recv_window_dist);
     pthread_mutex_unlock(&g_mu);
 
+    udp_loss_ranges[sizeof(udp_loss_ranges) - 1u] =
'\0'; + udp_loss_seq_sample[sizeof(udp_loss_seq_sample) - 1u] = '\0'; + udp_recv_window_dist[sizeof(udp_recv_window_dist) - 1u] = '\0'; + progress_pct = 0.0; if (snapshot.total_work_bytes > 0) { progress_pct = ((double)snapshot.progress_bytes * 100.0) / @@ -651,7 +718,7 @@ void logger_print_performance_log(const char *tag) "processing_avg_ms=%.3f queue_avg_ms=%.3f transmission_avg_ms=%.3f propagation_avg_ms=%.3f end_to_end_avg_ms=%.3f " "send_buffer_pct=%.2f recv_buffer_pct=%.2f cwnd=%.2f " "last_rtt_ms=%llu min_rtt_ms=%llu max_rtt_ms=%llu " - "tcp_retrans=%llu tcp_data_segs_out=%llu tcp_data_bytes_sent=%llu tcp_retrans_bytes=%llu " + "tcp_retrans=%llu udp_retrans=%llu tcp_data_segs_out=%llu tcp_data_bytes_sent=%llu tcp_retrans_bytes=%llu " "kcp_retrans=%llu kcp_data_segs_out=%llu kcp_data_bytes_sent=%llu kcp_retrans_bytes=%llu\n", ctx_app[0] ? ctx_app : "unknown", ctx_proto[0] ? ctx_proto : "unknown", @@ -693,6 +760,7 @@ void logger_print_performance_log(const char *tag) (unsigned long long)((snapshot.min_rtt_ms == UINT64_MAX) ? 
0 : snapshot.min_rtt_ms), (unsigned long long)snapshot.max_rtt_ms, (unsigned long long)snapshot.tcp_retrans, + (unsigned long long)snapshot.udp_retrans, (unsigned long long)snapshot.tcp_data_segs_out, (unsigned long long)snapshot.tcp_data_bytes_sent, (unsigned long long)snapshot.tcp_retrans_bytes, @@ -700,10 +768,39 @@ void logger_print_performance_log(const char *tag) (unsigned long long)snapshot.kcp_data_segs_out, (unsigned long long)snapshot.kcp_data_bytes_sent, (unsigned long long)snapshot.kcp_retrans_bytes); + if (snapshot.udp_expected_chunks > 0 || snapshot.udp_lost_chunks > 0) { + fprintf(fp, + "ts=%llu level=INFO component=perf_udp_loss app=%s proto=%s mode=%s role=%s self_id=%s " + "udp_expected_chunks=%llu udp_received_chunks=%llu udp_lost_chunks=%llu " + "udp_loss_rate_pct=%.2f udp_loss_burst_count=%llu udp_loss_burst_max_len=%llu " + "udp_loss_ranges=%s udp_loss_seq_sample=%s udp_recv_window_dist=%s\n", + (unsigned long long)now, + ctx_app[0] ? ctx_app : "unknown", + ctx_proto[0] ? ctx_proto : "unknown", + ctx_mode[0] ? ctx_mode : "unknown", + ctx_role[0] ? ctx_role : "unknown", + ctx_self_id[0] ? ctx_self_id : "-", + (unsigned long long)snapshot.udp_expected_chunks, + (unsigned long long)snapshot.udp_received_chunks, + (unsigned long long)snapshot.udp_lost_chunks, + snapshot.udp_loss_rate_pct, + (unsigned long long)snapshot.udp_loss_burst_count, + (unsigned long long)snapshot.udp_loss_burst_max_len, + udp_loss_ranges[0] ? udp_loss_ranges : "-", + udp_loss_seq_sample[0] ? udp_loss_seq_sample : "-", + udp_recv_window_dist[0] ? 
udp_recv_window_dist : "-");
+        }
     }
 
     if (g_json_fp) {
         /* JSONL output makes offline analysis by scripts or visualization tools easier. */
+        char esc_udp_ranges[sizeof(snapshot.udp_loss_ranges) * 2u];
+        char esc_udp_seq_sample[sizeof(snapshot.udp_loss_seq_sample) * 2u];
+        char esc_udp_window_dist[sizeof(snapshot.udp_recv_window_dist) * 2u];
+
+        json_escape(udp_loss_ranges, esc_udp_ranges, sizeof(esc_udp_ranges));
+        json_escape(udp_loss_seq_sample, esc_udp_seq_sample, sizeof(esc_udp_seq_sample));
+        json_escape(udp_recv_window_dist, esc_udp_window_dist, sizeof(esc_udp_window_dist));
         fprintf(g_json_fp,
                 "{\"ts_ms\":%llu,"
                 "\"level\":\"INFO\","
@@ -764,13 +861,23 @@ void logger_print_performance_log(const char *tag)
                 "\"min_rtt_ms\":%llu,"
                 "\"max_rtt_ms\":%llu,"
                 "\"tcp_retrans\":%llu,"
+                "\"udp_retrans\":%llu,"
                 "\"tcp_data_segs_out\":%llu,"
                 "\"tcp_data_bytes_sent\":%llu,"
                 "\"tcp_retrans_bytes\":%llu,"
                 "\"kcp_retrans\":%llu,"
                 "\"kcp_data_segs_out\":%llu,"
                 "\"kcp_data_bytes_sent\":%llu,"
-                "\"kcp_retrans_bytes\":%llu}\n",
+                "\"kcp_retrans_bytes\":%llu,"
+                "\"udp_expected_chunks\":%llu,"
+                "\"udp_received_chunks\":%llu,"
+                "\"udp_lost_chunks\":%llu,"
+                "\"udp_loss_rate_pct\":%.6f,"
+                "\"udp_loss_burst_count\":%llu,"
+                "\"udp_loss_burst_max_len\":%llu,"
+                "\"udp_loss_ranges\":\"%s\","
+                "\"udp_loss_seq_sample\":\"%s\","
+                "\"udp_recv_window_dist\":\"%s\"}\n",
                 (unsigned long long)now,
                 ctx_app[0] ? ctx_app : "unknown",
                 ctx_proto[0] ? ctx_proto : "unknown",
@@ -828,13 +935,23 @@ void logger_print_performance_log(const char *tag)
                 (unsigned long long)((snapshot.min_rtt_ms == UINT64_MAX) ?
0 : snapshot.min_rtt_ms), (unsigned long long)snapshot.max_rtt_ms, (unsigned long long)snapshot.tcp_retrans, + (unsigned long long)snapshot.udp_retrans, (unsigned long long)snapshot.tcp_data_segs_out, (unsigned long long)snapshot.tcp_data_bytes_sent, (unsigned long long)snapshot.tcp_retrans_bytes, (unsigned long long)snapshot.kcp_retrans, (unsigned long long)snapshot.kcp_data_segs_out, (unsigned long long)snapshot.kcp_data_bytes_sent, - (unsigned long long)snapshot.kcp_retrans_bytes); + (unsigned long long)snapshot.kcp_retrans_bytes, + (unsigned long long)snapshot.udp_expected_chunks, + (unsigned long long)snapshot.udp_received_chunks, + (unsigned long long)snapshot.udp_lost_chunks, + snapshot.udp_loss_rate_pct, + (unsigned long long)snapshot.udp_loss_burst_count, + (unsigned long long)snapshot.udp_loss_burst_max_len, + esc_udp_ranges, + esc_udp_seq_sample, + esc_udp_window_dist); fflush(g_json_fp); } } diff --git a/src/core/peer_transport.c b/src/core/peer_transport.c index 1fef2c3..b440fd5 100644 --- a/src/core/peer_transport.c +++ b/src/core/peer_transport.c @@ -7,9 +7,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -29,6 +31,8 @@ struct PeerTransportSession { socklen_t addr_len; ikcpcb *kcp; uint32_t kcp_conv; + uint32_t *kcp_seg_xmit_seen; + size_t kcp_seg_xmit_cap; char remote_ip[64]; uint16_t remote_port; struct PeerTransportSession *next; @@ -44,6 +48,66 @@ struct PeerTransport { uint8_t *rx_frame; }; +#ifdef __linux__ +struct OmniLinuxTcpInfo { + uint8_t tcpi_state; + uint8_t tcpi_ca_state; + uint8_t tcpi_retransmits; + uint8_t tcpi_probes; + uint8_t tcpi_backoff; + uint8_t tcpi_options; + uint8_t tcpi_snd_wscale : 4, tcpi_rcv_wscale : 4; + uint8_t tcpi_delivery_rate_app_limited : 1, tcpi_fastopen_client_fail : 2; + uint32_t tcpi_rto; + uint32_t tcpi_ato; + uint32_t tcpi_snd_mss; + uint32_t tcpi_rcv_mss; + uint32_t tcpi_unacked; + uint32_t tcpi_sacked; + uint32_t tcpi_lost; + uint32_t 
tcpi_retrans; + uint32_t tcpi_fackets; + uint32_t tcpi_last_data_sent; + uint32_t tcpi_last_ack_sent; + uint32_t tcpi_last_data_recv; + uint32_t tcpi_last_ack_recv; + uint32_t tcpi_pmtu; + uint32_t tcpi_rcv_ssthresh; + uint32_t tcpi_rtt; + uint32_t tcpi_rttvar; + uint32_t tcpi_snd_ssthresh; + uint32_t tcpi_snd_cwnd; + uint32_t tcpi_advmss; + uint32_t tcpi_reordering; + uint32_t tcpi_rcv_rtt; + uint32_t tcpi_rcv_space; + uint32_t tcpi_total_retrans; + uint64_t tcpi_pacing_rate; + uint64_t tcpi_max_pacing_rate; + uint64_t tcpi_bytes_acked; + uint64_t tcpi_bytes_received; + uint32_t tcpi_segs_out; + uint32_t tcpi_segs_in; + uint32_t tcpi_notsent_bytes; + uint32_t tcpi_min_rtt; + uint32_t tcpi_data_segs_in; + uint32_t tcpi_data_segs_out; + uint64_t tcpi_delivery_rate; + uint64_t tcpi_busy_time; + uint64_t tcpi_rwnd_limited; + uint64_t tcpi_sndbuf_limited; + uint32_t tcpi_delivered; + uint32_t tcpi_delivered_ce; + uint64_t tcpi_bytes_sent; + uint64_t tcpi_bytes_retrans; +}; + +static int tcp_info_has_field(socklen_t len, size_t field_end) +{ + return (size_t)len >= field_end; +} +#endif + static void peer_transport_note_send(size_t bytes) { logger_on_send(bytes); @@ -58,6 +122,203 @@ static void peer_transport_note_recv(size_t bytes) logger_maybe_print_performance_log("peer_transport_recv"); } +static void sample_socket_buffers(int fd) +{ + int sndbuf = 0; + int rcvbuf = 0; + socklen_t optlen = sizeof(int); + int outq = 0; + int inq = 0; + double send_pct = 0.0; + double recv_pct = 0.0; + + if (fd < 0) { + return; + } + + if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, &optlen) == 0 && sndbuf > 0) { +#ifdef TIOCOUTQ + if (ioctl(fd, TIOCOUTQ, &outq) == 0 && outq >= 0) { + send_pct = ((double)outq * 100.0) / (double)sndbuf; + logger_on_send_queue_bytes((size_t)outq); + } +#endif + } + + optlen = sizeof(int); + if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, &optlen) == 0 && rcvbuf > 0) { + if (ioctl(fd, FIONREAD, &inq) == 0 && inq >= 0) { + recv_pct = ((double)inq * 
100.0) / (double)rcvbuf;
+            logger_on_recv_queue_bytes((size_t)inq);
+        }
+    }
+
+    logger_on_buffer_status(send_pct, recv_pct);
+}
+
+static void sample_tcp_info(int fd)
+{
+#ifdef __linux__
+    struct OmniLinuxTcpInfo ti;
+    uint64_t total_retrans = 0;
+    uint64_t data_segs_out = 0;
+    uint64_t bytes_sent = 0;
+    uint64_t bytes_retrans = 0;
+    socklen_t len = sizeof(ti);
+
+    if (fd < 0) {
+        return;
+    }
+    memset(&ti, 0, sizeof(ti));
+    if (getsockopt(fd, IPPROTO_TCP, TCP_INFO, &ti, &len) != 0) {
+        return;
+    }
+
+    if (tcp_info_has_field(len,
+                           offsetof(struct OmniLinuxTcpInfo, tcpi_total_retrans) +
+                               sizeof(ti.tcpi_total_retrans))) {
+        total_retrans = (uint64_t)ti.tcpi_total_retrans;
+    } else {
+        total_retrans = (uint64_t)ti.tcpi_retrans;
+    }
+    if (tcp_info_has_field(len,
+                           offsetof(struct OmniLinuxTcpInfo, tcpi_data_segs_out) +
+                               sizeof(ti.tcpi_data_segs_out))) {
+        data_segs_out = (uint64_t)ti.tcpi_data_segs_out;
+    }
+    if (tcp_info_has_field(len,
+                           offsetof(struct OmniLinuxTcpInfo, tcpi_bytes_sent) +
+                               sizeof(ti.tcpi_bytes_sent))) {
+        bytes_sent = (uint64_t)ti.tcpi_bytes_sent;
+    }
+    if (tcp_info_has_field(len,
+                           offsetof(struct OmniLinuxTcpInfo, tcpi_bytes_retrans) +
+                               sizeof(ti.tcpi_bytes_retrans))) {
+        bytes_retrans = (uint64_t)ti.tcpi_bytes_retrans;
+    }
+
+    logger_on_rtt((uint64_t)(ti.tcpi_rtt / 1000u));
+    logger_on_tcp_transport(total_retrans, data_segs_out, bytes_sent, bytes_retrans);
+    logger_on_cwnd((double)ti.tcpi_snd_cwnd);
+    sample_socket_buffers(fd);
+#else
+    (void)fd;
+#endif
+}
+
+static int kcp_ensure_seg_track_capacity(PeerTransportSession *session, uint32_t sn)
+{
+    size_t need;
+    size_t new_cap;
+    uint32_t *new_seen;
+
+    if (!session) {
+        return OMNI_ERR_PARAM;
+    }
+    need = (size_t)sn + 1u;
+    if (need <= session->kcp_seg_xmit_cap) {
+        return OMNI_OK;
+    }
+
+    new_cap = (session->kcp_seg_xmit_cap == 0) ?
64u : session->kcp_seg_xmit_cap;
+    while (new_cap < need) {
+        new_cap *= 2u;
+    }
+
+    new_seen = (uint32_t *)realloc(session->kcp_seg_xmit_seen,
+                                   new_cap * sizeof(uint32_t));
+    if (!new_seen) {
+        return OMNI_ERR_GENERIC;
+    }
+    memset(new_seen + session->kcp_seg_xmit_cap,
+           0,
+           (new_cap - session->kcp_seg_xmit_cap) * sizeof(uint32_t));
+    session->kcp_seg_xmit_seen = new_seen;
+    session->kcp_seg_xmit_cap = new_cap;
+    return OMNI_OK;
+}
+
+static void sample_kcp_session(PeerTransportSession *session)
+{
+    struct IQUEUEHEAD *p;
+    double send_pct = 0.0;
+    double recv_pct = 0.0;
+
+    if (!session || !session->kcp) {
+        return;
+    }
+
+    for (p = session->kcp->snd_buf.next; p != &session->kcp->snd_buf; p = p->next) {
+        struct IKCPSEG *segment = iqueue_entry(p, struct IKCPSEG, node);
+        uint32_t prev_xmit;
+
+        if (!segment || segment->len == 0) {
+            continue;
+        }
+        if (kcp_ensure_seg_track_capacity(session, segment->sn) != OMNI_OK) {
+            return;
+        }
+
+        prev_xmit = session->kcp_seg_xmit_seen[segment->sn];
+        if (prev_xmit == 0 && segment->xmit > 0) {
+            logger_on_kcp_tx(1u, (uint64_t)segment->len);
+            prev_xmit = 1u;
+        }
+        if (segment->xmit > prev_xmit) {
+            uint32_t delta = segment->xmit - prev_xmit;
+            logger_on_kcp_retrans((uint64_t)delta,
+                                  (uint64_t)delta * (uint64_t)segment->len);
+        }
+        session->kcp_seg_xmit_seen[segment->sn] = segment->xmit;
+    }
+
+    if (session->kcp->rx_srtt > 0) {
+        logger_on_rtt((uint64_t)session->kcp->rx_srtt);
+    }
+    logger_on_cwnd((double)session->kcp->cwnd);
+
+    if (session->kcp->snd_wnd > 0) {
+        send_pct = ((double)ikcp_waitsnd(session->kcp) * 100.0) /
+                   (double)session->kcp->snd_wnd;
+        logger_on_send_queue_bytes((size_t)ikcp_waitsnd(session->kcp) *
+                                   (size_t)session->kcp->mss);
+    }
+    if (session->kcp->rcv_wnd > 0) {
+        recv_pct = ((double)session->kcp->nrcv_que * 100.0) /
+                   (double)session->kcp->rcv_wnd;
+        logger_on_recv_queue_bytes((size_t)session->kcp->nrcv_que *
+                                   (size_t)session->kcp->mss);
+    }
+    logger_on_buffer_status(send_pct, recv_pct);
+}
+
+static void peer_transport_sample_after_send(PeerTransport *transport,
+                                             PeerTransportSession *session)
+{
+    if (!transport || !session) {
+        return;
+    }
+    switch (transport->proto) {
+    case OMNI_PROTO_TCP:
+        sample_tcp_info(session->fd);
+        break;
+    case OMNI_PROTO_UDP:
+        sample_socket_buffers(transport->fd);
+        break;
+    case OMNI_PROTO_KCP:
+        sample_kcp_session(session);
+        break;
+    default:
+        break;
+    }
+}
+
+static void peer_transport_sample_after_recv(PeerTransport *transport,
+                                             PeerTransportSession *session)
+{
+    peer_transport_sample_after_send(transport, session);
+}
+
 const char *peer_transport_proto_name(OmniProtocol proto)
 {
     switch (proto) {
@@ -183,6 +444,7 @@ static void session_free(PeerTransportSession *session)
     if (session->kcp) {
         ikcp_release(session->kcp);
     }
+    free(session->kcp_seg_xmit_seen);
     free(session);
 }
 
@@ -845,6 +1107,10 @@ int peer_transport_send(PeerTransport *transport,
     ssize_t n = -1;
     int rc = OMNI_OK;
     int raw_rc = 0;
+    uint64_t call_t0;
+    uint64_t proto_t0;
+    uint64_t proto_t1;
+    uint64_t call_t1;
 
     if (!transport) {
         return OMNI_ERR_PARAM;
@@ -856,11 +1122,13 @@ int peer_transport_send(PeerTransport *transport,
         return OMNI_ERR_IO;
     }
 
+    call_t0 = omni_now_ms();
     frame = alloc_frame(type, payload, payload_len, &frame_len);
     if (!frame) {
         return OMNI_ERR_GENERIC;
     }
 
+    proto_t0 = omni_now_ms();
     switch (transport->proto) {
     case OMNI_PROTO_TCP:
         n = write_n(session->fd, frame, frame_len);
@@ -891,9 +1159,14 @@ int peer_transport_send(PeerTransport *transport,
         rc = OMNI_ERR_PARAM;
         break;
     }
+    proto_t1 = omni_now_ms();
+    call_t1 = proto_t1;
+    logger_on_proto_send_latency(proto_t1 - proto_t0);
+    logger_on_send_call_latency(call_t1 - call_t0);
 
     if (rc == OMNI_OK) {
         peer_transport_note_send(frame_len);
+        peer_transport_sample_after_send(transport, session);
     } else {
         logger_log("ERROR", "peer_transport",
                    "send_failed proto=%s type=%u remote=%s:%u raw_rc=%d errno=%d",
@@ -1088,21 +1361,39 @@ int peer_transport_next_event(PeerTransport *transport,
                               size_t
payload_cap,
                               int timeout_ms)
 {
+    int rc;
+    uint64_t t0;
+    uint64_t t1;
+
     if (!transport || !event || !payload_buf) {
         return OMNI_ERR_PARAM;
     }
 
     event_reset(event);
+    t0 = omni_now_ms();
     switch (transport->proto) {
     case OMNI_PROTO_TCP:
-        return tcp_next_event(transport, event, payload_buf, payload_cap, timeout_ms);
+        rc = tcp_next_event(transport, event, payload_buf, payload_cap, timeout_ms);
+        break;
     case OMNI_PROTO_UDP:
-        return udp_next_event(transport, event, payload_buf, payload_cap, timeout_ms);
+        rc = udp_next_event(transport, event, payload_buf, payload_cap, timeout_ms);
+        break;
     case OMNI_PROTO_KCP:
-        return kcp_next_event(transport, event, payload_buf, payload_cap, timeout_ms);
+        rc = kcp_next_event(transport, event, payload_buf, payload_cap, timeout_ms);
+        break;
     default:
-        return OMNI_ERR_PARAM;
+        rc = OMNI_ERR_PARAM;
+        break;
     }
+    t1 = omni_now_ms();
+    if (rc > 0) {
+        logger_on_proto_recv_latency(t1 - t0);
+        logger_on_recv_call_latency(t1 - t0);
+        if (event->session) {
+            peer_transport_sample_after_recv(transport, event->session);
+        }
+    }
+    return rc;
 }
 
 void peer_transport_close_session(PeerTransport *transport,