/*
 * hub_main.c
 * Cloud hub: maintains the client_id -> connection mapping and is
 * responsible for register / bind / tunnel routing.
 *
 * The current stage implements only the TCP control plane:
 *   - multiple peers actively connect to the hub
 *   - a peer first REGISTERs its own logical ID
 *   - a peer may BIND a default target
 *   - after a peer sends TUNNEL, the hub forwards it to the target
 *     selected by dst_id
 *
 * Later file/video messages can directly reuse the inner_type of
 * MSG_TYPE_PEER_TUNNEL.
 *
 * Locking rules used throughout this file:
 *   - HubState.mu protects the client list and every client's
 *     client_id / bound_peer fields.
 *   - HubClient.write_mu serialises writes to (and the close of) one
 *     client's socket.
 *   - Lock order is always hub->mu first, then a client's write_mu.
 *   - A client is freed by its own thread only after close_client() has
 *     acquired write_mu, so holding a client's write_mu (obtained while
 *     hub->mu was held) keeps that client alive.
 */
#include "common.h"
#include "logger.h"

#include <arpa/inet.h>
#include <ctype.h>
#include <errno.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#define HUB_MAX_PAYLOAD (PEER_TUNNEL_META_SIZE + 65536u)
#define HUB_BACKLOG 64

typedef struct HubState HubState;

/* One connected peer; nodes form a singly linked list owned by HubState. */
typedef struct HubClient {
    HubState *hub;
    int fd;                              /* -1 once closed */
    pthread_t tid;                       /* set by the worker thread itself */
    pthread_mutex_t write_mu;            /* serialises socket writes / close */
    atomic_int running;
    char client_id[OMNI_PEER_ID_SIZE];   /* empty until REGISTER succeeds */
    char bound_peer[OMNI_PEER_ID_SIZE];  /* empty when no default target */
    char remote_ip[64];
    uint16_t remote_port;
    struct HubClient *next;
} HubClient;

struct HubState {
    int listen_fd;
    atomic_int running;
    pthread_mutex_t mu;                  /* protects `clients` and id fields */
    HubClient *clients;
};

/* Set from the signal handler; polled by the accept loop. */
static volatile sig_atomic_t g_stop = 0;

/* Signal handler: only records that a stop was requested. */
static void on_signal(int signo)
{
    (void)signo;
    g_stop = 1;
}

/*
 * Install SIGINT/SIGTERM handlers and ignore SIGPIPE.
 * sa_flags is left at 0 (no SA_RESTART), so a blocking accept() is
 * interrupted with EINTR and the main loop can observe g_stop.
 */
static void install_signal_handlers(void)
{
    struct sigaction sa;

    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = on_signal;
    sigemptyset(&sa.sa_mask);
    (void)sigaction(SIGINT, &sa, NULL);
    (void)sigaction(SIGTERM, &sa, NULL);
    (void)signal(SIGPIPE, SIG_IGN);
}

/* Print command-line usage to stderr. */
static void usage(const char *prog)
{
    fprintf(stderr,
            "Usage:\n"
            "  %s -P <port> [-b <bind_ip>] [-p tcp]\n",
            prog);
}

/*
 * Parse a decimal TCP port in the range 1..65535.
 * Returns 0 and stores the value in *out_port on success, -1 otherwise
 * (empty string, trailing characters, or out of range).
 */
static int parse_port(const char *text, int *out_port)
{
    char *end = NULL;
    long value;

    if (!text || !out_port) {
        return -1;
    }
    errno = 0;
    value = strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE) {
        return -1;
    }
    if (value < 1 || value > 65535) {
        return -1;
    }
    *out_port = (int)value;
    return 0;
}

/*
 * Return 1 when `id` is a non-empty string shorter than
 * OMNI_PEER_ID_SIZE made only of alphanumerics, '_', '-' or '.';
 * return 0 otherwise (including NULL).
 */
static int peer_id_is_valid(const char *id)
{
    size_t len = 0;

    if (!id || !id[0]) {
        return 0;
    }
    for (len = 0; id[len] != '\0'; ++len) {
        unsigned char ch = (unsigned char)id[len];

        /* Must leave room for the terminating NUL. */
        if (len + 1u >= OMNI_PEER_ID_SIZE) {
            return 0;
        }
        if (!(isalnum(ch) || ch == '_' || ch == '-' || ch == '.')) {
            return 0;
        }
    }
    return 1;
}

/* Client id for logging; "unregistered" when none has been set. */
static const char *safe_client_id(const HubClient *client)
{
    if (!client || client->client_id[0] == '\0') {
        return "unregistered";
    }
    return client->client_id;
}

/*
 * Read exactly `n` bytes from `fd`, retrying on EINTR.
 * Returns n on success, 0 when the peer closed the connection before any
 * byte was read, and -1 on error or when EOF arrives after a partial read
 * (a truncated message is not a clean close).
 */
static ssize_t read_n(int fd, void *buf, size_t n)
{
    uint8_t *p = (uint8_t *)buf;
    size_t done = 0;

    while (done < n) {
        ssize_t rc = recv(fd, p + done, n - done, 0);

        if (rc == 0) {
            return (done == 0) ? 0 : -1;
        }
        if (rc < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        done += (size_t)rc;
    }
    return (ssize_t)done;
}

/*
 * Write exactly `n` bytes to `fd`, retrying on EINTR.
 * Returns n on success and -1 on error.
 */
static ssize_t write_n(int fd, const void *buf, size_t n)
{
    const uint8_t *p = (const uint8_t *)buf;
    size_t done = 0;

    while (done < n) {
        ssize_t rc = send(fd, p + done, n - done, 0);

        if (rc < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        done += (size_t)rc;
    }
    return (ssize_t)done;
}

/*
 * Receive one framed message: a MsgHeader followed by hdr.len payload bytes.
 *
 * Returns 1 when a message was received (header decoded into *out_hdr,
 * payload stored in payload_buf), 0 when the peer closed the connection at
 * a message boundary, OMNI_ERR_PARAM on NULL arguments, and OMNI_ERR_IO on
 * a read failure, a truncated message, or a payload larger than payload_cap.
 */
static int recv_app_message(int fd, MsgHeader *out_hdr, uint8_t *payload_buf,
                            size_t payload_cap)
{
    MsgHeader net_hdr;
    ssize_t n;

    if (!out_hdr || !payload_buf) {
        return OMNI_ERR_PARAM;
    }

    n = read_n(fd, &net_hdr, MSG_HEADER_SIZE);
    if (n == 0) {
        return 0;
    }
    if (n != (ssize_t)MSG_HEADER_SIZE) {
        return OMNI_ERR_IO;
    }

    omni_msg_header_decode(&net_hdr, out_hdr);
    if (out_hdr->len > payload_cap) {
        logger_log("ERROR", "hub", "payload_too_large len=%u cap=%zu",
                   (unsigned)out_hdr->len, payload_cap);
        return OMNI_ERR_IO;
    }

    if (out_hdr->len > 0) {
        n = read_n(fd, payload_buf, out_hdr->len);
        if (n != (ssize_t)out_hdr->len) {
            return OMNI_ERR_IO;
        }
    }

    /* Header-only messages are counted too, mirroring the send side. */
    logger_on_recv(MSG_HEADER_SIZE + out_hdr->len);
    logger_maybe_print_performance_log("hub_recv");
    return 1;
}

/*
 * Send one framed message to `client`. The caller must already hold
 * client->write_mu.
 *
 * Returns OMNI_OK on success, OMNI_ERR_PARAM for a NULL client or a
 * non-zero payload_len with a NULL payload (sending only the header would
 * desynchronise the stream), and OMNI_ERR_IO when the socket is closed or
 * a write fails.
 */
static int send_app_message_locked(HubClient *client, uint32_t type,
                                   const void *payload, uint32_t payload_len)
{
    MsgHeader hdr;
    uint8_t header_buf[MSG_HEADER_SIZE];

    if (!client) {
        return OMNI_ERR_PARAM;
    }
    if (payload_len > 0 && !payload) {
        return OMNI_ERR_PARAM;
    }
    if (client->fd < 0) {
        return OMNI_ERR_IO;
    }

    omni_msg_header_encode(&hdr, type, payload_len, omni_now_ms());
    memcpy(header_buf, &hdr, sizeof(header_buf));

    if (write_n(client->fd, header_buf, sizeof(header_buf)) !=
        (ssize_t)sizeof(header_buf)) {
        return OMNI_ERR_IO;
    }
    if (payload_len > 0) {
        if (write_n(client->fd, payload, payload_len) != (ssize_t)payload_len) {
            return OMNI_ERR_IO;
        }
    }

    logger_on_send(MSG_HEADER_SIZE + payload_len);
    logger_maybe_print_performance_log("hub_send");
    return OMNI_OK;
}

/* Same as send_app_message_locked(), but acquires client->write_mu itself. */
static int send_app_message(HubClient *client, uint32_t type,
                            const void *payload, uint32_t payload_len)
{
    int rc;

    if (!client) {
        return OMNI_ERR_PARAM;
    }
    pthread_mutex_lock(&client->write_mu);
    rc = send_app_message_locked(client, type, payload, payload_len);
    pthread_mutex_unlock(&client->write_mu);
    return rc;
}

/* Send a MSG_TYPE_PEER_STATUS message; caller holds client->write_mu. */
static int send_status_locked(HubClient *client, uint32_t code,
                              const char *self_id, const char *peer_id,
                              const char *detail)
{
    PeerStatusMeta status_meta;

    omni_peer_status_meta_encode(&status_meta, code, self_id, peer_id, detail);
    return send_app_message_locked(client, MSG_TYPE_PEER_STATUS, &status_meta,
                                   PEER_STATUS_META_SIZE);
}

/* Send a MSG_TYPE_PEER_STATUS message, taking client->write_mu internally. */
static int send_status(HubClient *client, uint32_t code, const char *self_id,
                       const char *peer_id, const char *detail)
{
    PeerStatusMeta status_meta;

    omni_peer_status_meta_encode(&status_meta, code, self_id, peer_id, detail);
    return send_app_message(client, MSG_TYPE_PEER_STATUS, &status_meta,
                            PEER_STATUS_META_SIZE);
}

/*
 * Find a registered client by id. Caller holds hub->mu.
 * Unregistered connections (empty client_id) are never matched.
 */
static HubClient *find_client_locked(HubState *hub, const char *client_id)
{
    HubClient *cur;

    for (cur = hub->clients; cur != NULL; cur = cur->next) {
        if (cur->client_id[0] == '\0') {
            continue;
        }
        if (strcmp(cur->client_id, client_id) == 0) {
            return cur;
        }
    }
    return NULL;
}

/* Push `client` onto the head of the hub's list. Caller holds hub->mu. */
static void add_client_locked(HubState *hub, HubClient *client)
{
    client->next = hub->clients;
    hub->clients = client;
}

/* Unlink `client` from the hub's list if present. Caller holds hub->mu. */
static void remove_client_locked(HubState *hub, HubClient *client)
{
    HubClient **pp = &hub->clients;

    while (*pp) {
        if (*pp == client) {
            *pp = client->next;
            client->next = NULL;
            return;
        }
        pp = &(*pp)->next;
    }
}

/*
 * Remove `client` from the hub and clear every binding that pointed at it.
 *
 * Each peer that was bound to the departing client gets a
 * PEER_STATUS_UNBOUND message. Their write_mu is taken while hub->mu is
 * still held so they cannot be freed before the notification is sent.
 * If the notification array cannot be grown, the binding is still cleared;
 * only the notification for that peer is skipped.
 */
static void unregister_client(HubClient *client)
{
    HubState *hub;
    HubClient **notify = NULL;
    size_t notify_count = 0;
    size_t notify_cap = 0;
    HubClient *cur;
    char departed_id[OMNI_PEER_ID_SIZE];

    if (!client || !client->hub) {
        return;
    }
    hub = client->hub;

    memset(departed_id, 0, sizeof(departed_id));
    omni_copy_fixed_ascii(departed_id, sizeof(departed_id), client->client_id);

    pthread_mutex_lock(&hub->mu);
    remove_client_locked(hub, client);
    if (departed_id[0] != '\0') {
        for (cur = hub->clients; cur != NULL; cur = cur->next) {
            if (strcmp(cur->bound_peer, departed_id) != 0) {
                continue;
            }

            /* Always drop the stale binding, even if we cannot notify. */
            cur->bound_peer[0] = '\0';

            if (notify_count == notify_cap) {
                size_t new_cap = (notify_cap == 0) ? 4u : notify_cap * 2u;
                HubClient **new_notify =
                    (HubClient **)realloc(notify, new_cap * sizeof(*new_notify));

                if (!new_notify) {
                    continue;
                }
                notify = new_notify;
                notify_cap = new_cap;
            }

            pthread_mutex_lock(&cur->write_mu);
            notify[notify_count++] = cur;
        }
    }
    pthread_mutex_unlock(&hub->mu);

    for (size_t i = 0; i < notify_count; ++i) {
        (void)send_status_locked(notify[i], PEER_STATUS_UNBOUND,
                                 notify[i]->client_id, departed_id,
                                 "peer_offline binding_cleared");
        pthread_mutex_unlock(&notify[i]->write_mu);
    }
    free(notify);
}

/*
 * Shut down and close the client's socket under write_mu, so that any
 * in-flight sender finishes first. Safe to call more than once.
 */
static void close_client(HubClient *client)
{
    if (!client) {
        return;
    }
    pthread_mutex_lock(&client->write_mu);
    if (client->fd >= 0) {
        shutdown(client->fd, SHUT_RDWR);
        close(client->fd);
        client->fd = -1;
    }
    pthread_mutex_unlock(&client->write_mu);
}

/*
 * Handle MSG_TYPE_PEER_REGISTER: validate the requested id, reject
 * re-registration and duplicate ids, then record it on the client.
 * Returns the result of sending the status reply.
 */
static int handle_register(HubClient *client, const uint8_t *payload,
                           uint32_t payload_len)
{
    PeerRegisterMeta register_meta;
    char detail[128];

    logger_log("DEBUG", "hub", "handle_register remote=%s:%u payload_len=%u",
               client->remote_ip, (unsigned)client->remote_port,
               (unsigned)payload_len);

    if (payload_len < PEER_REGISTER_META_SIZE) {
        return send_status(client, PEER_STATUS_ERROR, NULL, NULL,
                           "short_register_payload");
    }

    omni_peer_register_meta_decode((const PeerRegisterMeta *)payload,
                                   &register_meta);
    if (!peer_id_is_valid(register_meta.client_id)) {
        return send_status(client, PEER_STATUS_ERROR, NULL, NULL,
                           "invalid_client_id");
    }

    /* Uniqueness check and assignment happen atomically under hub->mu. */
    pthread_mutex_lock(&client->hub->mu);
    if (client->client_id[0] != '\0') {
        pthread_mutex_unlock(&client->hub->mu);
        return send_status(client, PEER_STATUS_ERROR, client->client_id, NULL,
                           "already_registered");
    }
    if (find_client_locked(client->hub, register_meta.client_id) != NULL) {
        pthread_mutex_unlock(&client->hub->mu);
        return send_status(client, PEER_STATUS_ERROR, NULL,
                           register_meta.client_id, "client_id_in_use");
    }
    omni_copy_fixed_ascii(client->client_id, sizeof(client->client_id),
                          register_meta.client_id);
    pthread_mutex_unlock(&client->hub->mu);

    snprintf(detail, sizeof(detail), "registered remote=%s:%u",
             client->remote_ip, (unsigned)client->remote_port);
    logger_log("INFO", "hub", "client_registered client_id=%s remote=%s:%u",
               client->client_id, client->remote_ip,
               (unsigned)client->remote_port);
    return send_status(client, PEER_STATUS_REGISTERED, client->client_id, NULL,
                       detail);
}

/*
 * Handle MSG_TYPE_PEER_BIND: set the client's default tunnel target.
 * The client must be registered, the target id must be valid, different
 * from the client's own id, and currently online.
 * Returns the result of sending the status reply.
 */
static int handle_bind(HubClient *client, const uint8_t *payload,
                       uint32_t payload_len)
{
    PeerBindMeta bind_meta;
    HubClient *target;

    logger_log("DEBUG", "hub", "handle_bind client_id=%s payload_len=%u",
               safe_client_id(client), (unsigned)payload_len);

    if (client->client_id[0] == '\0') {
        return send_status(client, PEER_STATUS_ERROR, NULL, NULL,
                           "register_first");
    }
    if (payload_len < PEER_BIND_META_SIZE) {
        return send_status(client, PEER_STATUS_ERROR, client->client_id, NULL,
                           "short_bind_payload");
    }

    omni_peer_bind_meta_decode((const PeerBindMeta *)payload, &bind_meta);
    if (!peer_id_is_valid(bind_meta.peer_id)) {
        return send_status(client, PEER_STATUS_ERROR, client->client_id, NULL,
                           "invalid_peer_id");
    }
    if (strcmp(bind_meta.peer_id, client->client_id) == 0) {
        return send_status(client, PEER_STATUS_ERROR, client->client_id,
                           bind_meta.peer_id, "cannot_bind_self");
    }

    pthread_mutex_lock(&client->hub->mu);
    target = find_client_locked(client->hub, bind_meta.peer_id);
    if (!target) {
        pthread_mutex_unlock(&client->hub->mu);
        return send_status(client, PEER_STATUS_ERROR, client->client_id,
                           bind_meta.peer_id, "peer_not_online");
    }
    omni_copy_fixed_ascii(client->bound_peer, sizeof(client->bound_peer),
                          bind_meta.peer_id);
    pthread_mutex_unlock(&client->hub->mu);

    logger_log("INFO", "hub", "peer_bound client_id=%s peer_id=%s",
               client->client_id, bind_meta.peer_id);
    return send_status(client, PEER_STATUS_BOUND, client->client_id,
                       bind_meta.peer_id, "bind_ok");
}

/*
 * Handle MSG_TYPE_PEER_TUNNEL: forward the inner payload to the
 * destination named in the tunnel meta, or to the client's bound peer when
 * dst_id is empty. The forwarded meta is re-encoded with this client's
 * registered id as the source rather than copied from the request.
 *
 * Returns OMNI_OK when the message was forwarded; otherwise the result of
 * sending an error status back to the sender.
 */
static int handle_tunnel(HubClient *client, const uint8_t *payload,
                         uint32_t payload_len)
{
    PeerTunnelMeta tunnel_meta;
    PeerTunnelMeta forward_meta;
    HubClient *target = NULL;
    char effective_dst[OMNI_PEER_ID_SIZE];
    uint8_t *forward_payload = NULL;
    uint32_t inner_len;
    int dst_valid;
    int rc = OMNI_OK;

    logger_log("DEBUG", "hub", "handle_tunnel client_id=%s payload_len=%u",
               safe_client_id(client), (unsigned)payload_len);

    if (client->client_id[0] == '\0') {
        return send_status(client, PEER_STATUS_ERROR, NULL, NULL,
                           "register_first");
    }
    if (payload_len < PEER_TUNNEL_META_SIZE) {
        return send_status(client, PEER_STATUS_ERROR, client->client_id, NULL,
                           "short_tunnel_payload");
    }

    omni_peer_tunnel_meta_decode((const PeerTunnelMeta *)payload, &tunnel_meta);
    inner_len = payload_len - PEER_TUNNEL_META_SIZE;
    memset(effective_dst, 0, sizeof(effective_dst));

    pthread_mutex_lock(&client->hub->mu);
    if (tunnel_meta.dst_id[0] != '\0') {
        omni_copy_fixed_ascii(effective_dst, sizeof(effective_dst),
                              tunnel_meta.dst_id);
    } else {
        /* bound_peer may be cleared by another thread; read it under hub->mu. */
        omni_copy_fixed_ascii(effective_dst, sizeof(effective_dst),
                              client->bound_peer);
    }

    /*
     * Validate before touching the target so no early return below can
     * happen with the target's write lock held.
     */
    dst_valid = peer_id_is_valid(effective_dst);
    if (dst_valid) {
        target = find_client_locked(client->hub, effective_dst);
        if (target) {
            /* Taken under hub->mu: pins the target until we unlock it. */
            pthread_mutex_lock(&target->write_mu);
        }
    }
    pthread_mutex_unlock(&client->hub->mu);

    if (!dst_valid) {
        return send_status(client, PEER_STATUS_ERROR, client->client_id, NULL,
                           "missing_or_invalid_destination");
    }
    if (!target) {
        return send_status(client, PEER_STATUS_ERROR, client->client_id,
                           effective_dst, "destination_not_online");
    }

    forward_payload = (uint8_t *)malloc(payload_len);
    if (!forward_payload) {
        pthread_mutex_unlock(&target->write_mu);
        return send_status(client, PEER_STATUS_ERROR, client->client_id,
                           effective_dst, "malloc_forward_payload_failed");
    }

    omni_peer_tunnel_meta_encode(&forward_meta, client->client_id,
                                 effective_dst, tunnel_meta.inner_type);
    memcpy(forward_payload, &forward_meta, PEER_TUNNEL_META_SIZE);
    if (inner_len > 0) {
        memcpy(forward_payload + PEER_TUNNEL_META_SIZE,
               payload + PEER_TUNNEL_META_SIZE, inner_len);
    }

    rc = send_app_message_locked(target, MSG_TYPE_PEER_TUNNEL, forward_payload,
                                 payload_len);
    pthread_mutex_unlock(&target->write_mu);
    free(forward_payload);

    if (rc != OMNI_OK) {
        logger_log("ERROR", "hub",
                   "forward_failed src_id=%s dst_id=%s inner_type=%u",
                   client->client_id, effective_dst,
                   (unsigned)tunnel_meta.inner_type);
        return send_status(client, PEER_STATUS_ERROR, client->client_id,
                           effective_dst, "forward_failed");
    }

    logger_log("INFO", "hub",
               "forward_ok src_id=%s dst_id=%s inner_type=%u payload_bytes=%u",
               client->client_id, effective_dst,
               (unsigned)tunnel_meta.inner_type, (unsigned)inner_len);
    return OMNI_OK;
}

/*
 * Per-connection worker: reads framed messages and dispatches them until
 * the peer disconnects, a receive error occurs, or the hub stops. On exit
 * it unregisters, closes and frees the client; `arg` must not be used by
 * anyone else afterwards.
 */
static void *client_thread_main(void *arg)
{
    HubClient *client = (HubClient *)arg;
    uint8_t payload[HUB_MAX_PAYLOAD];

    /*
     * Recorded here rather than by the creator: the creator must not touch
     * `client` once this thread is running, because this thread frees it.
     */
    client->tid = pthread_self();

    while (atomic_load(&client->hub->running) && atomic_load(&client->running)) {
        MsgHeader hdr;
        int rc = recv_app_message(client->fd, &hdr, payload, sizeof(payload));

        if (rc == 0) {
            logger_log("INFO", "hub", "client_closed client_id=%s remote=%s:%u",
                       safe_client_id(client), client->remote_ip,
                       (unsigned)client->remote_port);
            break;
        }
        if (rc < 0) {
            logger_log("ERROR", "hub",
                       "client_recv_failed client_id=%s remote=%s:%u rc=%d",
                       safe_client_id(client), client->remote_ip,
                       (unsigned)client->remote_port, rc);
            break;
        }

        logger_log("DEBUG", "hub", "message_recv client_id=%s type=%u len=%u",
                   safe_client_id(client), (unsigned)hdr.type,
                   (unsigned)hdr.len);

        switch (hdr.type) {
        case MSG_TYPE_PEER_REGISTER:
            (void)handle_register(client, payload, hdr.len);
            break;
        case MSG_TYPE_PEER_BIND:
            (void)handle_bind(client, payload, hdr.len);
            break;
        case MSG_TYPE_PEER_TUNNEL:
            (void)handle_tunnel(client, payload, hdr.len);
            break;
        default:
            (void)send_status(client, PEER_STATUS_ERROR,
                              client->client_id[0] ? client->client_id : NULL,
                              NULL, "unsupported_message_type");
            break;
        }
    }

    atomic_store(&client->running, 0);
    unregister_client(client);
    /* Blocks until any thread still writing to this client has finished. */
    close_client(client);
    pthread_mutex_destroy(&client->write_mu);
    free(client);
    return NULL;
}

/*
 * Create an IPv4 TCP listening socket on bind_ip:port.
 * A NULL or empty bind_ip means INADDR_ANY.
 * Returns the listening fd, or -1 on failure.
 */
static int create_listen_socket(const char *bind_ip, uint16_t port)
{
    int fd;
    int reuse = 1;
    struct sockaddr_in addr;

    fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        return -1;
    }
    (void)setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    if (bind_ip && bind_ip[0] != '\0') {
        if (inet_pton(AF_INET, bind_ip, &addr.sin_addr) != 1) {
            close(fd);
            return -1;
        }
    } else {
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
    }

    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
        close(fd);
        return -1;
    }
    if (listen(fd, HUB_BACKLOG) != 0) {
        close(fd);
        return -1;
    }
    return fd;
}

/*
 * Entry point: parse options, open the listening socket and spawn one
 * detached worker thread per accepted connection until SIGINT/SIGTERM
 * or a fatal accept() error.
 *
 * Options: -P <port> (required, 1..65535), -b <bind_ip>, -p tcp.
 * Returns 0 on normal shutdown, 1 on usage or listen errors.
 */
int main(int argc, char **argv)
{
    const char *bind_ip = NULL;
    const char *proto_str = "tcp";
    int listen_port = 0;
    int opt;
    HubState hub;

    while ((opt = getopt(argc, argv, "b:p:P:")) != -1) {
        switch (opt) {
        case 'b':
            bind_ip = optarg;
            break;
        case 'p':
            proto_str = optarg;
            break;
        case 'P':
            /* Invalid or out-of-range input leaves 0, rejected below. */
            if (parse_port(optarg, &listen_port) != 0) {
                listen_port = 0;
            }
            break;
        default:
            usage(argv[0]);
            return 1;
        }
    }

    if (listen_port <= 0 || strcmp(proto_str, "tcp") != 0) {
        usage(argv[0]);
        return 1;
    }

    logger_init();
    install_signal_handlers();

    memset(&hub, 0, sizeof(hub));
    atomic_init(&hub.running, 1);
    pthread_mutex_init(&hub.mu, NULL);

    hub.listen_fd = create_listen_socket(bind_ip, (uint16_t)listen_port);
    if (hub.listen_fd < 0) {
        perror("hub listen");
        pthread_mutex_destroy(&hub.mu);
        return 1;
    }

    logger_log("INFO", "hub", "listening bind_ip=%s port=%u",
               bind_ip ? bind_ip : "0.0.0.0", (unsigned)listen_port);

    while (atomic_load(&hub.running) && !g_stop) {
        struct sockaddr_in peer_addr;
        socklen_t peer_len = sizeof(peer_addr);
        pthread_t tid;
        char remote_ip[sizeof(((HubClient *)0)->remote_ip)];
        uint16_t remote_port;
        int cfd;

        cfd = accept(hub.listen_fd, (struct sockaddr *)&peer_addr, &peer_len);
        if (cfd < 0) {
            if (g_stop) {
                break;
            }
            /* Interrupted, or the peer gave up before we accepted: retry. */
            if (errno == EINTR || errno == ECONNABORTED) {
                continue;
            }
            perror("hub accept");
            break;
        }

        HubClient *client = (HubClient *)calloc(1, sizeof(*client));
        if (!client) {
            close(cfd);
            continue;
        }

        client->hub = &hub;
        client->fd = cfd;
        atomic_init(&client->running, 1);
        pthread_mutex_init(&client->write_mu, NULL);
        if (!inet_ntop(AF_INET, &peer_addr.sin_addr, client->remote_ip,
                       sizeof(client->remote_ip))) {
            omni_copy_fixed_ascii(client->remote_ip, sizeof(client->remote_ip),
                                  "unknown");
        }
        client->remote_port = ntohs(peer_addr.sin_port);

        /*
         * Keep private copies for logging: once the worker thread starts it
         * owns `client` and may free it at any moment.
         */
        memcpy(remote_ip, client->remote_ip, sizeof(remote_ip));
        remote_ip[sizeof(remote_ip) - 1] = '\0';
        remote_port = client->remote_port;

        pthread_mutex_lock(&hub.mu);
        add_client_locked(&hub, client);
        pthread_mutex_unlock(&hub.mu);

        if (pthread_create(&tid, NULL, client_thread_main, client) != 0) {
            perror("hub pthread_create");
            /* No worker exists, so this thread still owns the client. */
            unregister_client(client);
            close_client(client);
            pthread_mutex_destroy(&client->write_mu);
            free(client);
            continue;
        }
        pthread_detach(tid);

        logger_log("INFO", "hub", "client_connected remote=%s:%u", remote_ip,
                   (unsigned)remote_port);
    }

    atomic_store(&hub.running, 0);
    if (hub.listen_fd >= 0) {
        close(hub.listen_fd);
    }

    /*
     * hub.mu is intentionally not destroyed: detached workers may still be
     * using it until the process exits on return from main.
     */
    logger_print_performance_log("final");
    return 0;
}