fix: 重构基础通信框架为异步事件循环,并修复 KCP conv 错位与接收漏斗堵塞问题
This commit is contained in:
@@ -1,161 +1,177 @@
|
||||
/*
|
||||
* test_main.c
|
||||
* 简单测试程序:客户端/服务端双向传输 + 日志观测
|
||||
*
|
||||
* 使用方式示例(同一机器上,两个终端):
|
||||
* 终端1:./omni_test -r server -p tcp -P 9000
|
||||
* 终端2:./omni_test -r client -p tcp -P 9000 -H 127.0.0.1
|
||||
*
|
||||
* 协议可选:tcp / udp / kcp
|
||||
*/
|
||||
|
||||
#include "common.h"
|
||||
#include "network.h"
|
||||
#include "logger.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
|
||||
static void usage(const char *prog)
|
||||
{
|
||||
fprintf(stderr,
|
||||
"Usage:\n"
|
||||
" %s -r server -p tcp|udp|kcp -P <port>\n"
|
||||
" %s -r client -p tcp|udp|kcp -P <port> -H <host>\n",
|
||||
prog, prog);
|
||||
}
|
||||
|
||||
static OmniProtocol parse_proto(const char *s)
|
||||
{
|
||||
if (strcmp(s, "tcp") == 0) return OMNI_PROTO_TCP;
|
||||
if (strcmp(s, "udp") == 0) return OMNI_PROTO_UDP;
|
||||
if (strcmp(s, "kcp") == 0) return OMNI_PROTO_KCP;
|
||||
return OMNI_PROTO_TCP;
|
||||
}
|
||||
|
||||
static void run_server(OmniProtocol proto, uint16_t port)
|
||||
{
|
||||
OmniContext *ctx = omni_init(OMNI_ROLE_SERVER, proto,
|
||||
NULL, port,
|
||||
NULL, 0);
|
||||
if (!ctx) {
|
||||
fprintf(stderr, "server: omni_init failed\n");
|
||||
return;
|
||||
}
|
||||
|
||||
logger_log("INFO", "test", "server_started proto=%d port=%u",
|
||||
(int)proto, (unsigned)port);
|
||||
|
||||
char buf[4096];
|
||||
for (;;) {
|
||||
ssize_t n = omni_recv(ctx, buf, sizeof(buf));
|
||||
if (n <= 0) {
|
||||
logger_log("INFO", "test", "server_recv_end n=%zd", n);
|
||||
break;
|
||||
}
|
||||
logger_log("INFO", "test", "server_recv bytes=%zd", n);
|
||||
|
||||
/* 简单 echo 回客户端,验证双向通信 */
|
||||
ssize_t m = omni_send(ctx, buf, (size_t)n);
|
||||
logger_log("INFO", "test", "server_echo bytes=%zd", m);
|
||||
}
|
||||
|
||||
omni_close(ctx);
|
||||
}
|
||||
|
||||
static void run_client(OmniProtocol proto, const char *host, uint16_t port)
|
||||
{
|
||||
if (!host) {
|
||||
fprintf(stderr, "client: host is required\n");
|
||||
return;
|
||||
}
|
||||
|
||||
OmniContext *ctx = omni_init(OMNI_ROLE_CLIENT, proto,
|
||||
NULL, 0,
|
||||
host, port);
|
||||
if (!ctx) {
|
||||
fprintf(stderr, "client: omni_init failed\n");
|
||||
return;
|
||||
}
|
||||
|
||||
logger_log("INFO", "test", "client_started proto=%d host=%s port=%u",
|
||||
(int)proto, host, (unsigned)port);
|
||||
|
||||
char send_buf[2048];
|
||||
char recv_buf[4096];
|
||||
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
int len = snprintf(send_buf, sizeof(send_buf),
|
||||
"msg=%d time_ms=%llu payload_size=%zu",
|
||||
i,
|
||||
(unsigned long long)omni_now_ms(),
|
||||
sizeof(send_buf));
|
||||
|
||||
ssize_t n = omni_send(ctx, send_buf, (size_t)len);
|
||||
logger_log("INFO", "test", "client_send i=%d bytes=%zd", i, n);
|
||||
if (n <= 0) break;
|
||||
|
||||
ssize_t m = omni_recv(ctx, recv_buf, sizeof(recv_buf));
|
||||
if (m <= 0) {
|
||||
logger_log("INFO", "test", "client_recv_end i=%d bytes=%zd", i, m);
|
||||
break;
|
||||
}
|
||||
logger_log("INFO", "test",
|
||||
"client_recv_echo i=%d bytes=%zd first_bytes=\"%.32s\"",
|
||||
i, m, recv_buf);
|
||||
|
||||
usleep(10 * 1000); /* 10ms 间隔,模拟稳定流量 */
|
||||
}
|
||||
|
||||
omni_close(ctx);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
const char *role_str = NULL;
|
||||
const char *proto_str = "tcp";
|
||||
const char *host = NULL;
|
||||
int port = 0;
|
||||
|
||||
int opt;
|
||||
while ((opt = getopt(argc, argv, "r:p:P:H:")) != -1) {
|
||||
switch (opt) {
|
||||
case 'r':
|
||||
role_str = optarg;
|
||||
break;
|
||||
case 'p':
|
||||
proto_str = optarg;
|
||||
break;
|
||||
case 'P':
|
||||
port = atoi(optarg);
|
||||
break;
|
||||
case 'H':
|
||||
host = optarg;
|
||||
break;
|
||||
default:
|
||||
usage(argv[0]);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (!role_str || port <= 0) {
|
||||
usage(argv[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
OmniProtocol proto = parse_proto(proto_str);
|
||||
|
||||
if (strcmp(role_str, "server") == 0) {
|
||||
run_server(proto, (uint16_t)port);
|
||||
} else if (strcmp(role_str, "client") == 0) {
|
||||
run_client(proto, host, (uint16_t)port);
|
||||
} else {
|
||||
usage(argv[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* test_main.c
|
||||
* 简单测试程序:客户端/服务端双向传输 + 日志观测
|
||||
*
|
||||
* 使用方式示例(同一机器上,两个终端):
|
||||
* 终端1:./omni_test -r server -p tcp -P 9000
|
||||
* 终端2:./omni_test -r client -p tcp -P 9000 -H 127.0.0.1
|
||||
*
|
||||
* 协议可选:tcp / udp / kcp
|
||||
*/
|
||||
|
||||
#include "common.h"
|
||||
#include "network.h"
|
||||
#include "logger.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
|
||||
static void usage(const char *prog)
|
||||
{
|
||||
fprintf(stderr,
|
||||
"Usage:\n"
|
||||
" %s -r server -p tcp|udp|kcp -P <port>\n"
|
||||
" %s -r client -p tcp|udp|kcp -P <port> -H <host>\n",
|
||||
prog, prog);
|
||||
}
|
||||
|
||||
static OmniProtocol parse_proto(const char *s)
|
||||
{
|
||||
if (strcmp(s, "tcp") == 0) return OMNI_PROTO_TCP;
|
||||
if (strcmp(s, "udp") == 0) return OMNI_PROTO_UDP;
|
||||
if (strcmp(s, "kcp") == 0) return OMNI_PROTO_KCP;
|
||||
return OMNI_PROTO_TCP;
|
||||
}
|
||||
|
||||
static void run_server(OmniProtocol proto, uint16_t port)
|
||||
{
|
||||
OmniContext *ctx = omni_init(OMNI_ROLE_SERVER, proto,
|
||||
NULL, port,
|
||||
NULL, 0);
|
||||
if (!ctx) {
|
||||
fprintf(stderr, "server: omni_init failed\n");
|
||||
return;
|
||||
}
|
||||
|
||||
logger_log("INFO", "test", "server_started proto=%d port=%u",
|
||||
(int)proto, (unsigned)port);
|
||||
|
||||
char buf[4096];
|
||||
for (;;) {
|
||||
ssize_t n = omni_recv(ctx, buf, sizeof(buf));
|
||||
if (n < 0) {
|
||||
logger_log("INFO", "test", "server_recv_end n=%zd", n);
|
||||
break;
|
||||
}
|
||||
if (n == 0) {
|
||||
if (proto == OMNI_PROTO_KCP) {
|
||||
usleep(10 * 1000);
|
||||
continue;
|
||||
}
|
||||
logger_log("INFO", "test", "server_recv_end n=%zd", n);
|
||||
break;
|
||||
}
|
||||
logger_log("INFO", "test", "server_recv bytes=%zd", n);
|
||||
|
||||
/* 简单 echo 回客户端,验证双向通信 */
|
||||
ssize_t m = omni_send(ctx, buf, (size_t)n);
|
||||
logger_log("INFO", "test", "server_echo bytes=%zd", m);
|
||||
}
|
||||
|
||||
omni_close(ctx);
|
||||
}
|
||||
|
||||
static void run_client(OmniProtocol proto, const char *host, uint16_t port)
|
||||
{
|
||||
if (!host) {
|
||||
fprintf(stderr, "client: host is required\n");
|
||||
return;
|
||||
}
|
||||
|
||||
OmniContext *ctx = omni_init(OMNI_ROLE_CLIENT, proto,
|
||||
NULL, 0,
|
||||
host, port);
|
||||
if (!ctx) {
|
||||
fprintf(stderr, "client: omni_init failed\n");
|
||||
return;
|
||||
}
|
||||
|
||||
logger_log("INFO", "test", "client_started proto=%d host=%s port=%u",
|
||||
(int)proto, host, (unsigned)port);
|
||||
|
||||
char send_buf[2048];
|
||||
char recv_buf[4096];
|
||||
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
int len = snprintf(send_buf, sizeof(send_buf),
|
||||
"msg=%d time_ms=%llu payload_size=%zu",
|
||||
i,
|
||||
(unsigned long long)omni_now_ms(),
|
||||
sizeof(send_buf));
|
||||
|
||||
ssize_t n = omni_send(ctx, send_buf, (size_t)len);
|
||||
logger_log("INFO", "test", "client_send i=%d bytes=%zd", i, n);
|
||||
if (n <= 0) break;
|
||||
|
||||
ssize_t m = omni_recv(ctx, recv_buf, sizeof(recv_buf));
|
||||
if (m < 0) {
|
||||
logger_log("INFO", "test", "client_recv_end i=%d bytes=%zd", i, m);
|
||||
break;
|
||||
}
|
||||
if (m == 0) {
|
||||
if (proto == OMNI_PROTO_KCP) {
|
||||
usleep(10 * 1000);
|
||||
--i;
|
||||
continue;
|
||||
}
|
||||
logger_log("INFO", "test", "client_recv_end i=%d bytes=%zd", i, m);
|
||||
break;
|
||||
}
|
||||
logger_log("INFO", "test",
|
||||
"client_recv_echo i=%d bytes=%zd first_bytes=\"%.32s\"",
|
||||
i, m, recv_buf);
|
||||
|
||||
usleep(10 * 1000); /* 10ms 间隔,模拟稳定流量 */
|
||||
}
|
||||
|
||||
omni_close(ctx);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
const char *role_str = NULL;
|
||||
const char *proto_str = "tcp";
|
||||
const char *host = NULL;
|
||||
int port = 0;
|
||||
|
||||
int opt;
|
||||
while ((opt = getopt(argc, argv, "r:p:P:H:")) != -1) {
|
||||
switch (opt) {
|
||||
case 'r':
|
||||
role_str = optarg;
|
||||
break;
|
||||
case 'p':
|
||||
proto_str = optarg;
|
||||
break;
|
||||
case 'P':
|
||||
port = atoi(optarg);
|
||||
break;
|
||||
case 'H':
|
||||
host = optarg;
|
||||
break;
|
||||
default:
|
||||
usage(argv[0]);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (!role_str || port <= 0) {
|
||||
usage(argv[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
OmniProtocol proto = parse_proto(proto_str);
|
||||
|
||||
if (strcmp(role_str, "server") == 0) {
|
||||
run_server(proto, (uint16_t)port);
|
||||
} else if (strcmp(role_str, "client") == 0) {
|
||||
run_client(proto, host, (uint16_t)port);
|
||||
} else {
|
||||
usage(argv[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -6,20 +6,32 @@
|
||||
#include "logger.h"
|
||||
#include "common.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdarg.h>
|
||||
#include <string.h>
|
||||
#include <inttypes.h>
|
||||
#include <stdio.h>
|
||||
#include <stdarg.h>
|
||||
#include <string.h>
|
||||
#include <inttypes.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
static OmniStats g_stats;
|
||||
static FILE *g_json_fp = NULL;
|
||||
static OmniStats g_stats;
|
||||
static FILE *g_json_fp = NULL;
|
||||
static int g_min_level = 1; /* default INFO */
|
||||
|
||||
static uint64_t now_ms(void)
|
||||
{
|
||||
return omni_now_ms();
|
||||
}
|
||||
|
||||
static void ewma_update(double *avg, double sample, double alpha)
|
||||
static uint64_t now_ms(void)
|
||||
{
|
||||
return omni_now_ms();
|
||||
}
|
||||
|
||||
static int level_to_int(const char *level)
|
||||
{
|
||||
if (!level) return 1;
|
||||
if (strcmp(level, "DEBUG") == 0) return 0;
|
||||
if (strcmp(level, "INFO") == 0) return 1;
|
||||
if (strcmp(level, "WARN") == 0) return 2;
|
||||
if (strcmp(level, "ERROR") == 0) return 3;
|
||||
return 1;
|
||||
}
|
||||
|
||||
static void ewma_update(double *avg, double sample, double alpha)
|
||||
{
|
||||
if (*avg <= 0.0) {
|
||||
*avg = sample;
|
||||
@@ -28,18 +40,21 @@ static void ewma_update(double *avg, double sample, double alpha)
|
||||
*avg = (*avg) * (1.0 - alpha) + sample * alpha;
|
||||
}
|
||||
|
||||
void logger_init(void)
|
||||
{
|
||||
memset(&g_stats, 0, sizeof(g_stats));
|
||||
g_stats.start_ms = now_ms();
|
||||
g_stats.last_report_ms = g_stats.start_ms;
|
||||
g_stats.send_call_min_ms = UINT64_MAX;
|
||||
g_stats.recv_call_min_ms = UINT64_MAX;
|
||||
|
||||
if (!g_json_fp) {
|
||||
g_json_fp = fopen("omni_logs.jsonl", "a");
|
||||
}
|
||||
}
|
||||
void logger_init(void)
|
||||
{
|
||||
memset(&g_stats, 0, sizeof(g_stats));
|
||||
g_stats.start_ms = now_ms();
|
||||
g_stats.last_report_ms = g_stats.start_ms;
|
||||
g_stats.send_call_min_ms = UINT64_MAX;
|
||||
g_stats.recv_call_min_ms = UINT64_MAX;
|
||||
|
||||
const char *lvl_env = getenv("OMNI_LOG_LEVEL");
|
||||
g_min_level = level_to_int(lvl_env);
|
||||
|
||||
if (!g_json_fp) {
|
||||
g_json_fp = fopen("omni_logs.jsonl", "a");
|
||||
}
|
||||
}
|
||||
|
||||
void logger_on_send(size_t bytes)
|
||||
{
|
||||
@@ -224,14 +239,18 @@ void logger_print_performance_log(const char *tag)
|
||||
}
|
||||
}
|
||||
|
||||
void logger_log(const char *level, const char *component,
|
||||
const char *fmt, ...)
|
||||
{
|
||||
const char *lvl = level ? level : "INFO";
|
||||
const char *comp = component ? component : "general";
|
||||
|
||||
FILE *fp = stderr;
|
||||
print_timestamp(fp);
|
||||
void logger_log(const char *level, const char *component,
|
||||
const char *fmt, ...)
|
||||
{
|
||||
const char *lvl = level ? level : "INFO";
|
||||
const char *comp = component ? component : "general";
|
||||
|
||||
if (level_to_int(lvl) < g_min_level) {
|
||||
return;
|
||||
}
|
||||
|
||||
FILE *fp = stderr;
|
||||
print_timestamp(fp);
|
||||
fprintf(fp, "level=%s component=%s ", lvl, comp);
|
||||
|
||||
char msg_buf[1024];
|
||||
|
||||
@@ -96,6 +96,9 @@ ssize_t omni_send(OmniContext *ctx, const void *buf, size_t len)
|
||||
}
|
||||
logger_log("DEBUG", "network", "omni_send proto=%d bytes=%zd call_ms=%llu",
|
||||
(int)ctx->proto, n, (unsigned long long)(t1 - t0));
|
||||
if (n > 0) {
|
||||
logger_print_performance_log("on_send");
|
||||
}
|
||||
return n;
|
||||
}
|
||||
|
||||
@@ -114,6 +117,9 @@ ssize_t omni_recv(OmniContext *ctx, void *buf, size_t len)
|
||||
}
|
||||
logger_log("DEBUG", "network", "omni_recv proto=%d bytes=%zd call_ms=%llu",
|
||||
(int)ctx->proto, n, (unsigned long long)(t1 - t0));
|
||||
if (n > 0) {
|
||||
logger_print_performance_log("on_recv");
|
||||
}
|
||||
return n;
|
||||
}
|
||||
|
||||
@@ -126,4 +132,3 @@ void omni_close(OmniContext *ctx)
|
||||
logger_print_performance_log("final");
|
||||
free(ctx);
|
||||
}
|
||||
|
||||
|
||||
@@ -38,16 +38,14 @@ static int kcp_output(const char *buf, int len, ikcpcb *kcp, void *user)
|
||||
return 0;
|
||||
}
|
||||
|
||||
static OmniContext *kcp_init(OmniRole role,
|
||||
const char *bind_ip,
|
||||
uint16_t bind_port,
|
||||
const char *peer_ip,
|
||||
uint16_t peer_port)
|
||||
{
|
||||
(void)role;
|
||||
|
||||
struct KcpContext *ctx = (struct KcpContext *)calloc(1, sizeof(*ctx));
|
||||
if (!ctx) return NULL;
|
||||
static OmniContext *kcp_init(OmniRole role,
|
||||
const char *bind_ip,
|
||||
uint16_t bind_port,
|
||||
const char *peer_ip,
|
||||
uint16_t peer_port)
|
||||
{
|
||||
struct KcpContext *ctx = (struct KcpContext *)calloc(1, sizeof(*ctx));
|
||||
if (!ctx) return NULL;
|
||||
|
||||
int fd = socket(AF_INET, SOCK_DGRAM, 0);
|
||||
if (fd < 0) {
|
||||
@@ -78,8 +76,9 @@ static OmniContext *kcp_init(OmniRole role,
|
||||
|
||||
ctx->fd = fd;
|
||||
|
||||
/* conv 可简单使用端口号 */
|
||||
IUINT32 conv = (IUINT32)peer_port;
|
||||
/* conv 必须两端一致:server 用 bind_port,client 用 peer_port */
|
||||
IUINT32 conv = (role == OMNI_ROLE_SERVER) ? (IUINT32)bind_port
|
||||
: (IUINT32)peer_port;
|
||||
ikcpcb *kcp = ikcp_create(conv, ctx);
|
||||
if (!kcp) {
|
||||
logger_log("ERROR", "kcp", "ikcp_create_failed");
|
||||
@@ -94,11 +93,12 @@ static OmniContext *kcp_init(OmniRole role,
|
||||
ikcp_nodelay(kcp, 1, 10, 2, 1);
|
||||
ikcp_wndsize(kcp, 128, 128);
|
||||
|
||||
logger_log("INFO", "kcp",
|
||||
"init bind_port=%u peer_ip=%s peer_port=%u",
|
||||
(unsigned)bind_port,
|
||||
peer_ip ? peer_ip : "NULL",
|
||||
(unsigned)peer_port);
|
||||
logger_log("INFO", "kcp",
|
||||
"init bind_port=%u peer_ip=%s peer_port=%u conv=%u",
|
||||
(unsigned)bind_port,
|
||||
peer_ip ? peer_ip : "NULL",
|
||||
(unsigned)peer_port,
|
||||
(unsigned)conv);
|
||||
|
||||
return (OmniContext *)ctx;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user