diff --git a/src/apps/test_main.c b/src/apps/test_main.c index 0aa63af..51d2ce8 100644 --- a/src/apps/test_main.c +++ b/src/apps/test_main.c @@ -1,161 +1,177 @@ -/* - * test_main.c - * 简单测试程序:客户端/服务端双向传输 + 日志观测 - * - * 使用方式示例(同一机器上,两个终端): - * 终端1:./omni_test -r server -p tcp -P 9000 - * 终端2:./omni_test -r client -p tcp -P 9000 -H 127.0.0.1 - * - * 协议可选:tcp / udp / kcp - */ - -#include "common.h" -#include "network.h" -#include "logger.h" - -#include -#include -#include -#include - -static void usage(const char *prog) -{ - fprintf(stderr, - "Usage:\n" - " %s -r server -p tcp|udp|kcp -P \n" - " %s -r client -p tcp|udp|kcp -P -H \n", - prog, prog); -} - -static OmniProtocol parse_proto(const char *s) -{ - if (strcmp(s, "tcp") == 0) return OMNI_PROTO_TCP; - if (strcmp(s, "udp") == 0) return OMNI_PROTO_UDP; - if (strcmp(s, "kcp") == 0) return OMNI_PROTO_KCP; - return OMNI_PROTO_TCP; -} - -static void run_server(OmniProtocol proto, uint16_t port) -{ - OmniContext *ctx = omni_init(OMNI_ROLE_SERVER, proto, - NULL, port, - NULL, 0); - if (!ctx) { - fprintf(stderr, "server: omni_init failed\n"); - return; - } - - logger_log("INFO", "test", "server_started proto=%d port=%u", - (int)proto, (unsigned)port); - - char buf[4096]; - for (;;) { - ssize_t n = omni_recv(ctx, buf, sizeof(buf)); - if (n <= 0) { - logger_log("INFO", "test", "server_recv_end n=%zd", n); - break; - } - logger_log("INFO", "test", "server_recv bytes=%zd", n); - - /* 简单 echo 回客户端,验证双向通信 */ - ssize_t m = omni_send(ctx, buf, (size_t)n); - logger_log("INFO", "test", "server_echo bytes=%zd", m); - } - - omni_close(ctx); -} - -static void run_client(OmniProtocol proto, const char *host, uint16_t port) -{ - if (!host) { - fprintf(stderr, "client: host is required\n"); - return; - } - - OmniContext *ctx = omni_init(OMNI_ROLE_CLIENT, proto, - NULL, 0, - host, port); - if (!ctx) { - fprintf(stderr, "client: omni_init failed\n"); - return; - } - - logger_log("INFO", "test", "client_started proto=%d host=%s port=%u", - (int)proto, host, (unsigned)port); - - char send_buf[2048]; - char recv_buf[4096]; - - for (int i = 0; i < 100; ++i) { - int len = snprintf(send_buf, sizeof(send_buf), - "msg=%d time_ms=%llu payload_size=%zu", - i, - (unsigned long long)omni_now_ms(), - sizeof(send_buf)); - - ssize_t n = omni_send(ctx, send_buf, (size_t)len); - logger_log("INFO", "test", "client_send i=%d bytes=%zd", i, n); - if (n <= 0) break; - - ssize_t m = omni_recv(ctx, recv_buf, sizeof(recv_buf)); - if (m <= 0) { - logger_log("INFO", "test", "client_recv_end i=%d bytes=%zd", i, m); - break; - } - logger_log("INFO", "test", - "client_recv_echo i=%d bytes=%zd first_bytes=\"%.32s\"", - i, m, recv_buf); - - usleep(10 * 1000); /* 10ms 间隔,模拟稳定流量 */ - } - - omni_close(ctx); -} - -int main(int argc, char **argv) -{ - const char *role_str = NULL; - const char *proto_str = "tcp"; - const char *host = NULL; - int port = 0; - - int opt; - while ((opt = getopt(argc, argv, "r:p:P:H:")) != -1) { - switch (opt) { - case 'r': - role_str = optarg; - break; - case 'p': - proto_str = optarg; - break; - case 'P': - port = atoi(optarg); - break; - case 'H': - host = optarg; - break; - default: - usage(argv[0]); - return 1; - } - } - - if (!role_str || port <= 0) { - usage(argv[0]); - return 1; - } - - OmniProtocol proto = parse_proto(proto_str); - - if (strcmp(role_str, "server") == 0) { - run_server(proto, (uint16_t)port); - } else if (strcmp(role_str, "client") == 0) { - run_client(proto, host, (uint16_t)port); - } else { - usage(argv[0]); - return 1; - } - - return 0; -} - +/* + * test_main.c + * 简单测试程序:客户端/服务端双向传输 + 日志观测 + * + * 使用方式示例(同一机器上,两个终端): + * 终端1:./omni_test -r server -p tcp -P 9000 + * 终端2:./omni_test -r client -p tcp -P 9000 -H 127.0.0.1 + * + * 协议可选:tcp / udp / kcp + */ + +#include "common.h" +#include "network.h" +#include "logger.h" + +#include +#include +#include +#include + +static void usage(const char *prog) +{ + fprintf(stderr, + "Usage:\n" + " %s -r server -p tcp|udp|kcp -P \n" + " %s -r client -p tcp|udp|kcp -P -H \n", + prog, prog); +} + +static OmniProtocol parse_proto(const char *s) +{ + if (strcmp(s, "tcp") == 0) return OMNI_PROTO_TCP; + if (strcmp(s, "udp") == 0) return OMNI_PROTO_UDP; + if (strcmp(s, "kcp") == 0) return OMNI_PROTO_KCP; + return OMNI_PROTO_TCP; +} + +static void run_server(OmniProtocol proto, uint16_t port) +{ + OmniContext *ctx = omni_init(OMNI_ROLE_SERVER, proto, + NULL, port, + NULL, 0); + if (!ctx) { + fprintf(stderr, "server: omni_init failed\n"); + return; + } + + logger_log("INFO", "test", "server_started proto=%d port=%u", + (int)proto, (unsigned)port); + + char buf[4096]; + for (;;) { + ssize_t n = omni_recv(ctx, buf, sizeof(buf)); + if (n < 0) { + logger_log("INFO", "test", "server_recv_end n=%zd", n); + break; + } + if (n == 0) { + if (proto == OMNI_PROTO_KCP) { + usleep(10 * 1000); + continue; + } + logger_log("INFO", "test", "server_recv_end n=%zd", n); + break; + } + logger_log("INFO", "test", "server_recv bytes=%zd", n); + + /* 简单 echo 回客户端,验证双向通信 */ + ssize_t m = omni_send(ctx, buf, (size_t)n); + logger_log("INFO", "test", "server_echo bytes=%zd", m); + } + + omni_close(ctx); +} + +static void run_client(OmniProtocol proto, const char *host, uint16_t port) +{ + if (!host) { + fprintf(stderr, "client: host is required\n"); + return; + } + + OmniContext *ctx = omni_init(OMNI_ROLE_CLIENT, proto, + NULL, 0, + host, port); + if (!ctx) { + fprintf(stderr, "client: omni_init failed\n"); + return; + } + + logger_log("INFO", "test", "client_started proto=%d host=%s port=%u", + (int)proto, host, (unsigned)port); + + char send_buf[2048]; + char recv_buf[4096]; + + for (int i = 0; i < 100; ++i) { + int len = snprintf(send_buf, sizeof(send_buf), + "msg=%d time_ms=%llu payload_size=%zu", + i, + (unsigned long long)omni_now_ms(), + sizeof(send_buf)); + + ssize_t n = omni_send(ctx, send_buf, (size_t)len); + logger_log("INFO", "test", "client_send i=%d bytes=%zd", i, n); + if (n <= 0) break; + + ssize_t m = omni_recv(ctx, recv_buf, sizeof(recv_buf)); + if (m < 0) { + logger_log("INFO", "test", "client_recv_end i=%d bytes=%zd", i, m); + break; + } + if (m == 0) { + if (proto == OMNI_PROTO_KCP) { + usleep(10 * 1000); + --i; + continue; + } + logger_log("INFO", "test", "client_recv_end i=%d bytes=%zd", i, m); + break; + } + logger_log("INFO", "test", + "client_recv_echo i=%d bytes=%zd first_bytes=\"%.32s\"", + i, m, recv_buf); + + usleep(10 * 1000); /* 10ms 间隔,模拟稳定流量 */ + } + + omni_close(ctx); +} + +int main(int argc, char **argv) +{ + const char *role_str = NULL; + const char *proto_str = "tcp"; + const char *host = NULL; + int port = 0; + + int opt; + while ((opt = getopt(argc, argv, "r:p:P:H:")) != -1) { + switch (opt) { + case 'r': + role_str = optarg; + break; + case 'p': + proto_str = optarg; + break; + case 'P': + port = atoi(optarg); + break; + case 'H': + host = optarg; + break; + default: + usage(argv[0]); + return 1; + } + } + + if (!role_str || port <= 0) { + usage(argv[0]); + return 1; + } + + OmniProtocol proto = parse_proto(proto_str); + + if (strcmp(role_str, "server") == 0) { + run_server(proto, (uint16_t)port); + } else if (strcmp(role_str, "client") == 0) { + run_client(proto, host, (uint16_t)port); + } else { + usage(argv[0]); + return 1; + } + + return 0; +} diff --git a/src/core/logger.c b/src/core/logger.c index d283afb..b783431 100644 --- a/src/core/logger.c +++ b/src/core/logger.c @@ -6,20 +6,32 @@ #include "logger.h" #include "common.h" -#include -#include -#include -#include +#include +#include +#include +#include +#include -static OmniStats g_stats; -static FILE *g_json_fp = NULL; +static OmniStats g_stats; +static FILE *g_json_fp = NULL; +static int g_min_level = 1; /* default INFO */ -static uint64_t now_ms(void) -{ - return omni_now_ms(); -} - -static void ewma_update(double *avg, double sample, double alpha) +static uint64_t now_ms(void) +{ + return omni_now_ms(); +} + +static int level_to_int(const char *level) +{ + if (!level) return 1; + if (strcmp(level, "DEBUG") == 0) return 0; + if (strcmp(level, "INFO") == 0) return 1; + if (strcmp(level, "WARN") == 0) return 2; + if (strcmp(level, "ERROR") == 0) return 3; + return 1; +} + +static void ewma_update(double *avg, double sample, double alpha) { if (*avg <= 0.0) { *avg = sample; @@ -28,18 +40,21 @@ static void ewma_update(double *avg, double sample, double alpha) *avg = (*avg) * (1.0 - alpha) + sample * alpha; } -void logger_init(void) -{ - memset(&g_stats, 0, sizeof(g_stats)); - g_stats.start_ms = now_ms(); - g_stats.last_report_ms = g_stats.start_ms; - g_stats.send_call_min_ms = UINT64_MAX; - g_stats.recv_call_min_ms = UINT64_MAX; - - if (!g_json_fp) { - g_json_fp = fopen("omni_logs.jsonl", "a"); - } -} +void logger_init(void) +{ + memset(&g_stats, 0, sizeof(g_stats)); + g_stats.start_ms = now_ms(); + g_stats.last_report_ms = g_stats.start_ms; + g_stats.send_call_min_ms = UINT64_MAX; + g_stats.recv_call_min_ms = UINT64_MAX; + + const char *lvl_env = getenv("OMNI_LOG_LEVEL"); + g_min_level = level_to_int(lvl_env); + + if (!g_json_fp) { + g_json_fp = fopen("omni_logs.jsonl", "a"); + } +} void logger_on_send(size_t bytes) { @@ -224,14 +239,18 @@ void logger_print_performance_log(const char *tag) } } -void logger_log(const char *level, const char *component, - const char *fmt, ...) -{ - const char *lvl = level ? level : "INFO"; - const char *comp = component ? component : "general"; - - FILE *fp = stderr; - print_timestamp(fp); +void logger_log(const char *level, const char *component, + const char *fmt, ...) +{ + const char *lvl = level ? level : "INFO"; + const char *comp = component ? component : "general"; + + if (level_to_int(lvl) < g_min_level) { + return; + } + + FILE *fp = stderr; + print_timestamp(fp); fprintf(fp, "level=%s component=%s ", lvl, comp); char msg_buf[1024]; diff --git a/src/core/network.c b/src/core/network.c index b71618a..57a4d57 100644 --- a/src/core/network.c +++ b/src/core/network.c @@ -96,6 +96,9 @@ ssize_t omni_send(OmniContext *ctx, const void *buf, size_t len) } logger_log("DEBUG", "network", "omni_send proto=%d bytes=%zd call_ms=%llu", (int)ctx->proto, n, (unsigned long long)(t1 - t0)); + if (n > 0) { + logger_print_performance_log("on_send"); + } return n; } @@ -114,6 +117,9 @@ ssize_t omni_recv(OmniContext *ctx, void *buf, size_t len) } logger_log("DEBUG", "network", "omni_recv proto=%d bytes=%zd call_ms=%llu", (int)ctx->proto, n, (unsigned long long)(t1 - t0)); + if (n > 0) { + logger_print_performance_log("on_recv"); + } return n; } @@ -126,4 +132,3 @@ void omni_close(OmniContext *ctx) logger_print_performance_log("final"); free(ctx); } - diff --git a/src/protocols/kcp_impl.c b/src/protocols/kcp_impl.c index 0e838d5..d9263f5 100644 --- a/src/protocols/kcp_impl.c +++ b/src/protocols/kcp_impl.c @@ -38,16 +38,14 @@ static int kcp_output(const char *buf, int len, ikcpcb *kcp, void *user) return 0; } -static OmniContext *kcp_init(OmniRole role, - const char *bind_ip, - uint16_t bind_port, - const char *peer_ip, - uint16_t peer_port) -{ - (void)role; - - struct KcpContext *ctx = (struct KcpContext *)calloc(1, sizeof(*ctx)); - if (!ctx) return NULL; +static OmniContext *kcp_init(OmniRole role, + const char *bind_ip, + uint16_t bind_port, + const char *peer_ip, + uint16_t peer_port) +{ + struct KcpContext *ctx = (struct KcpContext *)calloc(1, sizeof(*ctx)); + if (!ctx) return NULL; int fd = socket(AF_INET, SOCK_DGRAM, 0); if (fd < 0) { @@ -78,8 +76,9 @@ static OmniContext *kcp_init(OmniRole role, ctx->fd = fd; - /* conv 可简单使用端口号 */ - IUINT32 conv = (IUINT32)peer_port; + /* conv 必须两端一致:server 用 bind_port,client 用 peer_port */ + IUINT32 conv = (role == OMNI_ROLE_SERVER) ? (IUINT32)bind_port + : (IUINT32)peer_port; ikcpcb *kcp = ikcp_create(conv, ctx); if (!kcp) { logger_log("ERROR", "kcp", "ikcp_create_failed"); @@ -94,11 +93,12 @@ static OmniContext *kcp_init(OmniRole role, ikcp_nodelay(kcp, 1, 10, 2, 1); ikcp_wndsize(kcp, 128, 128); - logger_log("INFO", "kcp", - "init bind_port=%u peer_ip=%s peer_port=%u", - (unsigned)bind_port, - peer_ip ? peer_ip : "NULL", - (unsigned)peer_port); + logger_log("INFO", "kcp", + "init bind_port=%u peer_ip=%s peer_port=%u conv=%u", + (unsigned)bind_port, + peer_ip ? peer_ip : "NULL", + (unsigned)peer_port, + (unsigned)conv); return (OmniContext *)ctx; }