更新tcp日志

This commit is contained in:
MOCK
2026-03-20 09:04:24 +08:00
parent 4b95d26f13
commit df7947c500
6 changed files with 576 additions and 37 deletions

View File

@@ -227,9 +227,9 @@ put /path/to/file.bin
- `processing / queue / transmission / propagation / end_to_end` 都是“当前实现下的本地观测值或估算值”,不是严格意义上的物理链路精确测量值。
- `processing_*` 当前表示本地应用层处理耗时,主要来自文件分片封装、接收端写盘等路径;它不是“某一块硬件 CPU 的完整开销画像”。
- `queue_*``transmission_*` 是基于当前吞吐和缓冲/队列状态反推出来的估算值。进程空转时间很长、样本小或者当前速率低时,这两个值可能明显偏大
- `queue_*``transmission_*` 是基于最近活跃窗口的吞吐和缓冲/队列状态反推出来的估算值。最近没有足够流量样本、样本小或者当前速率低时,这两个值可能直接为 `0`
- `propagation_*` 当前来自 `min_rtt_ms / 2` 的估算;如果当前协议没有 RTT 样本,这组字段就是 `0`
- `end_to_end_*` 当前只在“最终接收文件的 peer”上有值来源是发送端分片里的 `origin_ts_ms`。发送端、Hub、Bridge 一般是 `0`
- `end_to_end_*` 当前只在“最终接收文件的 peer”上有值来源是发送端分片里的 `origin_ts_ms`。发送文件前会先做一次时钟同步,把发送端时间对齐到接收端时钟域;如果同步没建立,这组字段会保持 `0`,而不是给出误导性的跨机结果
- UDP 丢包统计只在 `UDP 文件接收侧 peer` 上有值UDP 发送侧、Hub、Bridge 不会产出这组汇总。
- `udp_retrans` 字段当前还没有实现应用层 UDP 重传统计,所以现在始终是 `0`
- `TCP/KCP` 当前记录的是重传次数/重传字节/累计发送分片,不直接记录“重传频率”这个单独字段。
@@ -265,10 +265,10 @@ put /path/to/file.bin
| 协议层发送耗时 | `proto_send_avg_ms` | ms | TCP/UDP/KCP 实际 send 路径耗时 EWMA | 发送很快且小于 `1ms` 时常为 `0` |
| 协议层接收耗时 | `proto_recv_avg_ms` | ms | TCP/UDP/KCP 实际 recv 路径耗时 EWMA | 接收很快且小于 `1ms` 时常为 `0` |
| 本地处理耗时 | `processing_avg_ms` / `processing_min_ms` / `processing_max_ms` | ms | 当前进程内的分片封装、写盘等本地处理耗时 | 仅在文件发送/接收路径上采样;操作太快时可能为 `0` |
| 排队延迟估算 | `queue_avg_ms` / `queue_min_ms` / `queue_max_ms` | ms | 根据发送/接收队列字节数和当前速率估算 | 没有队列样本时为 `0`长空转后可能偏 |
| 传输延迟估算 | `transmission_avg_ms` / `transmission_min_ms` / `transmission_max_ms` | ms | 根据当前速率估算“这些字节推上链路需要多久” | 没有流量样本时为 `0`;小样本/低速率时可能偏 |
| 排队延迟估算 | `queue_avg_ms` / `queue_min_ms` / `queue_max_ms` | ms | 根据发送/接收队列字节数和最近活跃窗口速率估算 | 没有足够活跃样本时为 `0`小样本/低速率时可能偏保守 |
| 传输延迟估算 | `transmission_avg_ms` / `transmission_min_ms` / `transmission_max_ms` | ms | 根据最近活跃窗口速率估算“这些字节推上链路需要多久” | 没有足够活跃样本时为 `0`;小样本/低速率时可能偏保守 |
| 传播延迟估算 | `propagation_avg_ms` / `propagation_min_ms` / `propagation_max_ms` | ms | 基于 `min_rtt_ms / 2` 的估算 | 当前协议没有 RTT 样本时为 `0` |
| 端到端延迟 | `end_to_end_avg_ms` / `end_to_end_min_ms` / `end_to_end_max_ms` | ms | 发送端分片 `origin_ts_ms` 到接收端处理时刻的差值 | 只在最终接收文件的 `peer` 上有值;发送端 / Hub / Bridge 多数`0` |
| 端到端延迟 | `end_to_end_avg_ms` / `end_to_end_min_ms` / `end_to_end_max_ms` | ms | 发送端分片 `origin_ts_ms` 对齐到接收端时钟后,到接收端处理完成时刻的差值 | 只在最终接收文件的 `peer` 上有值;若时钟同步未建立则`0` |
### 可靠性字段
@@ -321,5 +321,6 @@ put /path/to/file.bin
| 还没实现 | `udp_retrans` | 现在没有应用层 UDP 重传,所以这个字段只是预留 |
| 当前角色不产出 | `udp_expected_chunks``udp_loss_*``end_to_end_*` | UDP 丢包统计只在最终接收文件的 UDP peer 上有值;`end_to_end_*` 也主要只在最终接收端有值 |
| 没有采样到 RTT | `last_rtt_ms``propagation_*` | UDP 当前没有 RTT 探针TCP/KCP 只有在拿到对应协议样本后才有值 |
| 没有建立时钟同步 | `end_to_end_*` | 发送端尚未和接收端完成 `TIME_SYNC_*` 探测,或同步结果太旧/未到达 |
| 时间分辨率太粗 | `send_call_*``proto_send_avg_ms``processing_*` | 当前很多路径按毫秒计时,本地回环下大量操作小于 `1ms`,所以会显示 `0` |
| 当前没有任务 | `progress_*` | 只注册但没有文件传输时,这组字段自然是 `0` |

View File

@@ -31,6 +31,12 @@ typedef struct OmniStats {
uint64_t bytes_recv; /* 接收总字节数 */
uint64_t window_bytes_sent; /* 当前 1 秒窗口发送字节数 */
uint64_t window_bytes_recv; /* 当前 1 秒窗口接收字节数 */
uint64_t delay_window_start_send_ms; /* 延时估算使用的最近发送活跃窗口起点 */
uint64_t delay_window_start_recv_ms; /* 延时估算使用的最近接收活跃窗口起点 */
uint64_t delay_window_bytes_sent; /* 延时估算使用的最近发送活跃窗口字节数 */
uint64_t delay_window_bytes_recv; /* 延时估算使用的最近接收活跃窗口字节数 */
uint64_t last_send_activity_ms; /* 最近一次发送活跃时间 */
uint64_t last_recv_activity_ms; /* 最近一次接收活跃时间 */
uint64_t send_count; /* 调用 omni_send 次数 */
uint64_t recv_count; /* 调用 omni_recv 次数 */

View File

@@ -30,6 +30,9 @@
#define PEER_MAX_PAYLOAD (PEER_TUNNEL_META_SIZE + 65536u)
#define PEER_MAX_CHUNK_SIZE (65536u - TRANSFER_CHUNK_META_SIZE)
#define PEER_OUTPUT_PATH_SIZE 512u
#define PEER_TIME_SYNC_SAMPLES 4u
#define PEER_TIME_SYNC_PROBE_TIMEOUT_MS 400u
#define PEER_TIME_SYNC_MAX_AGE_MS 60000u
typedef enum PeerMode {
PEER_MODE_HUB = 0,
@@ -49,6 +52,25 @@ typedef struct PeerRecvFileState {
size_t recv_window_cap;
} PeerRecvFileState;
typedef struct PeerTimeSyncPeer {
struct PeerTimeSyncPeer *next;
char peer_id[OMNI_PEER_ID_SIZE];
int has_offset;
int64_t peer_minus_local_offset_ms;
uint64_t best_rtt_ms;
uint32_t sample_count;
uint64_t last_sync_local_ms;
int sync_in_progress;
int pending_response;
uint32_t next_probe_id;
uint32_t pending_probe_id;
uint64_t pending_send_ts_ms;
int64_t sync_best_offset_ms;
uint64_t sync_best_rtt_ms;
uint32_t sync_success_count;
} PeerTimeSyncPeer;
typedef struct PeerRuntime {
PeerTransport *transport;
PeerTransportSession *session;
@@ -61,6 +83,7 @@ typedef struct PeerRuntime {
char bound_peer[OMNI_PEER_ID_SIZE];
char output_path[PEER_OUTPUT_PATH_SIZE];
PeerRecvFileState rx_file;
PeerTimeSyncPeer *time_sync_peers;
} PeerRuntime;
typedef struct StartupActions {
@@ -76,6 +99,10 @@ typedef struct StartupActions {
static volatile sig_atomic_t g_stop = 0;
static void handle_transport_event(PeerRuntime *rt,
const PeerTransportEvent *event,
const uint8_t *payload);
static void on_signal(int signo)
{
(void)signo;
@@ -179,6 +206,94 @@ static void peer_set_bound_peer(PeerRuntime *rt, const char *peer_id)
omni_copy_fixed_ascii(rt->bound_peer, sizeof(rt->bound_peer), peer_id);
}
static PeerTimeSyncPeer *peer_time_sync_find(PeerRuntime *rt,
const char *peer_id,
int create)
{
PeerTimeSyncPeer *peer;
if (!rt || !peer_id_is_valid(peer_id)) {
return NULL;
}
for (peer = rt->time_sync_peers; peer; peer = peer->next) {
if (strcmp(peer->peer_id, peer_id) == 0) {
return peer;
}
}
if (!create) {
return NULL;
}
peer = (PeerTimeSyncPeer *)calloc(1, sizeof(*peer));
if (!peer) {
return NULL;
}
omni_copy_fixed_ascii(peer->peer_id, sizeof(peer->peer_id), peer_id);
peer->next_probe_id = 1u;
peer->next = rt->time_sync_peers;
rt->time_sync_peers = peer;
return peer;
}
static void peer_time_sync_clear_transient(PeerTimeSyncPeer *peer)
{
if (!peer) {
return;
}
peer->sync_in_progress = 0;
peer->pending_response = 0;
peer->pending_probe_id = 0;
peer->pending_send_ts_ms = 0;
peer->sync_best_offset_ms = 0;
peer->sync_best_rtt_ms = 0;
peer->sync_success_count = 0;
}
static void peer_time_sync_free_all(PeerRuntime *rt)
{
PeerTimeSyncPeer *peer;
PeerTimeSyncPeer *next;
if (!rt) {
return;
}
for (peer = rt->time_sync_peers; peer; peer = next) {
next = peer->next;
free(peer);
}
rt->time_sync_peers = NULL;
}
static int peer_time_sync_offset_is_fresh(const PeerTimeSyncPeer *peer, uint64_t now_ms)
{
if (!peer || !peer->has_offset || peer->last_sync_local_ms == 0) {
return 0;
}
if (now_ms < peer->last_sync_local_ms) {
return 0;
}
return now_ms - peer->last_sync_local_ms <= PEER_TIME_SYNC_MAX_AGE_MS;
}
static int peer_time_sync_local_to_peer_ts(const PeerTimeSyncPeer *peer,
uint64_t local_ts_ms,
uint64_t *out_peer_ts_ms)
{
int64_t adjusted;
if (!peer || !peer->has_offset || !out_peer_ts_ms || local_ts_ms == 0) {
return 0;
}
adjusted = (int64_t)local_ts_ms + peer->peer_minus_local_offset_ms;
if (adjusted <= 0) {
return 0;
}
*out_peer_ts_ms = (uint64_t)adjusted;
return 1;
}
static void close_recv_file(PeerRuntime *rt)
{
if (!rt) {
@@ -609,11 +724,338 @@ static int peer_send_transfer_ack(PeerRuntime *rt,
TRANSFER_ACK_META_SIZE);
}
static int peer_send_time_sync_probe(PeerRuntime *rt,
const char *dst_id,
PeerTimeSyncPeer *peer)
{
TimeSyncProbeMeta probe_meta;
uint64_t now_ms;
uint32_t probe_id;
if (!rt || !peer || !peer_id_is_valid(dst_id)) {
return OMNI_ERR_PARAM;
}
now_ms = omni_now_ms();
probe_id = peer->next_probe_id++;
if (probe_id == 0u) {
probe_id = peer->next_probe_id++;
}
peer->pending_response = 1;
peer->pending_probe_id = probe_id;
peer->pending_send_ts_ms = now_ms;
omni_time_sync_probe_meta_encode(&probe_meta, probe_id, now_ms);
logger_log("DEBUG", "peer",
"time_sync_probe_send peer_id=%s probe_id=%u",
dst_id,
(unsigned)probe_id);
return peer_send_inner(rt,
dst_id,
MSG_TYPE_TIME_SYNC_REQ,
&probe_meta,
TIME_SYNC_PROBE_META_SIZE);
}
static int peer_send_time_sync_report(PeerRuntime *rt,
const char *dst_id,
int64_t peer_minus_local_offset_ms,
uint64_t best_rtt_ms,
uint32_t sample_count)
{
TimeSyncReportMeta report_meta;
if (!rt || !peer_id_is_valid(dst_id)) {
return OMNI_ERR_PARAM;
}
omni_time_sync_report_meta_encode(&report_meta,
peer_minus_local_offset_ms,
best_rtt_ms,
sample_count);
logger_log("DEBUG", "peer",
"time_sync_report_send peer_id=%s offset_ms=%lld best_rtt_ms=%llu sample_count=%u",
dst_id,
(long long)peer_minus_local_offset_ms,
(unsigned long long)best_rtt_ms,
(unsigned)sample_count);
return peer_send_inner(rt,
dst_id,
MSG_TYPE_TIME_SYNC_REPORT,
&report_meta,
TIME_SYNC_REPORT_META_SIZE);
}
static int peer_wait_for_time_sync_probe(PeerRuntime *rt,
PeerTimeSyncPeer *peer,
uint64_t deadline_ms)
{
uint8_t payload[PEER_MAX_PAYLOAD];
if (!rt || !peer) {
return OMNI_ERR_PARAM;
}
while (rt->running && !g_stop && peer->pending_response) {
PeerTransportEvent event;
uint64_t now_ms = omni_now_ms();
int rc;
int timeout_ms;
if (now_ms >= deadline_ms) {
return OMNI_ERR_TIMEOUT;
}
timeout_ms = (int)(deadline_ms - now_ms);
if (timeout_ms > 50) {
timeout_ms = 50;
}
rc = peer_transport_next_event(rt->transport,
&event,
payload,
sizeof(payload),
timeout_ms);
if (rc < 0) {
return rc;
}
if (rc > 0) {
handle_transport_event(rt, &event, payload);
}
}
return peer->pending_response ? OMNI_ERR_TIMEOUT : OMNI_OK;
}
static int peer_ensure_time_sync(PeerRuntime *rt, const char *dst_id)
{
PeerTimeSyncPeer *peer;
uint64_t now_ms;
int had_previous_offset = 0;
int64_t previous_offset_ms = 0;
uint64_t previous_best_rtt_ms = 0;
uint32_t previous_sample_count = 0;
if (!rt || !peer_id_is_valid(dst_id)) {
return OMNI_ERR_PARAM;
}
now_ms = omni_now_ms();
peer = peer_time_sync_find(rt, dst_id, 1);
if (!peer) {
return OMNI_ERR_GENERIC;
}
if (peer_time_sync_offset_is_fresh(peer, now_ms)) {
return OMNI_OK;
}
had_previous_offset = peer->has_offset;
previous_offset_ms = peer->peer_minus_local_offset_ms;
previous_best_rtt_ms = peer->best_rtt_ms;
previous_sample_count = peer->sample_count;
peer_time_sync_clear_transient(peer);
peer->sync_in_progress = 1;
peer->sync_best_rtt_ms = UINT64_MAX;
for (uint32_t i = 0; i < PEER_TIME_SYNC_SAMPLES && rt->running && !g_stop; ++i) {
int rc;
rc = peer_send_time_sync_probe(rt, dst_id, peer);
if (rc != OMNI_OK) {
peer_time_sync_clear_transient(peer);
return rc;
}
rc = peer_wait_for_time_sync_probe(rt,
peer,
omni_now_ms() + PEER_TIME_SYNC_PROBE_TIMEOUT_MS);
if (rc != OMNI_OK) {
peer->pending_response = 0;
logger_log("WARN", "peer",
"time_sync_probe_timeout peer_id=%s sample_index=%u",
dst_id,
(unsigned)i);
}
}
peer->sync_in_progress = 0;
peer->pending_response = 0;
peer->pending_probe_id = 0;
peer->pending_send_ts_ms = 0;
if (peer->sync_success_count > 0 && peer->sync_best_rtt_ms != UINT64_MAX) {
peer->has_offset = 1;
peer->peer_minus_local_offset_ms = peer->sync_best_offset_ms;
peer->best_rtt_ms = peer->sync_best_rtt_ms;
peer->sample_count = peer->sync_success_count;
peer->last_sync_local_ms = omni_now_ms();
(void)peer_send_time_sync_report(rt,
dst_id,
peer->peer_minus_local_offset_ms,
peer->best_rtt_ms,
peer->sample_count);
logger_log("INFO", "peer",
"time_sync_ready peer_id=%s offset_ms=%lld best_rtt_ms=%llu sample_count=%u",
dst_id,
(long long)peer->peer_minus_local_offset_ms,
(unsigned long long)peer->best_rtt_ms,
(unsigned)peer->sample_count);
peer->sync_best_offset_ms = 0;
peer->sync_best_rtt_ms = 0;
peer->sync_success_count = 0;
return OMNI_OK;
}
if (had_previous_offset) {
peer->has_offset = 1;
peer->peer_minus_local_offset_ms = previous_offset_ms;
peer->best_rtt_ms = previous_best_rtt_ms;
peer->sample_count = previous_sample_count;
logger_log("WARN", "peer",
"time_sync_refresh_failed_use_cached peer_id=%s offset_ms=%lld best_rtt_ms=%llu",
dst_id,
(long long)peer->peer_minus_local_offset_ms,
(unsigned long long)peer->best_rtt_ms);
peer->sync_best_offset_ms = 0;
peer->sync_best_rtt_ms = 0;
peer->sync_success_count = 0;
return OMNI_OK;
}
peer_time_sync_clear_transient(peer);
logger_log("WARN", "peer", "time_sync_unavailable peer_id=%s", dst_id);
return OMNI_ERR_TIMEOUT;
}
static void handle_time_sync_request(PeerRuntime *rt,
const PeerTunnelMeta *tunnel_meta,
const uint8_t *payload,
uint32_t payload_len)
{
TimeSyncProbeMeta probe_meta;
TimeSyncReplyMeta reply_meta;
uint64_t recv_ts_ms;
uint64_t send_ts_ms;
if (payload_len < TIME_SYNC_PROBE_META_SIZE) {
logger_log("WARN", "peer", "short_time_sync_req len=%u", (unsigned)payload_len);
return;
}
if (!peer_id_is_valid(tunnel_meta->src_id)) {
logger_log("WARN", "peer", "time_sync_req_missing_src");
return;
}
recv_ts_ms = omni_now_ms();
omni_time_sync_probe_meta_decode((const TimeSyncProbeMeta *)payload, &probe_meta);
send_ts_ms = omni_now_ms();
omni_time_sync_reply_meta_encode(&reply_meta,
probe_meta.probe_id,
probe_meta.client_send_ts_ms,
recv_ts_ms,
send_ts_ms);
(void)peer_send_inner(rt,
tunnel_meta->src_id,
MSG_TYPE_TIME_SYNC_RESP,
&reply_meta,
TIME_SYNC_REPLY_META_SIZE);
}
static void handle_time_sync_response(PeerRuntime *rt,
const PeerTunnelMeta *tunnel_meta,
const uint8_t *payload,
uint32_t payload_len)
{
PeerTimeSyncPeer *peer;
TimeSyncReplyMeta reply_meta;
uint64_t recv_ts_ms;
uint64_t rtt_ms;
int64_t offset_ms;
if (!rt || payload_len < TIME_SYNC_REPLY_META_SIZE) {
logger_log("WARN", "peer", "short_time_sync_resp len=%u", (unsigned)payload_len);
return;
}
if (!peer_id_is_valid(tunnel_meta->src_id)) {
logger_log("WARN", "peer", "time_sync_resp_missing_src");
return;
}
peer = peer_time_sync_find(rt, tunnel_meta->src_id, 1);
if (!peer || !peer->sync_in_progress || !peer->pending_response) {
return;
}
omni_time_sync_reply_meta_decode((const TimeSyncReplyMeta *)payload, &reply_meta);
if (reply_meta.probe_id != peer->pending_probe_id ||
reply_meta.client_send_ts_ms != peer->pending_send_ts_ms) {
return;
}
recv_ts_ms = omni_now_ms();
if (recv_ts_ms < peer->pending_send_ts_ms) {
peer->pending_response = 0;
return;
}
rtt_ms = recv_ts_ms - peer->pending_send_ts_ms;
offset_ms = (((int64_t)reply_meta.server_recv_ts_ms - (int64_t)peer->pending_send_ts_ms) +
((int64_t)reply_meta.server_send_ts_ms - (int64_t)recv_ts_ms)) / 2;
if (peer->sync_success_count == 0 || rtt_ms < peer->sync_best_rtt_ms) {
peer->sync_best_rtt_ms = rtt_ms;
peer->sync_best_offset_ms = offset_ms;
}
peer->sync_success_count++;
peer->pending_response = 0;
logger_log("DEBUG", "peer",
"time_sync_sample peer_id=%s probe_id=%u rtt_ms=%llu offset_ms=%lld",
tunnel_meta->src_id,
(unsigned)reply_meta.probe_id,
(unsigned long long)rtt_ms,
(long long)offset_ms);
}
static void handle_time_sync_report(PeerRuntime *rt,
const PeerTunnelMeta *tunnel_meta,
const uint8_t *payload,
uint32_t payload_len)
{
PeerTimeSyncPeer *peer;
TimeSyncReportMeta report_meta;
if (!rt || payload_len < TIME_SYNC_REPORT_META_SIZE) {
logger_log("WARN", "peer", "short_time_sync_report len=%u", (unsigned)payload_len);
return;
}
if (!peer_id_is_valid(tunnel_meta->src_id)) {
logger_log("WARN", "peer", "time_sync_report_missing_src");
return;
}
peer = peer_time_sync_find(rt, tunnel_meta->src_id, 1);
if (!peer) {
return;
}
omni_time_sync_report_meta_decode((const TimeSyncReportMeta *)payload, &report_meta);
peer->has_offset = 1;
peer->peer_minus_local_offset_ms = -report_meta.server_minus_client_offset_ms;
peer->best_rtt_ms = report_meta.best_rtt_ms;
peer->sample_count = report_meta.sample_count;
peer->last_sync_local_ms = omni_now_ms();
logger_log("INFO", "peer",
"time_sync_report_apply peer_id=%s offset_ms=%lld best_rtt_ms=%llu sample_count=%u",
tunnel_meta->src_id,
(long long)peer->peer_minus_local_offset_ms,
(unsigned long long)peer->best_rtt_ms,
(unsigned)peer->sample_count);
}
static int peer_send_file(PeerRuntime *rt,
const char *dst_id,
const char *file_path,
unsigned chunk_size)
{
PeerTimeSyncPeer *sync_peer = NULL;
char effective_dst[OMNI_PEER_ID_SIZE];
FILE *fp = NULL;
uint8_t *chunk = NULL;
@@ -638,6 +1080,13 @@ static int peer_send_file(PeerRuntime *rt,
if (peer_resolve_dst(rt, dst_id, effective_dst) != OMNI_OK) {
return OMNI_ERR_PARAM;
}
if (peer_ensure_time_sync(rt, effective_dst) == OMNI_OK) {
sync_peer = peer_time_sync_find(rt, effective_dst, 0);
} else {
logger_log("WARN", "peer",
"file_send_without_time_sync dst_id=%s end_to_end_will_be_zero",
effective_dst);
}
fp = fopen(file_path, "rb");
if (!fp) {
@@ -676,8 +1125,15 @@ static int peer_send_file(PeerRuntime *rt,
if (nread > 0) {
TransferChunkMeta meta;
uint64_t chunk_origin_ts_ms = omni_now_ms();
uint32_t window_id = (uint32_t)((chunk_origin_ts_ms - transfer_start_ms) / 1000u);
uint64_t chunk_origin_local_ts_ms = omni_now_ms();
uint64_t chunk_origin_peer_ts_ms = 0;
uint32_t window_id = (uint32_t)((chunk_origin_local_ts_ms - transfer_start_ms) / 1000u);
if (sync_peer) {
(void)peer_time_sync_local_to_peer_ts(sync_peer,
chunk_origin_local_ts_ms,
&chunk_origin_peer_ts_ms);
}
omni_transfer_chunk_meta_encode(&meta,
transfer_id,
@@ -687,7 +1143,7 @@ static int peer_send_file(PeerRuntime *rt,
total_bytes,
offset,
(uint32_t)nread,
chunk_origin_ts_ms);
chunk_origin_peer_ts_ms);
memcpy(payload, &meta, TRANSFER_CHUNK_META_SIZE);
memcpy(payload + TRANSFER_CHUNK_META_SIZE, chunk, nread);
if (window_id > max_window_id) {
@@ -1027,6 +1483,18 @@ static void handle_tunnel_message(PeerRuntime *rt, const uint8_t *payload, uint3
handle_transfer_ack_message(&tunnel_meta, inner_payload, inner_len);
return;
}
if (tunnel_meta.inner_type == MSG_TYPE_TIME_SYNC_REQ) {
handle_time_sync_request(rt, &tunnel_meta, inner_payload, inner_len);
return;
}
if (tunnel_meta.inner_type == MSG_TYPE_TIME_SYNC_RESP) {
handle_time_sync_response(rt, &tunnel_meta, inner_payload, inner_len);
return;
}
if (tunnel_meta.inner_type == MSG_TYPE_TIME_SYNC_REPORT) {
handle_time_sync_report(rt, &tunnel_meta, inner_payload, inner_len);
return;
}
fprintf(stdout,
"[peer %s -> %s] unsupported inner_type=%u payload_len=%u\n",
@@ -1221,6 +1689,7 @@ static void handle_transport_event(PeerRuntime *rt,
rt->session = NULL;
rt->direct_register_sent = 0;
peer_set_bound_peer(rt, "");
peer_time_sync_free_all(rt);
peer_finalize_recv_observability(rt, 0);
close_recv_file(rt);
rt->rx_file.transfer_id = 0;
@@ -1490,6 +1959,7 @@ int main(int argc, char **argv)
peer_finalize_recv_observability(&rt, 0);
close_recv_file(&rt);
peer_time_sync_free_all(&rt);
peer_transport_close(rt.transport);
logger_print_performance_log("final");
return 0;

View File

@@ -29,6 +29,9 @@ static char g_ctx_mode[16];
static char g_ctx_role[16];
static char g_ctx_self_id[OMNI_PEER_ID_SIZE];
#define OMNI_DELAY_WINDOW_MS 1000u
#define OMNI_DELAY_IDLE_RESET_MS 1000u
static void metric_reset(OmniMetricSummary *metric);
static void reset_transfer_observability_locked(void)
@@ -130,28 +133,66 @@ static double metric_min(const OmniMetricSummary *metric)
return metric->min;
}
static double derived_propagation_ms(uint64_t min_rtt_ms)
{
if (min_rtt_ms == 0 || min_rtt_ms == UINT64_MAX) {
return 0.0;
}
return (double)min_rtt_ms / 2.0;
}
static void delay_window_note_locked(uint64_t now, size_t bytes, int is_send)
{
uint64_t *window_start_ms = is_send ? &g_stats.delay_window_start_send_ms
: &g_stats.delay_window_start_recv_ms;
uint64_t *window_bytes = is_send ? &g_stats.delay_window_bytes_sent
: &g_stats.delay_window_bytes_recv;
uint64_t *last_activity_ms = is_send ? &g_stats.last_send_activity_ms
: &g_stats.last_recv_activity_ms;
if (*window_start_ms == 0 ||
(*last_activity_ms > 0 && now - *last_activity_ms > OMNI_DELAY_IDLE_RESET_MS) ||
now - *window_start_ms > OMNI_DELAY_WINDOW_MS) {
*window_start_ms = now;
*window_bytes = 0;
}
*window_bytes += (uint64_t)bytes;
*last_activity_ms = now;
}
/*
* 基于当前窗口内流量和累计流量估算“此刻本机可提供的服务速率”。
* 优先使用当前窗口速率;若窗口内尚无样本,则退化为全局平均速率
* 基于最近活跃窗口估算“当前本机可提供的服务速率”。
* 最近一段时间没有流量时直接放弃估算,避免把长时间空转折算进时延
*/
static double live_rate_mbps_locked(uint64_t now, int is_send)
{
uint64_t window_elapsed_ms = now - g_stats.window_start_ms;
uint64_t total_elapsed_ms = now - g_stats.start_ms;
uint64_t window_bytes = is_send ? g_stats.window_bytes_sent : g_stats.window_bytes_recv;
uint64_t total_bytes = is_send ? g_stats.bytes_sent : g_stats.bytes_recv;
uint64_t window_start_ms = is_send ? g_stats.delay_window_start_send_ms
: g_stats.delay_window_start_recv_ms;
uint64_t window_bytes = is_send ? g_stats.delay_window_bytes_sent
: g_stats.delay_window_bytes_recv;
uint64_t last_activity_ms = is_send ? g_stats.last_send_activity_ms
: g_stats.last_recv_activity_ms;
uint64_t window_elapsed_ms;
if (window_start_ms == 0 || window_bytes == 0 || last_activity_ms == 0) {
return 0.0;
}
if (now < window_start_ms || now < last_activity_ms) {
return 0.0;
}
if (now - last_activity_ms > OMNI_DELAY_IDLE_RESET_MS) {
return 0.0;
}
window_elapsed_ms = now - window_start_ms;
if (window_elapsed_ms == 0) {
return 0.0;
}
if (window_elapsed_ms > 0 && window_bytes > 0) {
return ((double)window_bytes * 8.0) /
((double)window_elapsed_ms / 1000.0) /
1000000.0;
}
if (total_elapsed_ms > 0 && total_bytes > 0) {
return ((double)total_bytes * 8.0) /
((double)total_elapsed_ms / 1000.0) /
1000000.0;
}
return 0.0;
}
/* 用本机当前服务速率把字节数换算成时延估计。 */
@@ -311,9 +352,12 @@ void logger_set_context(const char *app,
/* 记录一次发送事件,更新累计发送字节和窗口内发送字节。 */
void logger_on_send(size_t bytes)
{
uint64_t now = now_ms();
pthread_mutex_lock(&g_mu);
g_stats.bytes_sent += bytes;
g_stats.window_bytes_sent += bytes;
delay_window_note_locked(now, bytes, 1);
g_stats.send_count++;
pthread_mutex_unlock(&g_mu);
}
@@ -321,14 +365,17 @@ void logger_on_send(size_t bytes)
/* 记录一次接收事件,更新累计接收字节和窗口内接收字节。 */
void logger_on_recv(size_t bytes)
{
uint64_t now = now_ms();
pthread_mutex_lock(&g_mu);
g_stats.bytes_recv += bytes;
g_stats.window_bytes_recv += bytes;
delay_window_note_locked(now, bytes, 0);
g_stats.recv_count++;
pthread_mutex_unlock(&g_mu);
}
/* 更新最近一次 RTT 和历史最大 RTT。 */
/* 更新最近一次 RTT 以及进程生命周期内的最小/最大 RTT。 */
void logger_on_rtt(uint64_t rtt_ms)
{
if (rtt_ms == 0) {
@@ -343,11 +390,6 @@ void logger_on_rtt(uint64_t rtt_ms)
if (rtt_ms > g_stats.max_rtt_ms) {
g_stats.max_rtt_ms = rtt_ms;
}
/*
* 传播时延无法直接观测,这里统一采用 min RTT / 2 作为链路基线估算。
* min RTT 更接近“基本无排队”时的往返时间,因此比 last RTT 更稳。
*/
metric_observe(&g_stats.propagation_delay_ms, (double)g_stats.min_rtt_ms / 2.0);
pthread_mutex_unlock(&g_mu);
}
@@ -658,6 +700,9 @@ void logger_print_performance_log(const char *tag)
uint64_t now;
OmniStats snapshot;
double progress_pct;
double propagation_avg_ms;
double propagation_min_ms;
double propagation_max_ms;
char ctx_app[sizeof(g_ctx_app)];
char ctx_proto[sizeof(g_ctx_proto)];
char ctx_mode[sizeof(g_ctx_mode)];
@@ -700,6 +745,15 @@ void logger_print_performance_log(const char *tag)
progress_pct = ((double)snapshot.progress_bytes * 100.0) /
(double)snapshot.total_work_bytes;
}
if (snapshot.propagation_delay_ms.count > 0) {
propagation_avg_ms = metric_avg(&snapshot.propagation_delay_ms);
propagation_min_ms = metric_min(&snapshot.propagation_delay_ms);
propagation_max_ms = snapshot.propagation_delay_ms.max;
} else {
propagation_avg_ms = derived_propagation_ms(snapshot.min_rtt_ms);
propagation_min_ms = propagation_avg_ms;
propagation_max_ms = propagation_avg_ms;
}
{
/* 人读友好的单行文本日志,适合终端直接观察。 */
@@ -751,7 +805,7 @@ void logger_print_performance_log(const char *tag)
metric_avg(&snapshot.processing_delay_ms),
metric_avg(&snapshot.queue_delay_ms),
metric_avg(&snapshot.transmission_delay_ms),
metric_avg(&snapshot.propagation_delay_ms),
propagation_avg_ms,
metric_avg(&snapshot.end_to_end_delay_ms),
snapshot.send_buffer_pct.last,
snapshot.recv_buffer_pct.last,
@@ -916,9 +970,9 @@ void logger_print_performance_log(const char *tag)
metric_avg(&snapshot.transmission_delay_ms),
metric_min(&snapshot.transmission_delay_ms),
snapshot.transmission_delay_ms.max,
metric_avg(&snapshot.propagation_delay_ms),
metric_min(&snapshot.propagation_delay_ms),
snapshot.propagation_delay_ms.max,
propagation_avg_ms,
propagation_min_ms,
propagation_max_ms,
metric_avg(&snapshot.end_to_end_delay_ms),
metric_min(&snapshot.end_to_end_delay_ms),
snapshot.end_to_end_delay_ms.max,

View File

@@ -197,7 +197,7 @@ static void sample_tcp_info(int fd)
bytes_retrans = (uint64_t)ti.tcpi_bytes_retrans;
}
logger_on_rtt((uint64_t)(ti.tcpi_rtt / 1000u));
logger_on_rtt((ti.tcpi_rtt == 0u) ? 0u : (((uint64_t)ti.tcpi_rtt + 999u) / 1000u));
logger_on_tcp_transport(total_retrans, data_segs_out, bytes_sent, bytes_retrans);
logger_on_cwnd((double)ti.tcpi_snd_cwnd);
sample_socket_buffers(fd);

View File

@@ -100,6 +100,14 @@ static int tcp_info_has_field(socklen_t len, size_t field_end)
{
return (size_t)len >= field_end;
}
static uint64_t tcp_info_rtt_us_to_ms(uint32_t rtt_us)
{
if (rtt_us == 0u) {
return 0u;
}
return ((uint64_t)rtt_us + 999u) / 1000u;
}
#endif
struct TcpContext {
@@ -168,9 +176,9 @@ static void tcp_log_info(int fd, const char *tag)
return;
}
/* 注意tcpi_rtt 单位通常为微秒Linux这里转 ms 仅用于日志观察 */
unsigned long long rtt_ms = (unsigned long long)(ti.tcpi_rtt / 1000u);
unsigned long long rttvar_ms = (unsigned long long)(ti.tcpi_rttvar / 1000u);
/* 注意tcpi_rtt / tcpi_rttvar 单位通常为微秒Linux这里向上取整到 ms。 */
unsigned long long rtt_ms = (unsigned long long)tcp_info_rtt_us_to_ms(ti.tcpi_rtt);
unsigned long long rttvar_ms = (unsigned long long)tcp_info_rtt_us_to_ms(ti.tcpi_rttvar);
if (tcp_info_has_field(len, offsetof(struct OmniLinuxTcpInfo, tcpi_total_retrans) +
sizeof(ti.tcpi_total_retrans))) {