This commit is contained in:
meiqi
2026-03-13 19:58:29 +08:00
parent 03e8c7beaa
commit c819bddbbb
13 changed files with 443 additions and 7 deletions

View File

@@ -22,6 +22,7 @@ struct KcpContext {
struct sockaddr_in peer_addr;
socklen_t peer_len;
ikcpcb *kcp;
uint32_t last_xmit; /* 用于推算重传/发送次数变化 */
};
static int kcp_output(const char *buf, int len, ikcpcb *kcp, void *user)
@@ -104,6 +105,7 @@ static OmniContext *kcp_init(OmniRole role,
static void kcp_update_loop(struct KcpContext *ctx)
{
uint64_t t0 = omni_now_ms();
IUINT32 current = (IUINT32)omni_now_ms();
ikcp_update(ctx->kcp, current);
@@ -117,6 +119,32 @@ static void kcp_update_loop(struct KcpContext *ctx)
ctx->peer_len = fromlen;
ikcp_input(ctx->kcp, buf, (long)n);
}
/* KCP 内部状态监控 */
uint32_t xmit = ctx->kcp->xmit;
if (xmit >= ctx->last_xmit) {
logger_on_kcp_retrans((uint64_t)(xmit - ctx->last_xmit));
}
ctx->last_xmit = xmit;
uint64_t t1 = omni_now_ms();
logger_log("DEBUG", "kcp",
"update ms=%llu cwnd=%u ssthresh=%u rmt_wnd=%u snd_wnd=%u rcv_wnd=%u "
"rx_srtt=%u rx_rto=%u nsnd_buf=%u nsnd_que=%u nrcv_buf=%u nrcv_que=%u xmit=%u state=%u",
(unsigned long long)(t1 - t0),
(unsigned)ctx->kcp->cwnd,
(unsigned)ctx->kcp->ssthresh,
(unsigned)ctx->kcp->rmt_wnd,
(unsigned)ctx->kcp->snd_wnd,
(unsigned)ctx->kcp->rcv_wnd,
(unsigned)ctx->kcp->rx_srtt,
(unsigned)ctx->kcp->rx_rto,
(unsigned)ctx->kcp->nsnd_buf,
(unsigned)ctx->kcp->nsnd_que,
(unsigned)ctx->kcp->nrcv_buf,
(unsigned)ctx->kcp->nrcv_que,
(unsigned)ctx->kcp->xmit,
(unsigned)ctx->kcp->state);
}
static ssize_t kcp_send(OmniContext *c, const void *buf, size_t len)
@@ -124,6 +152,7 @@ static ssize_t kcp_send(OmniContext *c, const void *buf, size_t len)
struct KcpContext *ctx = (struct KcpContext *)c;
if (!ctx || !ctx->kcp) return OMNI_ERR_PARAM;
uint64_t t0 = omni_now_ms();
int rc = ikcp_send(ctx->kcp, (const char *)buf, (int)len);
if (rc < 0) {
logger_log("ERROR", "kcp", "ikcp_send_failed rc=%d", rc);
@@ -132,6 +161,10 @@ static ssize_t kcp_send(OmniContext *c, const void *buf, size_t len)
/* 驱动一次 flush */
kcp_update_loop(ctx);
uint64_t t1 = omni_now_ms();
logger_on_proto_send_latency(t1 - t0);
logger_log("DEBUG", "kcp", "send payload_bytes=%zu proto_ms=%llu waitsnd=%d",
len, (unsigned long long)(t1 - t0), ikcp_waitsnd(ctx->kcp));
return (ssize_t)len;
}
@@ -140,12 +173,17 @@ static ssize_t kcp_recv(OmniContext *c, void *buf, size_t len)
struct KcpContext *ctx = (struct KcpContext *)c;
if (!ctx || !ctx->kcp) return OMNI_ERR_PARAM;
uint64_t t0 = omni_now_ms();
kcp_update_loop(ctx);
int n = ikcp_recv(ctx->kcp, (char *)buf, (int)len);
if (n < 0) {
return 0; /* 暂无数据 */
}
uint64_t t1 = omni_now_ms();
logger_on_proto_recv_latency(t1 - t0);
logger_log("DEBUG", "kcp", "recv payload_bytes=%d proto_ms=%llu",
n, (unsigned long long)(t1 - t0));
return (ssize_t)n;
}

View File

@@ -10,6 +10,7 @@
#include <arpa/inet.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -17,10 +18,52 @@
#include <sys/types.h>
#include <unistd.h>
/* Linux 下 TCP_INFO 定义通常已在 <netinet/tcp.h> 提供,避免引入 <linux/tcp.h> 重定义 */
struct TcpContext {
int fd;
};
#ifdef __linux__
static void tcp_log_info(int fd, const char *tag)
{
struct tcp_info ti;
socklen_t len = sizeof(ti);
if (getsockopt(fd, IPPROTO_TCP, TCP_INFO, &ti, &len) != 0) {
return;
}
/* 注意tcpi_rtt 单位通常为微秒Linux这里转 ms 仅用于日志观察 */
unsigned long long rtt_ms = (unsigned long long)(ti.tcpi_rtt / 1000u);
unsigned long long rttvar_ms = (unsigned long long)(ti.tcpi_rttvar / 1000u);
logger_log("INFO", "tcpinfo",
"tag=%s state=%u retransmits=%u probes=%u backoff=%u "
"rto=%u ato=%u rtt_ms=%llu rttvar_ms=%llu "
"snd_cwnd=%u snd_ssthresh=%u snd_mss=%u rcv_mss=%u "
"lost=%u retrans=%u fackets=%u "
"last_data_sent_ms=%u last_data_recv_ms=%u",
tag ? tag : "sample",
(unsigned)ti.tcpi_state,
(unsigned)ti.tcpi_retransmits,
(unsigned)ti.tcpi_probes,
(unsigned)ti.tcpi_backoff,
(unsigned)ti.tcpi_rto,
(unsigned)ti.tcpi_ato,
rtt_ms,
rttvar_ms,
(unsigned)ti.tcpi_snd_cwnd,
(unsigned)ti.tcpi_snd_ssthresh,
(unsigned)ti.tcpi_snd_mss,
(unsigned)ti.tcpi_rcv_mss,
(unsigned)ti.tcpi_lost,
(unsigned)ti.tcpi_retrans,
(unsigned)ti.tcpi_fackets,
(unsigned)ti.tcpi_last_data_sent,
(unsigned)ti.tcpi_last_data_recv);
}
#endif
static int tcp_set_nodelay(int fd)
{
int flag = 1;
@@ -178,6 +221,7 @@ static ssize_t tcp_send(OmniContext *c, const void *buf, size_t len)
struct TcpContext *ctx = (struct TcpContext *)c;
if (!ctx || ctx->fd < 0) return OMNI_ERR_PARAM;
uint64_t t0 = omni_now_ms();
MsgHeader hdr;
hdr.magic = htonl(MSG_MAGIC);
hdr.length = htonl((uint32_t)len);
@@ -196,6 +240,13 @@ static ssize_t tcp_send(OmniContext *c, const void *buf, size_t len)
return OMNI_ERR_IO;
}
uint64_t t1 = omni_now_ms();
logger_on_proto_send_latency(t1 - t0);
logger_log("DEBUG", "tcp", "send payload_bytes=%zu header_bytes=%zu proto_ms=%llu",
len, (size_t)MSG_HEADER_SIZE, (unsigned long long)(t1 - t0));
#ifdef __linux__
tcp_log_info(ctx->fd, "after_send");
#endif
return (ssize_t)len;
}
@@ -204,6 +255,7 @@ static ssize_t tcp_recv(OmniContext *c, void *buf, size_t len)
struct TcpContext *ctx = (struct TcpContext *)c;
if (!ctx || ctx->fd < 0) return OMNI_ERR_PARAM;
uint64_t t0 = omni_now_ms();
uint8_t header_buf[MSG_HEADER_SIZE];
ssize_t n1 = tcp_read_n(ctx->fd, header_buf, MSG_HEADER_SIZE);
if (n1 <= 0) {
@@ -233,6 +285,13 @@ static ssize_t tcp_recv(OmniContext *c, void *buf, size_t len)
return OMNI_ERR_IO;
}
uint64_t t1 = omni_now_ms();
logger_on_proto_recv_latency(t1 - t0);
logger_log("DEBUG", "tcp", "recv payload_bytes=%u header_bytes=%zu proto_ms=%llu",
payload_len, (size_t)MSG_HEADER_SIZE, (unsigned long long)(t1 - t0));
#ifdef __linux__
tcp_log_info(ctx->fd, "after_recv");
#endif
return (ssize_t)payload_len;
}