feat: 实现并完成核心功能测试套件

- 编译系统:支持通过 `make clean all` 进行全量编译,生成可执行文件 `omni_client`、`omni_server`、`omni_relay` 和 `omni_test`。
- 客户端-服务端文件传输:支持 TCP/UDP/KCP 协议,已验证文件收发功能(使用 `/tmp/input.bin` 作为测试文件)。
- 服务端指令驱动:服务端可通过控制台发送 ASCII 指令(如 `hello-client`)实时驱动客户端。
- 动态转发功能 (Relay):实现 UDP 协议下的动态目标切换,支持 `show` 查询和 `set` 命令实时修改转发目标(如从 9102 端口切换到 9103 端口)。
- 所有功能已在本地环境(127.0.0.1)通过完整流程验证。
This commit is contained in:
nnbcccscdscdsc
2026-03-13 22:39:41 +08:00
parent 4d475f8c92
commit 7ecd8a4ef4
9 changed files with 34061 additions and 152 deletions

358
src/apps/client_main.c Normal file
View File

@@ -0,0 +1,358 @@
/*
* client_main.c
* 客户端:读取大文件分片发送,同时后台接收服务端 ASCII 指令并打印
*
* 线程模型:
* - 主线程:读取文件并发送 FILE_CHUNK / FILE_END
* - 子线程:持续接收服务端 COMMAND 并打印
*
* 消息格式:
* - 每条业务消息为 [MsgHeader(16B) + payload]
* - MsgHeader 字段由 common.h 中的 encode/decode 统一处理
*/
#include "common.h"
#include "network.h"
#include "logger.h"
#include <pthread.h>
#include <signal.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define CLIENT_FRAME_BUF_SIZE (MSG_HEADER_SIZE + 65536u)
typedef struct ClientRuntime {
/* 协议抽象层句柄。 */
OmniContext *ctx;
/* 线程共享运行标记1=运行中0=退出。 */
atomic_int running;
} ClientRuntime;
/*
* 进程级停止标记:
* - 收到 SIGINT/SIGTERM例如 Ctrl+C时置 1
* - 主线程据此触发收尾逻辑,保证线程/连接能优雅退出
*/
static volatile sig_atomic_t g_stop = 0;
static void on_signal(int signo)
{
(void)signo;
g_stop = 1;
}
static void install_signal_handlers(void)
{
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_handler = on_signal;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
(void)sigaction(SIGINT, &sa, NULL);
(void)sigaction(SIGTERM, &sa, NULL);
}
static void usage(const char *prog)
{
fprintf(stderr,
"Usage:\n"
" %s -p tcp|udp|kcp -H <server_ip> -P <server_port> -f <file>\n"
" [-b <bind_port>] [-m <chunk_mtu>] [-w <wait_seconds|-1>]\n",
prog);
}
static OmniProtocol parse_proto(const char *s)
{
/* 输入非法时回退到 TCP方便本地默认测试。 */
if (!s) return OMNI_PROTO_TCP;
if (strcmp(s, "tcp") == 0) return OMNI_PROTO_TCP;
if (strcmp(s, "udp") == 0) return OMNI_PROTO_UDP;
if (strcmp(s, "kcp") == 0) return OMNI_PROTO_KCP;
return OMNI_PROTO_TCP;
}
static int send_app_message(OmniContext *ctx,
uint32_t type,
const void *payload,
uint32_t payload_len)
{
/*
* 统一应用层发包:
* 1) 组装业务头(网络字节序)
* 2) 拼接 payload
* 3) 通过 omni_send 一次发送整帧
*/
size_t total_len = MSG_HEADER_SIZE + (size_t)payload_len;
uint8_t *frame = (uint8_t *)malloc(total_len);
if (!frame) {
logger_log("ERROR", "client", "malloc_frame_failed len=%zu", total_len);
return OMNI_ERR_GENERIC;
}
MsgHeader hdr;
omni_msg_header_encode(&hdr, type, payload_len, omni_now_ms());
memcpy(frame, &hdr, MSG_HEADER_SIZE);
if (payload_len > 0 && payload) {
memcpy(frame + MSG_HEADER_SIZE, payload, payload_len);
}
ssize_t n = omni_send(ctx, frame, total_len);
free(frame);
if (n != (ssize_t)total_len) {
logger_log("ERROR", "client",
"omni_send_failed expect=%zu got=%zd type=%u",
total_len, n, (unsigned)type);
return OMNI_ERR_IO;
}
return OMNI_OK;
}
static int decode_app_message(const uint8_t *frame,
size_t frame_len,
MsgHeader *out_hdr,
const uint8_t **out_payload)
{
/*
* 统一应用层解包:
* - 至少要有 16B 头
* - 头中 len 与总帧长度必须一致,避免越界/脏数据
*/
if (!frame || frame_len < MSG_HEADER_SIZE || !out_hdr || !out_payload) {
return OMNI_ERR_PARAM;
}
MsgHeader net_hdr;
memcpy(&net_hdr, frame, MSG_HEADER_SIZE);
omni_msg_header_decode(&net_hdr, out_hdr);
if ((size_t)out_hdr->len + MSG_HEADER_SIZE != frame_len) {
return OMNI_ERR_IO;
}
*out_payload = frame + MSG_HEADER_SIZE;
return OMNI_OK;
}
static void *recv_thread_main(void *arg)
{
ClientRuntime *rt = (ClientRuntime *)arg;
uint8_t frame[CLIENT_FRAME_BUF_SIZE];
/*
* 显式启用可取消:主线程收尾时通过 pthread_cancel 打断阻塞 recv
* 避免 UDP/KCP 场景下因长时间无回包导致 join 卡住。
*/
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
while (atomic_load(&rt->running)) {
ssize_t n = omni_recv(rt->ctx, frame, sizeof(frame));
if (n < 0) {
logger_log("ERROR", "client", "recv_failed n=%zd", n);
break;
}
if (n == 0) {
/* 0 在不同协议下可能代表“暂时无数据”或“对端关闭”,做短暂退避避免空转。 */
usleep(2 * 1000);
continue;
}
MsgHeader hdr;
const uint8_t *payload = NULL;
int rc = decode_app_message(frame, (size_t)n, &hdr, &payload);
if (rc != OMNI_OK) {
logger_log("ERROR", "client", "invalid_app_frame bytes=%zd rc=%d", n, rc);
continue;
}
if (hdr.type == MSG_TYPE_COMMAND) {
/* COMMAND 约定为 ASCII 文本,做安全截断后打印。 */
char cmd[2048];
size_t cpy = hdr.len < (uint32_t)(sizeof(cmd) - 1) ? hdr.len : (sizeof(cmd) - 1);
memcpy(cmd, payload, cpy);
cmd[cpy] = '\0';
printf("[server-cmd] %s\n", cmd);
fflush(stdout);
} else {
/* 客户端当前只消费 COMMAND其它类型保留日志便于调试。 */
logger_log("INFO", "client",
"recv_non_command type=%u len=%u",
(unsigned)hdr.type, (unsigned)hdr.len);
}
}
atomic_store(&rt->running, 0);
return NULL;
}
int main(int argc, char **argv)
{
install_signal_handlers();
/* 命令行参数默认值。 */
const char *proto_str = "tcp";
const char *server_ip = NULL;
const char *file_path = NULL;
int server_port = 0;
int bind_port = 0;
unsigned chunk_size = OMNI_DEFAULT_MTU;
int wait_seconds = 2;
int opt;
while ((opt = getopt(argc, argv, "p:H:P:f:b:m:w:")) != -1) {
switch (opt) {
case 'p':
proto_str = optarg;
break;
case 'H':
server_ip = optarg;
break;
case 'P':
server_port = atoi(optarg);
break;
case 'f':
file_path = optarg;
break;
case 'b':
bind_port = atoi(optarg);
break;
case 'm':
chunk_size = (unsigned)strtoul(optarg, NULL, 10);
break;
case 'w':
wait_seconds = atoi(optarg);
break;
default:
usage(argv[0]);
return 1;
}
}
if (!server_ip || server_port <= 0 || !file_path) {
usage(argv[0]);
return 1;
}
if (chunk_size == 0 || chunk_size > 65536u) {
/* 约束 chunk 上限,避免一次申请/发送过大缓冲。 */
fprintf(stderr, "invalid chunk size: %u\n", chunk_size);
return 1;
}
FILE *fp = fopen(file_path, "rb");
if (!fp) {
perror("fopen");
return 1;
}
OmniProtocol proto = parse_proto(proto_str);
/* 客户端角色:对端地址由 -H/-P 指定。 */
OmniContext *ctx = omni_init(OMNI_ROLE_CLIENT, proto,
NULL, (uint16_t)bind_port,
server_ip, (uint16_t)server_port);
if (!ctx) {
fclose(fp);
fprintf(stderr, "omni_init failed\n");
return 1;
}
ClientRuntime rt;
rt.ctx = ctx;
atomic_init(&rt.running, 1);
/* 启动异步接收线程(打印服务端指令)。 */
pthread_t recv_tid;
if (pthread_create(&recv_tid, NULL, recv_thread_main, &rt) != 0) {
perror("pthread_create");
atomic_store(&rt.running, 0);
fclose(fp);
omni_close(ctx);
return 1;
}
uint8_t *chunk = (uint8_t *)malloc(chunk_size);
if (!chunk) {
logger_log("ERROR", "client", "malloc_chunk_failed size=%u", chunk_size);
atomic_store(&rt.running, 0);
pthread_cancel(recv_tid);
pthread_join(recv_tid, NULL);
omni_close(ctx);
fclose(fp);
return 1;
}
uint64_t total_sent = 0;
/*
* 主发送循环:
* - 每次读取 chunk_size 字节
* - 发送 FILE_CHUNK
* - EOF 后发送 FILE_END
*/
while (atomic_load(&rt.running)) {
if (g_stop) {
logger_log("INFO", "client", "signal_received_stop_sending");
atomic_store(&rt.running, 0);
break;
}
size_t nread = fread(chunk, 1, chunk_size, fp);
if (nread == 0) {
if (feof(fp)) {
break;
}
if (ferror(fp)) {
logger_log("ERROR", "client", "fread_failed");
atomic_store(&rt.running, 0);
break;
}
}
if (nread > 0) {
int rc = send_app_message(ctx, MSG_TYPE_FILE_CHUNK, chunk, (uint32_t)nread);
if (rc != OMNI_OK) {
atomic_store(&rt.running, 0);
break;
}
total_sent += nread;
}
}
if (atomic_load(&rt.running)) {
/* 正常结束时发送 FILE_END通知服务端落盘完成。 */
int rc = send_app_message(ctx, MSG_TYPE_FILE_END, NULL, 0);
if (rc != OMNI_OK) {
atomic_store(&rt.running, 0);
}
}
logger_log("INFO", "client", "file_transfer_done bytes=%llu",
(unsigned long long)total_sent);
free(chunk);
fclose(fp);
/*
* 等待模式:
* - wait_seconds >= 0: 发送完成后最多等待 N 秒
* - wait_seconds < 0 : 常驻模式,直到 Ctrl+CSIGINT或连接异常
*/
if (wait_seconds < 0) {
logger_log("INFO", "client", "keepalive_mode=on press_ctrl_c_to_exit");
while (atomic_load(&rt.running) && !g_stop) {
sleep(1);
}
} else {
for (int i = 0; i < wait_seconds && atomic_load(&rt.running) && !g_stop; ++i) {
sleep(1);
}
}
/* 收尾顺序:先停接收线程,再关闭网络上下文。 */
atomic_store(&rt.running, 0);
pthread_cancel(recv_tid);
pthread_join(recv_tid, NULL);
omni_close(ctx);
return 0;
}

269
src/apps/relay_main.c Normal file
View File

@@ -0,0 +1,269 @@
/*
* relay_main.c
* 中转站:从 A 接收数据后立即转发到 B支持运行时动态修改转发目标
*
* 并发模型:
* - 主线程:阻塞接收上游流量并转发到当前目标
* - 控制线程:读取 stdin 命令,动态切换目标地址
*
* 线程安全策略:
* - tx_ctx / target_ip / target_port 受 tx_mu 互斥锁保护
* - 主线程转发发送与控制线程切换目标不会并发踩内存
*
* 控制命令stdin
* set <ip> <port> 修改目标地址
* show 打印当前目标
* quit 退出
*/
#include "common.h"
#include "network.h"
#include "logger.h"
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define RELAY_BUF_SIZE (MSG_HEADER_SIZE + 65536u)
typedef struct RelayState {
/* 当前 relay 工作协议。 */
OmniProtocol proto;
/* 上游接收上下文(通常是服务端角色)。 */
OmniContext *rx_ctx;
/* 下游发送上下文(通常是客户端角色,可动态替换)。 */
OmniContext *tx_ctx;
/* 保护 tx_ctx 与目标地址信息。 */
pthread_mutex_t tx_mu;
/* 运行标志。 */
atomic_int running;
/* 当前目标地址快照(用于 show 命令与日志)。 */
char target_ip[64];
uint16_t target_port;
} RelayState;
static void usage(const char *prog)
{
fprintf(stderr,
"Usage:\n"
" %s -p tcp|udp|kcp -L <listen_port> -H <target_ip> -P <target_port>\n",
prog);
}
static OmniProtocol parse_proto(const char *s)
{
/* 非法输入回退 TCP。 */
if (!s) return OMNI_PROTO_TCP;
if (strcmp(s, "tcp") == 0) return OMNI_PROTO_TCP;
if (strcmp(s, "udp") == 0) return OMNI_PROTO_UDP;
if (strcmp(s, "kcp") == 0) return OMNI_PROTO_KCP;
return OMNI_PROTO_TCP;
}
static int relay_set_target(RelayState *st, const char *ip, uint16_t port)
{
/*
* 动态切换目标步骤:
* 1) 先建立新 tx_ctx失败时保持旧目标不变
* 2) 加锁替换指针与目标参数
* 3) 解锁后关闭旧 tx_ctx避免持锁做慢操作
*/
OmniContext *new_tx = omni_init(OMNI_ROLE_CLIENT, st->proto,
NULL, 0,
ip, port);
if (!new_tx) {
logger_log("ERROR", "relay", "connect_target_failed ip=%s port=%u",
ip, (unsigned)port);
return OMNI_ERR_IO;
}
pthread_mutex_lock(&st->tx_mu);
OmniContext *old_tx = st->tx_ctx;
st->tx_ctx = new_tx;
snprintf(st->target_ip, sizeof(st->target_ip), "%s", ip);
st->target_port = port;
pthread_mutex_unlock(&st->tx_mu);
if (old_tx) {
omni_close(old_tx);
}
logger_log("INFO", "relay", "target_updated ip=%s port=%u", ip, (unsigned)port);
return OMNI_OK;
}
static void *control_thread_main(void *arg)
{
/* 控制线程负责解析 stdin 命令。 */
RelayState *st = (RelayState *)arg;
char line[256];
while (atomic_load(&st->running)) {
if (!fgets(line, sizeof(line), stdin)) {
/*
* 管道/重定向 EOF 时不要立刻退出 relay
* - 清理 EOF 状态
* - 短暂休眠后继续循环
* 这样 relay 仍可继续处理主数据面转发。
*/
clearerr(stdin);
usleep(100 * 1000);
continue;
}
size_t len = strlen(line);
while (len > 0 && (line[len - 1] == '\n' || line[len - 1] == '\r')) {
line[--len] = '\0';
}
if (len == 0) {
continue;
}
if (strcmp(line, "quit") == 0) {
/* 通知主线程退出。 */
atomic_store(&st->running, 0);
break;
}
if (strcmp(line, "show") == 0) {
/* 在锁保护下读取目标快照,避免与 set 并发冲突。 */
pthread_mutex_lock(&st->tx_mu);
fprintf(stderr, "relay target: %s:%u\n",
st->target_ip[0] ? st->target_ip : "N/A",
(unsigned)st->target_port);
pthread_mutex_unlock(&st->tx_mu);
continue;
}
char ip[64];
unsigned port = 0;
if (sscanf(line, "set %63s %u", ip, &port) == 2 && port > 0 && port <= 65535u) {
/* 动态切目标。 */
relay_set_target(st, ip, (uint16_t)port);
continue;
}
fprintf(stderr, "unknown command: %s\n", line);
fprintf(stderr, "commands: set <ip> <port> | show | quit\n");
}
return NULL;
}
int main(int argc, char **argv)
{
/* 命令行参数默认值。 */
const char *proto_str = "tcp";
const char *target_ip = NULL;
int listen_port = 0;
int target_port = 0;
int opt;
while ((opt = getopt(argc, argv, "p:L:H:P:")) != -1) {
switch (opt) {
case 'p':
proto_str = optarg;
break;
case 'L':
listen_port = atoi(optarg);
break;
case 'H':
target_ip = optarg;
break;
case 'P':
target_port = atoi(optarg);
break;
default:
usage(argv[0]);
return 1;
}
}
if (!target_ip || listen_port <= 0 || target_port <= 0) {
usage(argv[0]);
return 1;
}
RelayState st;
memset(&st, 0, sizeof(st));
st.proto = parse_proto(proto_str);
atomic_init(&st.running, 1);
pthread_mutex_init(&st.tx_mu, NULL);
/*
* rx_ctx 作为上游入口server 角色):
* - TCP: 等待上游 connect
* - UDP/KCP: 绑定监听端口接收上游包
*/
st.rx_ctx = omni_init(OMNI_ROLE_SERVER, st.proto, NULL, (uint16_t)listen_port, NULL, 0);
if (!st.rx_ctx) {
fprintf(stderr, "relay: omni_init rx failed\n");
pthread_mutex_destroy(&st.tx_mu);
return 1;
}
/*
* 初始目标连接失败不直接退出:
* - relay 可先启动数据入口
* - 后续通过 set 命令修复目标地址
*/
(void)relay_set_target(&st, target_ip, (uint16_t)target_port);
/* 启动控制面线程。 */
pthread_t ctrl_tid;
if (pthread_create(&ctrl_tid, NULL, control_thread_main, &st) != 0) {
perror("pthread_create");
omni_close(st.rx_ctx);
if (st.tx_ctx) omni_close(st.tx_ctx);
pthread_mutex_destroy(&st.tx_mu);
return 1;
}
uint8_t buf[RELAY_BUF_SIZE];
while (atomic_load(&st.running)) {
/* 数据面:收上游 -> 转发下游。 */
ssize_t n = omni_recv(st.rx_ctx, buf, sizeof(buf));
if (n < 0) {
logger_log("ERROR", "relay", "recv_failed n=%zd", n);
break;
}
if (n == 0) {
/* 暂时无数据时短暂退避。 */
usleep(2 * 1000);
continue;
}
ssize_t m = OMNI_ERR_PARAM;
/* 发送上下文受锁保护,防止与 set 命令并发替换。 */
pthread_mutex_lock(&st.tx_mu);
if (st.tx_ctx) {
m = omni_send(st.tx_ctx, buf, (size_t)n);
}
pthread_mutex_unlock(&st.tx_mu);
if (m != n) {
logger_log("ERROR", "relay", "forward_failed in=%zd out=%zd", n, m);
} else {
logger_log("INFO", "relay", "forward_ok bytes=%zd", n);
}
}
/* 收尾:先停主循环,再依次释放 rx / 控制线程 / tx。 */
atomic_store(&st.running, 0);
omni_close(st.rx_ctx);
pthread_join(ctrl_tid, NULL);
pthread_mutex_lock(&st.tx_mu);
OmniContext *tx = st.tx_ctx;
st.tx_ctx = NULL;
pthread_mutex_unlock(&st.tx_mu);
if (tx) {
omni_close(tx);
}
pthread_mutex_destroy(&st.tx_mu);
return 0;
}

303
src/apps/server_main.c Normal file
View File

@@ -0,0 +1,303 @@
/*
* server_main.c
* 服务端:接收文件写盘;主线程监听键盘输入并发送 ASCII 指令到客户端
*
* 线程模型:
* - 接收线程:持续收业务帧,写入文件,直到 FILE_END
* - 主线程:在交互终端下读取 stdin发送 COMMAND 给客户端
*
* 说明:
* - 当 stdin 不是 TTY例如被脚本后台拉起主线程不做交互输入
* 仅等待接收线程完成传输,便于自动化测试稳定运行。
*/
#include "common.h"
#include "network.h"
#include "logger.h"
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define SERVER_FRAME_BUF_SIZE (MSG_HEADER_SIZE + 65536u)
typedef struct ServerRuntime {
/* 协议抽象层句柄。 */
OmniContext *ctx;
/* 当前运行协议,用于区分 recv 返回 0 的语义TCP=对端关闭)。 */
OmniProtocol proto;
/* 接收文件写入目标。 */
FILE *out_fp;
/* 全局运行标记。 */
atomic_int running;
/* 收到 FILE_END 后置 1。 */
atomic_int transfer_done;
/* 已成功写入的文件字节数。 */
uint64_t bytes_written;
} ServerRuntime;
static void usage(const char *prog)
{
fprintf(stderr,
"Usage:\n"
" %s -p tcp|udp|kcp -P <listen_port> -o <output_file> [-b <bind_ip>]\n",
prog);
}
static OmniProtocol parse_proto(const char *s)
{
/* 输入不合法时回退到 TCP。 */
if (!s) return OMNI_PROTO_TCP;
if (strcmp(s, "tcp") == 0) return OMNI_PROTO_TCP;
if (strcmp(s, "udp") == 0) return OMNI_PROTO_UDP;
if (strcmp(s, "kcp") == 0) return OMNI_PROTO_KCP;
return OMNI_PROTO_TCP;
}
static int send_app_message(OmniContext *ctx,
uint32_t type,
const void *payload,
uint32_t payload_len)
{
/* 与客户端保持一致的统一发包函数。 */
size_t total_len = MSG_HEADER_SIZE + (size_t)payload_len;
uint8_t *frame = (uint8_t *)malloc(total_len);
if (!frame) {
logger_log("ERROR", "server", "malloc_frame_failed len=%zu", total_len);
return OMNI_ERR_GENERIC;
}
MsgHeader hdr;
omni_msg_header_encode(&hdr, type, payload_len, omni_now_ms());
memcpy(frame, &hdr, MSG_HEADER_SIZE);
if (payload_len > 0 && payload) {
memcpy(frame + MSG_HEADER_SIZE, payload, payload_len);
}
ssize_t n = omni_send(ctx, frame, total_len);
free(frame);
if (n != (ssize_t)total_len) {
logger_log("ERROR", "server",
"omni_send_failed expect=%zu got=%zd type=%u",
total_len, n, (unsigned)type);
return OMNI_ERR_IO;
}
return OMNI_OK;
}
static int decode_app_message(const uint8_t *frame,
size_t frame_len,
MsgHeader *out_hdr,
const uint8_t **out_payload)
{
/* 与客户端一致的统一解包校验。 */
if (!frame || frame_len < MSG_HEADER_SIZE || !out_hdr || !out_payload) {
return OMNI_ERR_PARAM;
}
MsgHeader net_hdr;
memcpy(&net_hdr, frame, MSG_HEADER_SIZE);
omni_msg_header_decode(&net_hdr, out_hdr);
if ((size_t)out_hdr->len + MSG_HEADER_SIZE != frame_len) {
return OMNI_ERR_IO;
}
*out_payload = frame + MSG_HEADER_SIZE;
return OMNI_OK;
}
static void *recv_thread_main(void *arg)
{
ServerRuntime *rt = (ServerRuntime *)arg;
uint8_t frame[SERVER_FRAME_BUF_SIZE];
/* 允许主线程在退出时取消本线程,避免阻塞 recv 导致无法收尾。 */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
while (atomic_load(&rt->running)) {
ssize_t n = omni_recv(rt->ctx, frame, sizeof(frame));
if (n < 0) {
logger_log("ERROR", "server", "recv_failed n=%zd", n);
break;
}
if (n == 0) {
/*
* recv 返回 0 的语义依赖协议:
* - TCP: 对端连接关闭,接收线程可退出
* - UDP/KCP: 可能仅表示当前无可读数据,继续等待
*/
if (rt->proto == OMNI_PROTO_TCP) {
logger_log("INFO", "server", "tcp_peer_closed");
break;
}
usleep(2 * 1000);
continue;
}
MsgHeader hdr;
const uint8_t *payload = NULL;
int rc = decode_app_message(frame, (size_t)n, &hdr, &payload);
if (rc != OMNI_OK) {
logger_log("ERROR", "server", "invalid_app_frame bytes=%zd rc=%d", n, rc);
continue;
}
if (hdr.type == MSG_TYPE_FILE_CHUNK) {
/* 文件分片:直接按顺序落盘。 */
size_t nw = fwrite(payload, 1, hdr.len, rt->out_fp);
if (nw != hdr.len) {
logger_log("ERROR", "server", "fwrite_failed expect=%u got=%zu",
(unsigned)hdr.len, nw);
break;
}
rt->bytes_written += nw;
} else if (hdr.type == MSG_TYPE_FILE_END) {
/*
* 文件接收结束:
* - 仅置位 transfer_done
* - 不退出线程,让服务端在交互模式下继续保持长连接并可下发指令
*/
fflush(rt->out_fp);
atomic_store(&rt->transfer_done, 1);
logger_log("INFO", "server", "file_transfer_end bytes=%llu",
(unsigned long long)rt->bytes_written);
continue;
} else if (hdr.type == MSG_TYPE_COMMAND) {
/* 当前服务端不处理“来自客户端”的 COMMAND仅记录日志。 */
logger_log("INFO", "server",
"recv_command_from_peer len=%u (ignored)",
(unsigned)hdr.len);
} else {
logger_log("INFO", "server",
"recv_unknown_type type=%u len=%u",
(unsigned)hdr.type, (unsigned)hdr.len);
}
}
atomic_store(&rt->running, 0);
return NULL;
}
int main(int argc, char **argv)
{
/* 命令行参数默认值。 */
const char *proto_str = "tcp";
const char *bind_ip = NULL;
const char *output_path = NULL;
int listen_port = 0;
int opt;
while ((opt = getopt(argc, argv, "p:b:P:o:")) != -1) {
switch (opt) {
case 'p':
proto_str = optarg;
break;
case 'b':
bind_ip = optarg;
break;
case 'P':
listen_port = atoi(optarg);
break;
case 'o':
output_path = optarg;
break;
default:
usage(argv[0]);
return 1;
}
}
if (listen_port <= 0 || !output_path) {
usage(argv[0]);
return 1;
}
FILE *out_fp = fopen(output_path, "wb");
if (!out_fp) {
perror("fopen");
return 1;
}
OmniProtocol proto = parse_proto(proto_str);
/* 服务端角色:仅监听本地端口。 */
OmniContext *ctx = omni_init(OMNI_ROLE_SERVER, proto,
bind_ip, (uint16_t)listen_port,
NULL, 0);
if (!ctx) {
fclose(out_fp);
fprintf(stderr, "omni_init failed\n");
return 1;
}
ServerRuntime rt;
rt.ctx = ctx;
rt.proto = proto;
rt.out_fp = out_fp;
rt.bytes_written = 0;
atomic_init(&rt.running, 1);
atomic_init(&rt.transfer_done, 0);
/* 启动接收线程处理文件写入主流程。 */
pthread_t recv_tid;
if (pthread_create(&recv_tid, NULL, recv_thread_main, &rt) != 0) {
perror("pthread_create");
omni_close(ctx);
fclose(out_fp);
return 1;
}
if (isatty(STDIN_FILENO)) {
/*
* 交互模式:
* - 每次回车读取一行
* - 非空行封装为 COMMAND 发送给客户端
*/
char line[2048];
while (atomic_load(&rt.running)) {
if (!fgets(line, sizeof(line), stdin)) {
break;
}
size_t len = strlen(line);
while (len > 0 && (line[len - 1] == '\n' || line[len - 1] == '\r')) {
line[--len] = '\0';
}
if (len == 0) {
continue;
}
if (strcmp(line, "quit") == 0) {
/* 主动退出交互循环。 */
break;
}
int rc = send_app_message(ctx, MSG_TYPE_COMMAND, line, (uint32_t)len);
if (rc != OMNI_OK) {
logger_log("ERROR", "server", "send_command_failed");
break;
}
}
} else {
/*
* 非交互模式(如脚本后台):
* 只等待接收线程将 transfer_done 置位,避免阻塞在 stdin。
*/
while (atomic_load(&rt.running) && !atomic_load(&rt.transfer_done)) {
usleep(100 * 1000);
}
}
/* 收尾:取消接收线程 -> join -> 关闭网络 -> 关闭文件。 */
atomic_store(&rt.running, 0);
pthread_cancel(recv_tid);
pthread_join(recv_tid, NULL);
omni_close(ctx);
fclose(out_fp);
return 0;
}

View File

@@ -1,7 +1,13 @@
/*
* tcp_impl.c
* TCP 协议实现,带 16 字节包头解决粘包
*/
/*
* tcp_impl.c
* TCP 协议实现,带 16 字节包头解决粘包
*
* 设计说明:
* 1) TCP 是字节流,天然没有消息边界,因此这里通过“固定 16 字节头 + payload 长度”
* 显式划分消息边界,避免粘包/拆包带来的上层读取混乱。
* 2) 本层的头只用于“流边界管理”,上层业务仍可在 payload 中定义自己的消息头。
* 3) send/recv 均采用阻塞全量读写语义:要么完整收发一帧,要么返回错误/关闭状态。
*/
#include "common.h"
#include "network.h"
@@ -20,9 +26,10 @@
/* Linux 下 TCP_INFO 定义通常已在 <netinet/tcp.h> 提供,避免引入 <linux/tcp.h> 重定义 */
struct TcpContext {
int fd;
};
struct TcpContext {
/* 已建立连接的 socket fd服务端 accept 后或客户端 connect 后)。 */
int fd;
};
#ifdef __linux__
static void tcp_log_info(int fd, const char *tag)
@@ -64,29 +71,33 @@ static void tcp_log_info(int fd, const char *tag)
}
#endif
static int tcp_set_nodelay(int fd)
{
int flag = 1;
return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
}
static int tcp_set_nodelay(int fd)
{
/* 关闭 Nagle降低小包时延更利于交互指令场景。 */
int flag = 1;
return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
}
static int tcp_set_reuseaddr(int fd)
{
/* 允许端口快速复用,减少开发/测试时 TIME_WAIT 影响。 */
int flag = 1;
return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
}
static int tcp_set_reuseaddr(int fd)
{
int flag = 1;
return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
}
static int tcp_bind_and_listen(struct TcpContext *ctx,
const char *bind_ip,
uint16_t bind_port)
{
int fd = socket(AF_INET, SOCK_STREAM, 0);
static int tcp_bind_and_listen(struct TcpContext *ctx,
const char *bind_ip,
uint16_t bind_port)
{
/* 创建监听 socket。 */
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
logger_log("ERROR", "tcp", "socket_failed errno=%d", errno);
return -1;
}
tcp_set_reuseaddr(fd);
/* 监听 socket 打开地址复用。 */
tcp_set_reuseaddr(fd);
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
@@ -100,7 +111,11 @@ static int tcp_bind_and_listen(struct TcpContext *ctx,
return -1;
}
if (listen(fd, 1) < 0) {
/*
* 这里 backlog 取 1符合当前“单连接演示/测试”场景。
* 若后续要支持多客户端,可提升 backlog 并改为事件循环/线程池模型。
*/
if (listen(fd, 1) < 0) {
logger_log("ERROR", "tcp", "listen_failed errno=%d", errno);
close(fd);
return -1;
@@ -108,26 +123,28 @@ static int tcp_bind_and_listen(struct TcpContext *ctx,
logger_log("INFO", "tcp", "listening port=%u", (unsigned)bind_port);
/* 简化:阻塞接受一个客户端,之后用于长连接 */
int cfd = accept(fd, NULL, NULL);
/* 简化:阻塞接受一个客户端,连接建立后作为长连接使用。 */
int cfd = accept(fd, NULL, NULL);
if (cfd < 0) {
logger_log("ERROR", "tcp", "accept_failed errno=%d", errno);
close(fd);
return -1;
}
close(fd);
tcp_set_nodelay(cfd);
/* 监听 fd 仅用于 accept一旦接入成功即可关闭监听 fd。 */
close(fd);
tcp_set_nodelay(cfd);
ctx->fd = cfd;
return 0;
}
static int tcp_connect_peer(struct TcpContext *ctx,
const char *peer_ip,
uint16_t peer_port)
{
int fd = socket(AF_INET, SOCK_STREAM, 0);
static int tcp_connect_peer(struct TcpContext *ctx,
const char *peer_ip,
uint16_t peer_port)
{
/* 创建主动连接 socket。 */
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
logger_log("ERROR", "tcp", "socket_failed errno=%d", errno);
return -1;
@@ -139,7 +156,8 @@ static int tcp_connect_peer(struct TcpContext *ctx,
addr.sin_port = htons(peer_port);
addr.sin_addr.s_addr = inet_addr(peer_ip);
if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
/* 阻塞 connect 到对端。 */
if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
logger_log("ERROR", "tcp", "connect_failed errno=%d", errno);
close(fd);
return -1;
@@ -152,10 +170,16 @@ static int tcp_connect_peer(struct TcpContext *ctx,
return 0;
}
static ssize_t tcp_read_n(int fd, void *buf, size_t n)
{
size_t off = 0;
char *p = (char *)buf;
static ssize_t tcp_read_n(int fd, void *buf, size_t n)
{
/*
* 从 TCP 流中“恰好读取 n 字节”:
* - 正常返回 n
* - 返回 0 表示对端关闭(如果发生在中途,返回已读字节数)
* - 返回 -1 表示系统调用错误
*/
size_t off = 0;
char *p = (char *)buf;
while (off < n) {
ssize_t r = read(fd, p + off, n - off);
if (r < 0) {
@@ -170,10 +194,15 @@ static ssize_t tcp_read_n(int fd, void *buf, size_t n)
return (ssize_t)off;
}
static ssize_t tcp_write_n(int fd, const void *buf, size_t n)
{
size_t off = 0;
const char *p = (const char *)buf;
static ssize_t tcp_write_n(int fd, const void *buf, size_t n)
{
/*
* 向 TCP 流中“恰好写入 n 字节”:
* - EINTR 自动重试
* - 其余错误返回 -1
*/
size_t off = 0;
const char *p = (const char *)buf;
while (off < n) {
ssize_t r = write(fd, p + off, n - off);
if (r < 0) {
@@ -185,19 +214,21 @@ static ssize_t tcp_write_n(int fd, const void *buf, size_t n)
return (ssize_t)off;
}
static OmniContext *tcp_init(OmniRole role,
const char *bind_ip,
uint16_t bind_port,
const char *peer_ip,
uint16_t peer_port)
{
struct TcpContext *ctx = (struct TcpContext *)calloc(1, sizeof(*ctx));
static OmniContext *tcp_init(OmniRole role,
const char *bind_ip,
uint16_t bind_port,
const char *peer_ip,
uint16_t peer_port)
{
/* 协议私有上下文(通过 OmniContext* 向上层做不透明传递)。 */
struct TcpContext *ctx = (struct TcpContext *)calloc(1, sizeof(*ctx));
if (!ctx) {
return NULL;
}
int rc;
if (role == OMNI_ROLE_SERVER) {
/* 按角色决定是被动监听还是主动连接。 */
if (role == OMNI_ROLE_SERVER) {
rc = tcp_bind_and_listen(ctx, bind_ip, bind_port);
} else {
if (!peer_ip || peer_port == 0) {
@@ -216,21 +247,24 @@ static OmniContext *tcp_init(OmniRole role,
return (OmniContext *)ctx;
}
static ssize_t tcp_send(OmniContext *c, const void *buf, size_t len)
{
struct TcpContext *ctx = (struct TcpContext *)c;
if (!ctx || ctx->fd < 0) return OMNI_ERR_PARAM;
static ssize_t tcp_send(OmniContext *c, const void *buf, size_t len)
{
struct TcpContext *ctx = (struct TcpContext *)c;
if (!ctx || ctx->fd < 0) return OMNI_ERR_PARAM;
/*
* 外层 TCP 帧头16B仅用于切分消息边界。
* 当前 type 统一标记为 MSG_TYPE_RAW表示“payload 是上层透传内容”。
*/
uint64_t t0 = omni_now_ms();
MsgHeader hdr;
omni_msg_header_encode(&hdr, MSG_TYPE_RAW, (uint32_t)len, t0);
uint8_t header_buf[MSG_HEADER_SIZE];
memcpy(header_buf, &hdr, MSG_HEADER_SIZE);
uint64_t t0 = omni_now_ms();
MsgHeader hdr;
hdr.magic = htonl(MSG_MAGIC);
hdr.length = htonl((uint32_t)len);
hdr.seq = 0; /* 如有需要,上层可扩展维护序列号 */
uint8_t header_buf[MSG_HEADER_SIZE];
memcpy(header_buf, &hdr, MSG_HEADER_SIZE);
ssize_t n1 = tcp_write_n(ctx->fd, header_buf, MSG_HEADER_SIZE);
/* 先写固定头,再写 payload接收侧可据此恢复完整帧。 */
ssize_t n1 = tcp_write_n(ctx->fd, header_buf, MSG_HEADER_SIZE);
if (n1 != (ssize_t)MSG_HEADER_SIZE) {
return OMNI_ERR_IO;
}
@@ -240,8 +274,9 @@ static ssize_t tcp_send(OmniContext *c, const void *buf, size_t len)
return OMNI_ERR_IO;
}
uint64_t t1 = omni_now_ms();
logger_on_proto_send_latency(t1 - t0);
uint64_t t1 = omni_now_ms();
/* 记录协议层发送耗时,便于后续性能分析。 */
logger_on_proto_send_latency(t1 - t0);
logger_log("DEBUG", "tcp", "send payload_bytes=%zu header_bytes=%zu proto_ms=%llu",
len, (size_t)MSG_HEADER_SIZE, (unsigned long long)(t1 - t0));
#ifdef __linux__
@@ -250,13 +285,19 @@ static ssize_t tcp_send(OmniContext *c, const void *buf, size_t len)
return (ssize_t)len;
}
static ssize_t tcp_recv(OmniContext *c, void *buf, size_t len)
{
struct TcpContext *ctx = (struct TcpContext *)c;
if (!ctx || ctx->fd < 0) return OMNI_ERR_PARAM;
uint64_t t0 = omni_now_ms();
uint8_t header_buf[MSG_HEADER_SIZE];
static ssize_t tcp_recv(OmniContext *c, void *buf, size_t len)
{
struct TcpContext *ctx = (struct TcpContext *)c;
if (!ctx || ctx->fd < 0) return OMNI_ERR_PARAM;
/*
* 收包流程:
* 1) 固定先读 16 字节头
* 2) 解析 payload_len
* 3) 再读 payload_len 字节
*/
uint64_t t0 = omni_now_ms();
uint8_t header_buf[MSG_HEADER_SIZE];
ssize_t n1 = tcp_read_n(ctx->fd, header_buf, MSG_HEADER_SIZE);
if (n1 <= 0) {
return n1; /* 0 表示对端关闭,负数为错误 */
@@ -265,18 +306,21 @@ static ssize_t tcp_recv(OmniContext *c, void *buf, size_t len)
return OMNI_ERR_IO;
}
MsgHeader hdr;
memcpy(&hdr, header_buf, MSG_HEADER_SIZE);
if (ntohl(hdr.magic) != MSG_MAGIC) {
logger_log("ERROR", "tcp", "invalid_magic");
return OMNI_ERR_IO;
}
uint32_t payload_len = ntohl(hdr.length);
if (payload_len > len) {
logger_log("ERROR", "tcp", "buffer_too_small payload=%u buf_len=%zu",
payload_len, len);
/* 简化:这里返回错误,实际可考虑丢弃或扩展缓冲区 */
/* 解码网络字节序头字段。 */
MsgHeader hdr;
MsgHeader host_hdr;
memcpy(&hdr, header_buf, MSG_HEADER_SIZE);
omni_msg_header_decode(&hdr, &host_hdr);
uint32_t payload_len = host_hdr.len;
/*
* 调用方缓冲区不足时直接报错。
* 当前实现不做“读取并丢弃剩余字节”,因此调用方应保证 recv 缓冲足够大。
*/
if (payload_len > len) {
logger_log("ERROR", "tcp", "buffer_too_small payload=%u buf_len=%zu",
payload_len, len);
/* 简化:这里返回错误,实际可考虑丢弃或扩展缓冲区 */
return OMNI_ERR_PARAM;
}
@@ -285,23 +329,29 @@ static ssize_t tcp_recv(OmniContext *c, void *buf, size_t len)
return OMNI_ERR_IO;
}
uint64_t t1 = omni_now_ms();
logger_on_proto_recv_latency(t1 - t0);
logger_log("DEBUG", "tcp", "recv payload_bytes=%u header_bytes=%zu proto_ms=%llu",
payload_len, (size_t)MSG_HEADER_SIZE, (unsigned long long)(t1 - t0));
uint64_t t1 = omni_now_ms();
/* 记录协议层接收耗时。 */
logger_on_proto_recv_latency(t1 - t0);
logger_log("DEBUG", "tcp",
"recv payload_bytes=%u header_bytes=%zu msg_type=%u ts_ms=%llu proto_ms=%llu",
payload_len, (size_t)MSG_HEADER_SIZE,
(unsigned)host_hdr.type,
(unsigned long long)host_hdr.timestamp,
(unsigned long long)(t1 - t0));
#ifdef __linux__
tcp_log_info(ctx->fd, "after_recv");
#endif
return (ssize_t)payload_len;
}
static void tcp_close(OmniContext *c)
{
struct TcpContext *ctx = (struct TcpContext *)c;
if (!ctx) return;
if (ctx->fd >= 0) {
close(ctx->fd);
}
static void tcp_close(OmniContext *c)
{
struct TcpContext *ctx = (struct TcpContext *)c;
if (!ctx) return;
/* 关闭连接并释放私有上下文。 */
if (ctx->fd >= 0) {
close(ctx->fd);
}
free(ctx);
}