diff --git a/README.md b/README.md index 22f7ed1..db1d590 100644 --- a/README.md +++ b/README.md @@ -227,9 +227,9 @@ put /path/to/file.bin - `processing / queue / transmission / propagation / end_to_end` 都是“当前实现下的本地观测值或估算值”,不是严格意义上的物理链路精确测量值。 - `processing_*` 当前表示本地应用层处理耗时,主要来自文件分片封装、接收端写盘等路径;它不是“某一块硬件 CPU 的完整开销画像”。 -- `queue_*` 和 `transmission_*` 是基于当前吞吐和缓冲/队列状态反推出来的估算值。进程空转时间很长、样本很小或者当前速率很低时,这两个值可能明显偏大。 +- `queue_*` 和 `transmission_*` 是基于最近活跃窗口的吞吐和缓冲/队列状态反推出来的估算值。最近没有足够流量样本、样本太小或者当前速率太低时,这两个值可能直接为 `0`。 - `propagation_*` 当前来自 `min_rtt_ms / 2` 的估算;如果当前协议没有 RTT 样本,这组字段就是 `0`。 -- `end_to_end_*` 当前只在“最终接收文件的 peer”上有值,来源是发送端分片里的 `origin_ts_ms`。发送端、Hub、Bridge 一般是 `0`。 +- `end_to_end_*` 当前只在“最终接收文件的 peer”上有值,来源是发送端分片里的 `origin_ts_ms`。发送文件前会先做一次时钟同步,把发送端时间对齐到接收端时钟域;如果同步没建立,这组字段会保持 `0`,而不是给出误导性的跨机结果。 - UDP 丢包统计只在 `UDP 文件接收侧 peer` 上有值;UDP 发送侧、Hub、Bridge 不会产出这组汇总。 - `udp_retrans` 字段当前还没有实现应用层 UDP 重传统计,所以现在始终是 `0`。 - `TCP/KCP` 当前记录的是重传次数/重传字节/累计发送分片,不直接记录“重传频率”这个单独字段。 @@ -265,10 +265,10 @@ put /path/to/file.bin | 协议层发送耗时 | `proto_send_avg_ms` | ms | TCP/UDP/KCP 实际 send 路径耗时 EWMA | 发送很快且小于 `1ms` 时常为 `0` | | 协议层接收耗时 | `proto_recv_avg_ms` | ms | TCP/UDP/KCP 实际 recv 路径耗时 EWMA | 接收很快且小于 `1ms` 时常为 `0` | | 本地处理耗时 | `processing_avg_ms` / `processing_min_ms` / `processing_max_ms` | ms | 当前进程内的分片封装、写盘等本地处理耗时 | 仅在文件发送/接收路径上采样;操作太快时可能为 `0` | -| 排队延迟估算 | `queue_avg_ms` / `queue_min_ms` / `queue_max_ms` | ms | 根据发送/接收队列字节数和当前速率估算 | 没有队列样本时为 `0`;长空转后可能偏大 | -| 传输延迟估算 | `transmission_avg_ms` / `transmission_min_ms` / `transmission_max_ms` | ms | 根据当前速率估算“这些字节推上链路需要多久” | 没有流量样本时为 `0`;小样本/低速率时可能偏大 | +| 排队延迟估算 | `queue_avg_ms` / `queue_min_ms` / `queue_max_ms` | ms | 根据发送/接收队列字节数和最近活跃窗口速率估算 | 没有足够活跃样本时为 `0`;小样本/低速率时可能偏保守 | +| 传输延迟估算 | `transmission_avg_ms` / `transmission_min_ms` / `transmission_max_ms` | ms | 根据最近活跃窗口速率估算“这些字节推上链路需要多久” | 没有足够活跃样本时为 `0`;小样本/低速率时可能偏保守 | | 传播延迟估算 | `propagation_avg_ms` / `propagation_min_ms` / `propagation_max_ms` | ms | 基于 `min_rtt_ms / 2` 的估算 | 当前协议没有 RTT 样本时为 `0` | -| 端到端延迟 | `end_to_end_avg_ms` / `end_to_end_min_ms` / `end_to_end_max_ms` | ms | 发送端分片 `origin_ts_ms` 到接收端处理时刻的差值 | 只在最终接收文件的 `peer` 上有值;发送端 / Hub / Bridge 多数为 `0` | +| 端到端延迟 | `end_to_end_avg_ms` / `end_to_end_min_ms` / `end_to_end_max_ms` | ms | 发送端分片 `origin_ts_ms` 对齐到接收端时钟后,到接收端处理完成时刻的差值 | 只在最终接收文件的 `peer` 上有值;若时钟同步未建立则为 `0` | ### 可靠性字段 @@ -321,5 +321,6 @@ put /path/to/file.bin | 还没实现 | `udp_retrans` | 现在没有应用层 UDP 重传,所以这个字段只是预留 | | 当前角色不产出 | `udp_expected_chunks`、`udp_loss_*`、`end_to_end_*` | UDP 丢包统计只在最终接收文件的 UDP peer 上有值;`end_to_end_*` 也主要只在最终接收端有值 | | 没有采样到 RTT | `last_rtt_ms`、`propagation_*` | UDP 当前没有 RTT 探针;TCP/KCP 只有在拿到对应协议样本后才有值 | +| 没有建立时钟同步 | `end_to_end_*` | 发送端尚未和接收端完成 `TIME_SYNC_*` 探测,或同步结果太旧/未到达 | | 时间分辨率太粗 | `send_call_*`、`proto_send_avg_ms`、`processing_*` | 当前很多路径按毫秒计时,本地回环下大量操作小于 `1ms`,所以会显示 `0` | | 当前没有任务 | `progress_*` | 只注册但没有文件传输时,这组字段自然是 `0` | diff --git a/include/logger.h b/include/logger.h index 05cd1ac..4096b8e 100644 --- a/include/logger.h +++ b/include/logger.h @@ -31,6 +31,12 @@ typedef struct OmniStats { uint64_t bytes_recv; /* 接收总字节数 */ uint64_t window_bytes_sent; /* 当前 1 秒窗口发送字节数 */ uint64_t window_bytes_recv; /* 当前 1 秒窗口接收字节数 */ + uint64_t delay_window_start_send_ms; /* 延时估算使用的最近发送活跃窗口起点 */ + uint64_t delay_window_start_recv_ms; /* 延时估算使用的最近接收活跃窗口起点 */ + uint64_t delay_window_bytes_sent; /* 延时估算使用的最近发送活跃窗口字节数 */ + uint64_t delay_window_bytes_recv; /* 延时估算使用的最近接收活跃窗口字节数 */ + uint64_t last_send_activity_ms; /* 最近一次发送活跃时间 */ + uint64_t last_recv_activity_ms; /* 最近一次接收活跃时间 */ uint64_t send_count; /* 调用 omni_send 次数 */ uint64_t recv_count; /* 调用 omni_recv 次数 */ diff --git a/src/apps/peer_main.c b/src/apps/peer_main.c index 98f293b..5009457 100644 --- a/src/apps/peer_main.c +++ b/src/apps/peer_main.c @@ -30,6 +30,9 @@ #define PEER_MAX_PAYLOAD (PEER_TUNNEL_META_SIZE + 65536u) #define PEER_MAX_CHUNK_SIZE (65536u - TRANSFER_CHUNK_META_SIZE) #define PEER_OUTPUT_PATH_SIZE 512u +#define PEER_TIME_SYNC_SAMPLES 4u +#define PEER_TIME_SYNC_PROBE_TIMEOUT_MS 400u +#define PEER_TIME_SYNC_MAX_AGE_MS 60000u typedef enum PeerMode { PEER_MODE_HUB = 0, @@ -49,6 +52,25 @@ typedef struct PeerRecvFileState { size_t recv_window_cap; } PeerRecvFileState; +typedef struct PeerTimeSyncPeer { + struct PeerTimeSyncPeer *next; + char peer_id[OMNI_PEER_ID_SIZE]; + int has_offset; + int64_t peer_minus_local_offset_ms; + uint64_t best_rtt_ms; + uint32_t sample_count; + uint64_t last_sync_local_ms; + + int sync_in_progress; + int pending_response; + uint32_t next_probe_id; + uint32_t pending_probe_id; + uint64_t pending_send_ts_ms; + int64_t sync_best_offset_ms; + uint64_t sync_best_rtt_ms; + uint32_t sync_success_count; +} PeerTimeSyncPeer; + typedef struct PeerRuntime { PeerTransport *transport; PeerTransportSession *session; @@ -61,6 +83,7 @@ typedef struct PeerRuntime { char bound_peer[OMNI_PEER_ID_SIZE]; char output_path[PEER_OUTPUT_PATH_SIZE]; PeerRecvFileState rx_file; + PeerTimeSyncPeer *time_sync_peers; } PeerRuntime; typedef struct StartupActions { @@ -76,6 +99,10 @@ typedef struct StartupActions { static volatile sig_atomic_t g_stop = 0; +static void handle_transport_event(PeerRuntime *rt, + const PeerTransportEvent *event, + const uint8_t *payload); + static void on_signal(int signo) { (void)signo; @@ -179,6 +206,94 @@ static void peer_set_bound_peer(PeerRuntime *rt, const char *peer_id) omni_copy_fixed_ascii(rt->bound_peer, sizeof(rt->bound_peer), peer_id); } +static PeerTimeSyncPeer *peer_time_sync_find(PeerRuntime *rt, + const char *peer_id, + int create) +{ + PeerTimeSyncPeer *peer; + + if (!rt || !peer_id_is_valid(peer_id)) { + return NULL; + } + + for (peer = rt->time_sync_peers; peer; peer = peer->next) { + if (strcmp(peer->peer_id, peer_id) == 0) { + return peer; + } + } + if (!create) { + return NULL; + } + + peer = (PeerTimeSyncPeer *)calloc(1, sizeof(*peer)); + if (!peer) { + return NULL; + } + omni_copy_fixed_ascii(peer->peer_id, sizeof(peer->peer_id), peer_id); + peer->next_probe_id = 1u; + peer->next = rt->time_sync_peers; + rt->time_sync_peers = peer; + return peer; +} + +static void peer_time_sync_clear_transient(PeerTimeSyncPeer *peer) +{ + if (!peer) { + return; + } + peer->sync_in_progress = 0; + peer->pending_response = 0; + peer->pending_probe_id = 0; + peer->pending_send_ts_ms = 0; + peer->sync_best_offset_ms = 0; + peer->sync_best_rtt_ms = 0; + peer->sync_success_count = 0; +} + +static void peer_time_sync_free_all(PeerRuntime *rt) +{ + PeerTimeSyncPeer *peer; + PeerTimeSyncPeer *next; + + if (!rt) { + return; + } + for (peer = rt->time_sync_peers; peer; peer = next) { + next = peer->next; + free(peer); + } + rt->time_sync_peers = NULL; +} + +static int peer_time_sync_offset_is_fresh(const PeerTimeSyncPeer *peer, uint64_t now_ms) +{ + if (!peer || !peer->has_offset || peer->last_sync_local_ms == 0) { + return 0; + } + if (now_ms < peer->last_sync_local_ms) { + return 0; + } + return now_ms - peer->last_sync_local_ms <= PEER_TIME_SYNC_MAX_AGE_MS; +} + +static int peer_time_sync_local_to_peer_ts(const PeerTimeSyncPeer *peer, + uint64_t local_ts_ms, + uint64_t *out_peer_ts_ms) +{ + int64_t adjusted; + + if (!peer || !peer->has_offset || !out_peer_ts_ms || local_ts_ms == 0) { + return 0; + } + + adjusted = (int64_t)local_ts_ms + peer->peer_minus_local_offset_ms; + if (adjusted <= 0) { + return 0; + } + *out_peer_ts_ms = (uint64_t)adjusted; + return 1; +} + static void close_recv_file(PeerRuntime *rt) { if (!rt) { @@ -609,11 +724,338 @@ static int peer_send_transfer_ack(PeerRuntime *rt, TRANSFER_ACK_META_SIZE); } +static int peer_send_time_sync_probe(PeerRuntime *rt, + const char *dst_id, + PeerTimeSyncPeer *peer) +{ + TimeSyncProbeMeta probe_meta; + uint64_t now_ms; + uint32_t probe_id; + + if (!rt || !peer || !peer_id_is_valid(dst_id)) { + return OMNI_ERR_PARAM; + } + + now_ms = omni_now_ms(); + probe_id = peer->next_probe_id++; + if (probe_id == 0u) { + probe_id = peer->next_probe_id++; + } + + peer->pending_response = 1; + peer->pending_probe_id = probe_id; + peer->pending_send_ts_ms = now_ms; + omni_time_sync_probe_meta_encode(&probe_meta, probe_id, now_ms); + logger_log("DEBUG", "peer", + "time_sync_probe_send peer_id=%s probe_id=%u", + dst_id, + (unsigned)probe_id); + return peer_send_inner(rt, + dst_id, + MSG_TYPE_TIME_SYNC_REQ, + &probe_meta, + TIME_SYNC_PROBE_META_SIZE); +} + +static int peer_send_time_sync_report(PeerRuntime *rt, + const char *dst_id, + int64_t peer_minus_local_offset_ms, + uint64_t best_rtt_ms, + uint32_t sample_count) +{ + TimeSyncReportMeta report_meta; + + if (!rt || !peer_id_is_valid(dst_id)) { + return OMNI_ERR_PARAM; + } + + omni_time_sync_report_meta_encode(&report_meta, + peer_minus_local_offset_ms, + best_rtt_ms, + sample_count); + logger_log("DEBUG", "peer", + "time_sync_report_send peer_id=%s offset_ms=%lld best_rtt_ms=%llu sample_count=%u", + dst_id, + (long long)peer_minus_local_offset_ms, + (unsigned long long)best_rtt_ms, + (unsigned)sample_count); + return peer_send_inner(rt, + dst_id, + MSG_TYPE_TIME_SYNC_REPORT, + &report_meta, + TIME_SYNC_REPORT_META_SIZE); +} + +static int peer_wait_for_time_sync_probe(PeerRuntime *rt, + PeerTimeSyncPeer *peer, + uint64_t deadline_ms) +{ + uint8_t payload[PEER_MAX_PAYLOAD]; + + if (!rt || !peer) { + return OMNI_ERR_PARAM; + } + + while (rt->running && !g_stop && peer->pending_response) { + PeerTransportEvent event; + uint64_t now_ms = omni_now_ms(); + int rc; + int timeout_ms; + + if (now_ms >= deadline_ms) { + return OMNI_ERR_TIMEOUT; + } + + timeout_ms = (int)(deadline_ms - now_ms); + if (timeout_ms > 50) { + timeout_ms = 50; + } + rc = peer_transport_next_event(rt->transport, + &event, + payload, + sizeof(payload), + timeout_ms); + if (rc < 0) { + return rc; + } + if (rc > 0) { + handle_transport_event(rt, &event, payload); + } + } + + return peer->pending_response ? OMNI_ERR_TIMEOUT : OMNI_OK; +} + +static int peer_ensure_time_sync(PeerRuntime *rt, const char *dst_id) +{ + PeerTimeSyncPeer *peer; + uint64_t now_ms; + int had_previous_offset = 0; + int64_t previous_offset_ms = 0; + uint64_t previous_best_rtt_ms = 0; + uint32_t previous_sample_count = 0; + + if (!rt || !peer_id_is_valid(dst_id)) { + return OMNI_ERR_PARAM; + } + + now_ms = omni_now_ms(); + peer = peer_time_sync_find(rt, dst_id, 1); + if (!peer) { + return OMNI_ERR_GENERIC; + } + if (peer_time_sync_offset_is_fresh(peer, now_ms)) { + return OMNI_OK; + } + + had_previous_offset = peer->has_offset; + previous_offset_ms = peer->peer_minus_local_offset_ms; + previous_best_rtt_ms = peer->best_rtt_ms; + previous_sample_count = peer->sample_count; + + peer_time_sync_clear_transient(peer); + peer->sync_in_progress = 1; + peer->sync_best_rtt_ms = UINT64_MAX; + + for (uint32_t i = 0; i < PEER_TIME_SYNC_SAMPLES && rt->running && !g_stop; ++i) { + int rc; + + rc = peer_send_time_sync_probe(rt, dst_id, peer); + if (rc != OMNI_OK) { + peer_time_sync_clear_transient(peer); + return rc; + } + rc = peer_wait_for_time_sync_probe(rt, + peer, + omni_now_ms() + PEER_TIME_SYNC_PROBE_TIMEOUT_MS); + if (rc != OMNI_OK) { + peer->pending_response = 0; + logger_log("WARN", "peer", + "time_sync_probe_timeout peer_id=%s sample_index=%u", + dst_id, + (unsigned)i); + } + } + + peer->sync_in_progress = 0; + peer->pending_response = 0; + peer->pending_probe_id = 0; + peer->pending_send_ts_ms = 0; + + if (peer->sync_success_count > 0 && peer->sync_best_rtt_ms != UINT64_MAX) { + peer->has_offset = 1; + peer->peer_minus_local_offset_ms = peer->sync_best_offset_ms; + peer->best_rtt_ms = peer->sync_best_rtt_ms; + peer->sample_count = peer->sync_success_count; + peer->last_sync_local_ms = omni_now_ms(); + (void)peer_send_time_sync_report(rt, + dst_id, + peer->peer_minus_local_offset_ms, + peer->best_rtt_ms, + peer->sample_count); + logger_log("INFO", "peer", + "time_sync_ready peer_id=%s offset_ms=%lld best_rtt_ms=%llu sample_count=%u", + dst_id, + (long long)peer->peer_minus_local_offset_ms, + (unsigned long long)peer->best_rtt_ms, + (unsigned)peer->sample_count); + peer->sync_best_offset_ms = 0; + peer->sync_best_rtt_ms = 0; + peer->sync_success_count = 0; + return OMNI_OK; + } + + if (had_previous_offset) { + peer->has_offset = 1; + peer->peer_minus_local_offset_ms = previous_offset_ms; + peer->best_rtt_ms = previous_best_rtt_ms; + peer->sample_count = previous_sample_count; + logger_log("WARN", "peer", + "time_sync_refresh_failed_use_cached peer_id=%s offset_ms=%lld best_rtt_ms=%llu", + dst_id, + (long long)peer->peer_minus_local_offset_ms, + (unsigned long long)peer->best_rtt_ms); + peer->sync_best_offset_ms = 0; + peer->sync_best_rtt_ms = 0; + peer->sync_success_count = 0; + return OMNI_OK; + } + + peer_time_sync_clear_transient(peer); + logger_log("WARN", "peer", "time_sync_unavailable peer_id=%s", dst_id); + return OMNI_ERR_TIMEOUT; +} + +static void handle_time_sync_request(PeerRuntime *rt, + const PeerTunnelMeta *tunnel_meta, + const uint8_t *payload, + uint32_t payload_len) +{ + TimeSyncProbeMeta probe_meta; + TimeSyncReplyMeta reply_meta; + uint64_t recv_ts_ms; + uint64_t send_ts_ms; + + if (payload_len < TIME_SYNC_PROBE_META_SIZE) { + logger_log("WARN", "peer", "short_time_sync_req len=%u", (unsigned)payload_len); + return; + } + if (!peer_id_is_valid(tunnel_meta->src_id)) { + logger_log("WARN", "peer", "time_sync_req_missing_src"); + return; + } + + recv_ts_ms = omni_now_ms(); + omni_time_sync_probe_meta_decode((const TimeSyncProbeMeta *)payload, &probe_meta); + send_ts_ms = omni_now_ms(); + omni_time_sync_reply_meta_encode(&reply_meta, + probe_meta.probe_id, + probe_meta.client_send_ts_ms, + recv_ts_ms, + send_ts_ms); + (void)peer_send_inner(rt, + tunnel_meta->src_id, + MSG_TYPE_TIME_SYNC_RESP, + &reply_meta, + TIME_SYNC_REPLY_META_SIZE); +} + +static void handle_time_sync_response(PeerRuntime *rt, + const PeerTunnelMeta *tunnel_meta, + const uint8_t *payload, + uint32_t payload_len) +{ + PeerTimeSyncPeer *peer; + TimeSyncReplyMeta reply_meta; + uint64_t recv_ts_ms; + uint64_t rtt_ms; + int64_t offset_ms; + + if (!rt || payload_len < TIME_SYNC_REPLY_META_SIZE) { + logger_log("WARN", "peer", "short_time_sync_resp len=%u", (unsigned)payload_len); + return; + } + if (!peer_id_is_valid(tunnel_meta->src_id)) { + logger_log("WARN", "peer", "time_sync_resp_missing_src"); + return; + } + + peer = peer_time_sync_find(rt, tunnel_meta->src_id, 1); + if (!peer || !peer->sync_in_progress || !peer->pending_response) { + return; + } + + omni_time_sync_reply_meta_decode((const TimeSyncReplyMeta *)payload, &reply_meta); + if (reply_meta.probe_id != peer->pending_probe_id || + reply_meta.client_send_ts_ms != peer->pending_send_ts_ms) { + return; + } + + recv_ts_ms = omni_now_ms(); + if (recv_ts_ms < peer->pending_send_ts_ms) { + peer->pending_response = 0; + return; + } + + rtt_ms = recv_ts_ms - peer->pending_send_ts_ms; + offset_ms = (((int64_t)reply_meta.server_recv_ts_ms - (int64_t)peer->pending_send_ts_ms) + + ((int64_t)reply_meta.server_send_ts_ms - (int64_t)recv_ts_ms)) / 2; + if (peer->sync_success_count == 0 || rtt_ms < peer->sync_best_rtt_ms) { + peer->sync_best_rtt_ms = rtt_ms; + peer->sync_best_offset_ms = offset_ms; + } + peer->sync_success_count++; + peer->pending_response = 0; + logger_log("DEBUG", "peer", + "time_sync_sample peer_id=%s probe_id=%u rtt_ms=%llu offset_ms=%lld", + tunnel_meta->src_id, + (unsigned)reply_meta.probe_id, + (unsigned long long)rtt_ms, + (long long)offset_ms); +} + +static void handle_time_sync_report(PeerRuntime *rt, + const PeerTunnelMeta *tunnel_meta, + const uint8_t *payload, + uint32_t payload_len) +{ + PeerTimeSyncPeer *peer; + TimeSyncReportMeta report_meta; + + if (!rt || payload_len < TIME_SYNC_REPORT_META_SIZE) { + logger_log("WARN", "peer", "short_time_sync_report len=%u", (unsigned)payload_len); + return; + } + if (!peer_id_is_valid(tunnel_meta->src_id)) { + logger_log("WARN", "peer", "time_sync_report_missing_src"); + return; + } + + peer = peer_time_sync_find(rt, tunnel_meta->src_id, 1); + if (!peer) { + return; + } + + omni_time_sync_report_meta_decode((const TimeSyncReportMeta *)payload, &report_meta); + peer->has_offset = 1; + peer->peer_minus_local_offset_ms = -report_meta.server_minus_client_offset_ms; + peer->best_rtt_ms = report_meta.best_rtt_ms; + peer->sample_count = report_meta.sample_count; + peer->last_sync_local_ms = omni_now_ms(); + logger_log("INFO", "peer", + "time_sync_report_apply peer_id=%s offset_ms=%lld best_rtt_ms=%llu sample_count=%u", + tunnel_meta->src_id, + (long long)peer->peer_minus_local_offset_ms, + (unsigned long long)peer->best_rtt_ms, + (unsigned)peer->sample_count); +} + static int peer_send_file(PeerRuntime *rt, const char *dst_id, const char *file_path, unsigned chunk_size) { + PeerTimeSyncPeer *sync_peer = NULL; char effective_dst[OMNI_PEER_ID_SIZE]; FILE *fp = NULL; uint8_t *chunk = NULL; @@ -638,6 +1080,13 @@ static int peer_send_file(PeerRuntime *rt, if (peer_resolve_dst(rt, dst_id, effective_dst) != OMNI_OK) { return OMNI_ERR_PARAM; } + if (peer_ensure_time_sync(rt, effective_dst) == OMNI_OK) { + sync_peer = peer_time_sync_find(rt, effective_dst, 0); + } else { + logger_log("WARN", "peer", + "file_send_without_time_sync dst_id=%s end_to_end_will_be_zero", + effective_dst); + } fp = fopen(file_path, "rb"); if (!fp) { @@ -676,8 +1125,15 @@ static int peer_send_file(PeerRuntime *rt, if (nread > 0) { TransferChunkMeta meta; - uint64_t chunk_origin_ts_ms = omni_now_ms(); - uint32_t window_id = (uint32_t)((chunk_origin_ts_ms - transfer_start_ms) / 1000u); + uint64_t chunk_origin_local_ts_ms = omni_now_ms(); + uint64_t chunk_origin_peer_ts_ms = 0; + uint32_t window_id = (uint32_t)((chunk_origin_local_ts_ms - transfer_start_ms) / 1000u); + + if (sync_peer) { + (void)peer_time_sync_local_to_peer_ts(sync_peer, + chunk_origin_local_ts_ms, + &chunk_origin_peer_ts_ms); + } omni_transfer_chunk_meta_encode(&meta, transfer_id, @@ -687,7 +1143,7 @@ static int peer_send_file(PeerRuntime *rt, total_bytes, offset, (uint32_t)nread, - chunk_origin_ts_ms); + chunk_origin_peer_ts_ms); memcpy(payload, &meta, TRANSFER_CHUNK_META_SIZE); memcpy(payload + TRANSFER_CHUNK_META_SIZE, chunk, nread); if (window_id > max_window_id) { @@ -1027,6 +1483,18 @@ static void handle_tunnel_message(PeerRuntime *rt, const uint8_t *payload, uint3 handle_transfer_ack_message(&tunnel_meta, inner_payload, inner_len); return; } + if (tunnel_meta.inner_type == MSG_TYPE_TIME_SYNC_REQ) { + handle_time_sync_request(rt, &tunnel_meta, inner_payload, inner_len); + return; + } + if (tunnel_meta.inner_type == MSG_TYPE_TIME_SYNC_RESP) { + handle_time_sync_response(rt, &tunnel_meta, inner_payload, inner_len); + return; + } + if (tunnel_meta.inner_type == MSG_TYPE_TIME_SYNC_REPORT) { + handle_time_sync_report(rt, &tunnel_meta, inner_payload, inner_len); + return; + } fprintf(stdout, "[peer %s -> %s] unsupported inner_type=%u payload_len=%u\n", @@ -1221,6 +1689,7 @@ static void handle_transport_event(PeerRuntime *rt, rt->session = NULL; rt->direct_register_sent = 0; peer_set_bound_peer(rt, ""); + peer_time_sync_free_all(rt); peer_finalize_recv_observability(rt, 0); close_recv_file(rt); rt->rx_file.transfer_id = 0; @@ -1490,6 +1959,7 @@ int main(int argc, char **argv) peer_finalize_recv_observability(&rt, 0); close_recv_file(&rt); + peer_time_sync_free_all(&rt); peer_transport_close(rt.transport); logger_print_performance_log("final"); return 0; diff --git a/src/core/logger.c b/src/core/logger.c index ee79d5f..3ae622e 100644 --- a/src/core/logger.c +++ b/src/core/logger.c @@ -29,6 +29,9 @@ static char g_ctx_mode[16]; static char g_ctx_role[16]; static char g_ctx_self_id[OMNI_PEER_ID_SIZE]; +#define OMNI_DELAY_WINDOW_MS 1000u +#define OMNI_DELAY_IDLE_RESET_MS 1000u + static void metric_reset(OmniMetricSummary *metric); static void reset_transfer_observability_locked(void) @@ -130,28 +133,66 @@ static double metric_min(const OmniMetricSummary *metric) return metric->min; } +static double derived_propagation_ms(uint64_t min_rtt_ms) +{ + if (min_rtt_ms == 0 || min_rtt_ms == UINT64_MAX) { + return 0.0; + } + return (double)min_rtt_ms / 2.0; +} + +static void delay_window_note_locked(uint64_t now, size_t bytes, int is_send) +{ + uint64_t *window_start_ms = is_send ? &g_stats.delay_window_start_send_ms + : &g_stats.delay_window_start_recv_ms; + uint64_t *window_bytes = is_send ? &g_stats.delay_window_bytes_sent + : &g_stats.delay_window_bytes_recv; + uint64_t *last_activity_ms = is_send ? &g_stats.last_send_activity_ms + : &g_stats.last_recv_activity_ms; + + if (*window_start_ms == 0 || + (*last_activity_ms > 0 && now - *last_activity_ms > OMNI_DELAY_IDLE_RESET_MS) || + now - *window_start_ms > OMNI_DELAY_WINDOW_MS) { + *window_start_ms = now; + *window_bytes = 0; + } + + *window_bytes += (uint64_t)bytes; + *last_activity_ms = now; +} + /* - * 基于当前窗口内流量和累计流量估算“此刻本机可提供的服务速率”。 - * 优先使用当前窗口速率;若窗口内尚无样本,则退化为全局平均速率。 + * 仅基于最近活跃窗口估算“当前本机可提供的服务速率”。 + * 最近一段时间没有流量时直接放弃估算,避免把长时间空转折算进时延。 */ static double live_rate_mbps_locked(uint64_t now, int is_send) { - uint64_t window_elapsed_ms = now - g_stats.window_start_ms; - uint64_t total_elapsed_ms = now - g_stats.start_ms; - uint64_t window_bytes = is_send ? g_stats.window_bytes_sent : g_stats.window_bytes_recv; - uint64_t total_bytes = is_send ? g_stats.bytes_sent : g_stats.bytes_recv; + uint64_t window_start_ms = is_send ? g_stats.delay_window_start_send_ms + : g_stats.delay_window_start_recv_ms; + uint64_t window_bytes = is_send ? g_stats.delay_window_bytes_sent + : g_stats.delay_window_bytes_recv; + uint64_t last_activity_ms = is_send ? g_stats.last_send_activity_ms + : g_stats.last_recv_activity_ms; + uint64_t window_elapsed_ms; - if (window_elapsed_ms > 0 && window_bytes > 0) { - return ((double)window_bytes * 8.0) / - ((double)window_elapsed_ms / 1000.0) / - 1000000.0; + if (window_start_ms == 0 || window_bytes == 0 || last_activity_ms == 0) { + return 0.0; } - if (total_elapsed_ms > 0 && total_bytes > 0) { - return ((double)total_bytes * 8.0) / - ((double)total_elapsed_ms / 1000.0) / - 1000000.0; + if (now < window_start_ms || now < last_activity_ms) { + return 0.0; } - return 0.0; + if (now - last_activity_ms > OMNI_DELAY_IDLE_RESET_MS) { + return 0.0; + } + + window_elapsed_ms = now - window_start_ms; + if (window_elapsed_ms == 0) { + return 0.0; + } + + return ((double)window_bytes * 8.0) / + ((double)window_elapsed_ms / 1000.0) / + 1000000.0; } /* 用本机当前服务速率把字节数换算成时延估计。 */ @@ -311,9 +352,12 @@ void logger_set_context(const char *app, /* 记录一次发送事件,更新累计发送字节和窗口内发送字节。 */ void logger_on_send(size_t bytes) { + uint64_t now = now_ms(); + pthread_mutex_lock(&g_mu); g_stats.bytes_sent += bytes; g_stats.window_bytes_sent += bytes; + delay_window_note_locked(now, bytes, 1); g_stats.send_count++; pthread_mutex_unlock(&g_mu); } @@ -321,14 +365,17 @@ void logger_on_send(size_t bytes) /* 记录一次接收事件,更新累计接收字节和窗口内接收字节。 */ void logger_on_recv(size_t bytes) { + uint64_t now = now_ms(); + pthread_mutex_lock(&g_mu); g_stats.bytes_recv += bytes; g_stats.window_bytes_recv += bytes; + delay_window_note_locked(now, bytes, 0); g_stats.recv_count++; pthread_mutex_unlock(&g_mu); } -/* 更新最近一次 RTT 和历史最大 RTT。 */ +/* 更新最近一次 RTT 以及进程生命周期内的最小/最大 RTT。 */ void logger_on_rtt(uint64_t rtt_ms) { if (rtt_ms == 0) { @@ -343,11 +390,6 @@ void logger_on_rtt(uint64_t rtt_ms) if (rtt_ms > g_stats.max_rtt_ms) { g_stats.max_rtt_ms = rtt_ms; } - /* - * 传播时延无法直接观测,这里统一采用 min RTT / 2 作为链路基线估算。 - * min RTT 更接近“基本无排队”时的往返时间,因此比 last RTT 更稳。 - */ - metric_observe(&g_stats.propagation_delay_ms, (double)g_stats.min_rtt_ms / 2.0); pthread_mutex_unlock(&g_mu); } @@ -658,6 +700,9 @@ void logger_print_performance_log(const char *tag) uint64_t now; OmniStats snapshot; double progress_pct; + double propagation_avg_ms; + double propagation_min_ms; + double propagation_max_ms; char ctx_app[sizeof(g_ctx_app)]; char ctx_proto[sizeof(g_ctx_proto)]; char ctx_mode[sizeof(g_ctx_mode)]; @@ -700,6 +745,15 @@ void logger_print_performance_log(const char *tag) progress_pct = ((double)snapshot.progress_bytes * 100.0) / (double)snapshot.total_work_bytes; } + if (snapshot.propagation_delay_ms.count > 0) { + propagation_avg_ms = metric_avg(&snapshot.propagation_delay_ms); + propagation_min_ms = metric_min(&snapshot.propagation_delay_ms); + propagation_max_ms = snapshot.propagation_delay_ms.max; + } else { + propagation_avg_ms = derived_propagation_ms(snapshot.min_rtt_ms); + propagation_min_ms = propagation_avg_ms; + propagation_max_ms = propagation_avg_ms; + } { /* 人读友好的单行文本日志,适合终端直接观察。 */ @@ -751,7 +805,7 @@ void logger_print_performance_log(const char *tag) metric_avg(&snapshot.processing_delay_ms), metric_avg(&snapshot.queue_delay_ms), metric_avg(&snapshot.transmission_delay_ms), - metric_avg(&snapshot.propagation_delay_ms), + propagation_avg_ms, metric_avg(&snapshot.end_to_end_delay_ms), snapshot.send_buffer_pct.last, snapshot.recv_buffer_pct.last, @@ -916,9 +970,9 @@ void logger_print_performance_log(const char *tag) metric_avg(&snapshot.transmission_delay_ms), metric_min(&snapshot.transmission_delay_ms), snapshot.transmission_delay_ms.max, - metric_avg(&snapshot.propagation_delay_ms), - metric_min(&snapshot.propagation_delay_ms), - snapshot.propagation_delay_ms.max, + propagation_avg_ms, + propagation_min_ms, + propagation_max_ms, metric_avg(&snapshot.end_to_end_delay_ms), metric_min(&snapshot.end_to_end_delay_ms), snapshot.end_to_end_delay_ms.max, diff --git a/src/core/peer_transport.c b/src/core/peer_transport.c index b440fd5..7aa7fcf 100644 --- a/src/core/peer_transport.c +++ b/src/core/peer_transport.c @@ -197,7 +197,7 @@ static void sample_tcp_info(int fd) bytes_retrans = (uint64_t)ti.tcpi_bytes_retrans; } - logger_on_rtt((uint64_t)(ti.tcpi_rtt / 1000u)); + logger_on_rtt((ti.tcpi_rtt == 0u) ? 0u : (((uint64_t)ti.tcpi_rtt + 999u) / 1000u)); logger_on_tcp_transport(total_retrans, data_segs_out, bytes_sent, bytes_retrans); logger_on_cwnd((double)ti.tcpi_snd_cwnd); sample_socket_buffers(fd); diff --git a/src/protocols/tcp_impl.c b/src/protocols/tcp_impl.c index f7cbf27..5c2e229 100644 --- a/src/protocols/tcp_impl.c +++ b/src/protocols/tcp_impl.c @@ -100,6 +100,14 @@ static int tcp_info_has_field(socklen_t len, size_t field_end) { return (size_t)len >= field_end; } + +static uint64_t tcp_info_rtt_us_to_ms(uint32_t rtt_us) +{ + if (rtt_us == 0u) { + return 0u; + } + return ((uint64_t)rtt_us + 999u) / 1000u; +} #endif struct TcpContext { @@ -168,9 +176,9 @@ static void tcp_log_info(int fd, const char *tag) return; } - /* 注意:tcpi_rtt 单位通常为微秒(Linux),这里转 ms 仅用于日志观察 */ - unsigned long long rtt_ms = (unsigned long long)(ti.tcpi_rtt / 1000u); - unsigned long long rttvar_ms = (unsigned long long)(ti.tcpi_rttvar / 1000u); + /* 注意:tcpi_rtt / tcpi_rttvar 单位通常为微秒(Linux),这里向上取整到 ms。 */ + unsigned long long rtt_ms = (unsigned long long)tcp_info_rtt_us_to_ms(ti.tcpi_rtt); + unsigned long long rttvar_ms = (unsigned long long)tcp_info_rtt_us_to_ms(ti.tcpi_rttvar); if (tcp_info_has_field(len, offsetof(struct OmniLinuxTcpInfo, tcpi_total_retrans) + sizeof(ti.tcpi_total_retrans))) {