feat: 长保持连接,控制端可重启

This commit is contained in:
Mock
2026-04-10 11:11:03 +08:00
parent 2033db7268
commit 79dba2a664
10 changed files with 930 additions and 105 deletions

View File

@@ -1,24 +1,315 @@
#include "peer_kcp_client.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define KCP_CLIENT_REGISTER_TIMEOUT_MS 3000
#define KCP_CLIENT_CTRL_REGISTER_OK "{\"type\":\"server_register_ok\"}"
#define KCP_CLIENT_CTRL_PEER_REPLACED "{\"type\":\"server_peer_replaced\",\"reason\":\"new_instance_wins\"}"
#define KCP_CLIENT_CTRL_HEARTBEAT "{\"type\":\"server_heartbeat\"}"
#define KCP_CLIENT_CTRL_HEARTBEAT_ACK "{\"type\":\"server_heartbeat_ack\"}"
struct kcp_client {
char id[OMNI_MAX_PEER_ID];
char server_addr[OMNI_MAX_ADDR_TEXT];
kcp_conn_t *conn;
latency_logger_t *logger;
pthread_mutex_t id_mu;
pthread_mutex_t state_mu;
uint64_t next_message_id;
int registered;
char last_server_error[256];
};
static int kcp_client_next_message_id(kcp_client_t *client, uint64_t *out_id) {
pthread_mutex_lock(&client->id_mu);
if (client == NULL || out_id == NULL) {
errno = EINVAL;
return -1;
}
pthread_mutex_lock(&client->state_mu);
*out_id = ++client->next_message_id;
pthread_mutex_unlock(&client->id_mu);
pthread_mutex_unlock(&client->state_mu);
return 0;
}
static void kcp_client_set_registered(kcp_client_t *client, int registered) {
if (client == NULL) {
return;
}
pthread_mutex_lock(&client->state_mu);
client->registered = registered != 0;
pthread_mutex_unlock(&client->state_mu);
}
static void kcp_client_set_last_server_error(kcp_client_t *client, const char *message) {
if (client == NULL) {
return;
}
pthread_mutex_lock(&client->state_mu);
snprintf(client->last_server_error, sizeof(client->last_server_error), "%s", message == NULL ? "" : message);
pthread_mutex_unlock(&client->state_mu);
}
static void kcp_client_clear_last_server_error(kcp_client_t *client) {
kcp_client_set_last_server_error(client, "");
}
static int kcp_client_is_registered(kcp_client_t *client) {
int registered;
if (client == NULL) {
return 0;
}
pthread_mutex_lock(&client->state_mu);
registered = client->registered;
pthread_mutex_unlock(&client->state_mu);
return registered;
}
static int kcp_client_text_body_equals(const message_t *msg, const char *payload) {
size_t expected_len;
if (msg == NULL || payload == NULL || msg->body == NULL) {
return 0;
}
expected_len = strlen(payload);
return msg->body_len == expected_len && memcmp(msg->body, payload, expected_len) == 0;
}
static void kcp_client_copy_server_error_body(const message_t *msg, char *buffer, size_t buffer_len) {
size_t copy_len;
if (buffer == NULL || buffer_len == 0) {
return;
}
buffer[0] = '\0';
if (msg == NULL || msg->body == NULL || msg->body_len == 0) {
return;
}
copy_len = msg->body_len < (buffer_len - 1U) ? msg->body_len : (buffer_len - 1U);
memcpy(buffer, msg->body, copy_len);
buffer[copy_len] = '\0';
}
static int kcp_client_registration_errno_from_message(const char *message) {
if (message == NULL || message[0] == '\0') {
return ECONNREFUSED;
}
if (strstr(message, "duplicate peer id") != NULL) {
return EEXIST;
}
if (strstr(message, "first message must be register") != NULL) {
return EPROTO;
}
return ECONNREFUSED;
}
static int kcp_client_send_text_internal(kcp_client_t *client, const char *to, const char *text, int log_business_event) {
message_t msg;
uint64_t id;
if (client == NULL || to == NULL || text == NULL || client->conn == NULL) {
errno = EINVAL;
return -1;
}
protocol_message_init(&msg);
if (kcp_client_next_message_id(client, &id) != 0) {
return -1;
}
msg.type = MSG_TYPE_TEXT;
msg.id = id;
snprintf(msg.from, sizeof(msg.from), "%s", client->id);
snprintf(msg.to, sizeof(msg.to), "%s", to);
msg.body = (uint8_t *) omni_strdup(text);
if (msg.body == NULL) {
return -1;
}
msg.body_len = strlen((const char *) msg.body);
if (log_business_event) {
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_A_APP_PREP_BEGIN, &msg);
}
if (kcp_conn_send(client->conn, &msg) != 0) {
protocol_message_clear(&msg);
return -1;
}
protocol_message_clear(&msg);
return 0;
}
static int kcp_client_send_business_preflight(kcp_client_t *client) {
if (client == NULL || client->conn == NULL) {
errno = ENOTCONN;
return -1;
}
if (!kcp_client_is_registered(client)) {
errno = ENOTCONN;
return -1;
}
return 0;
}
static int kcp_client_handle_reserved_server_message(kcp_client_t *client, const message_t *msg) {
if (client == NULL || msg == NULL) {
errno = EINVAL;
return -1;
}
if (msg->type != MSG_TYPE_TEXT || strcmp(msg->from, SERVER_PEER_ID) != 0) {
return 0;
}
if (kcp_client_text_body_equals(msg, KCP_CLIENT_CTRL_REGISTER_OK)) {
kcp_client_set_registered(client, 1);
kcp_client_clear_last_server_error(client);
return 1;
}
if (kcp_client_text_body_equals(msg, KCP_CLIENT_CTRL_HEARTBEAT)) {
if (kcp_client_send_text_internal(client, SERVER_PEER_ID, KCP_CLIENT_CTRL_HEARTBEAT_ACK, 0) != 0) {
kcp_client_set_registered(client, 0);
kcp_client_set_last_server_error(client, "failed to acknowledge server heartbeat");
(void) kcp_conn_close(client->conn);
return -1;
}
return 1;
}
if (kcp_client_text_body_equals(msg, KCP_CLIENT_CTRL_HEARTBEAT_ACK)) {
return 1;
}
if (kcp_client_text_body_equals(msg, KCP_CLIENT_CTRL_PEER_REPLACED)) {
kcp_client_set_registered(client, 0);
kcp_client_set_last_server_error(client, "server peer replaced this session");
(void) kcp_conn_close(client->conn);
errno = ECONNRESET;
return -1;
}
return 0;
}
static int kcp_client_remaining_timeout_ms(int original_timeout_ms, uint32_t start_ms) {
uint32_t elapsed_ms;
if (original_timeout_ms < 0) {
return -1;
}
elapsed_ms = omni_now_millis32() - start_ms;
if (elapsed_ms >= (uint32_t) original_timeout_ms) {
return 0;
}
return original_timeout_ms - (int) elapsed_ms;
}
static int kcp_client_wait_for_register_ok(kcp_client_t *client) {
uint32_t start_ms;
if (client == NULL || client->conn == NULL) {
errno = EINVAL;
return -1;
}
start_ms = omni_now_millis32();
for (;;) {
message_t msg;
int rc;
int remaining_timeout_ms = kcp_client_remaining_timeout_ms(KCP_CLIENT_REGISTER_TIMEOUT_MS, start_ms);
if (remaining_timeout_ms <= 0) {
kcp_client_set_registered(client, 0);
kcp_client_set_last_server_error(client, "timed out waiting for server_register_ok");
(void) kcp_conn_close(client->conn);
errno = ETIMEDOUT;
return -1;
}
protocol_message_init(&msg);
rc = kcp_conn_receive_timed(client->conn, &msg, remaining_timeout_ms);
if (rc == 1) {
protocol_message_clear(&msg);
kcp_client_set_registered(client, 0);
kcp_client_set_last_server_error(client, "timed out waiting for server_register_ok");
(void) kcp_conn_close(client->conn);
errno = ETIMEDOUT;
return -1;
}
if (rc != 0) {
protocol_message_clear(&msg);
kcp_client_set_registered(client, 0);
return -1;
}
if (msg.type == MSG_TYPE_ERROR && strcmp(msg.from, SERVER_PEER_ID) == 0) {
char error_text[256];
kcp_client_copy_server_error_body(&msg, error_text, sizeof(error_text));
kcp_client_set_registered(client, 0);
kcp_client_set_last_server_error(client, error_text);
protocol_message_clear(&msg);
(void) kcp_conn_close(client->conn);
errno = kcp_client_registration_errno_from_message(error_text);
return -1;
}
rc = kcp_client_handle_reserved_server_message(client, &msg);
protocol_message_clear(&msg);
if (rc < 0) {
return -1;
}
if (rc > 0 && kcp_client_is_registered(client)) {
return 0;
}
kcp_client_set_registered(client, 0);
kcp_client_set_last_server_error(client, "unexpected message while waiting for server_register_ok");
(void) kcp_conn_close(client->conn);
errno = EPROTO;
return -1;
}
}
static int kcp_client_receive_business_timed(kcp_client_t *client, message_t *out_msg, int timeout_ms) {
uint32_t start_ms;
if (client == NULL || out_msg == NULL || client->conn == NULL) {
errno = EINVAL;
return -1;
}
start_ms = omni_now_millis32();
protocol_message_init(out_msg);
for (;;) {
int rc;
int reserved_rc;
int effective_timeout_ms = timeout_ms < 0 ? -1 : kcp_client_remaining_timeout_ms(timeout_ms, start_ms);
if (timeout_ms >= 0 && effective_timeout_ms <= 0) {
return 1;
}
protocol_message_clear(out_msg);
rc = kcp_conn_receive_timed(client->conn, out_msg, effective_timeout_ms);
if (rc != 0) {
if (rc != 1) {
kcp_client_set_registered(client, 0);
}
return rc;
}
reserved_rc = kcp_client_handle_reserved_server_message(client, out_msg);
if (reserved_rc < 0) {
protocol_message_clear(out_msg);
return -1;
}
if (reserved_rc > 0) {
protocol_message_clear(out_msg);
continue;
}
if (out_msg->type == MSG_TYPE_ERROR && strcmp(out_msg->from, SERVER_PEER_ID) == 0) {
char error_text[256];
kcp_client_copy_server_error_body(out_msg, error_text, sizeof(error_text));
kcp_client_set_last_server_error(client, error_text);
}
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_B_APP_RECV, out_msg);
return 0;
}
}
static int kcp_client_persist_message_to_disk(const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len) {
char path[512];
@@ -107,7 +398,7 @@ kcp_client_t *kcp_client_dial_with_options(const char *server_addr, const char *
}
snprintf(client->id, sizeof(client->id), "%s", peer_id);
snprintf(client->server_addr, sizeof(client->server_addr), "%s", server_addr == NULL ? "" : server_addr);
pthread_mutex_init(&client->id_mu, NULL);
pthread_mutex_init(&client->state_mu, NULL);
client->logger = logger;
client->conn = kcp_conn_dial_with_options(actual_dial_addr, bind_ip, bind_device, options, packet_logger, logger, OMNI_NODE_ROLE_PEER, peer_id, stats_logger, stats_interval_ms);
if (client->conn == NULL) {
@@ -128,6 +419,12 @@ kcp_client_t *kcp_client_dial_with_options(const char *server_addr, const char *
errno = saved_errno;
return NULL;
}
if (kcp_client_wait_for_register_ok(client) != 0) {
saved_errno = errno;
kcp_client_free(client);
errno = saved_errno;
return NULL;
}
return client;
}
@@ -140,31 +437,14 @@ const char *kcp_client_id(const kcp_client_t *client) {
}
int kcp_client_send_text(kcp_client_t *client, const char *to, const char *text) {
message_t msg;
uint64_t id;
if (client == NULL || to == NULL || text == NULL) {
errno = EINVAL;
return -1;
}
protocol_message_init(&msg);
kcp_client_next_message_id(client, &id);
msg.type = MSG_TYPE_TEXT;
msg.id = id;
snprintf(msg.from, sizeof(msg.from), "%s", client->id);
snprintf(msg.to, sizeof(msg.to), "%s", to);
msg.body = (uint8_t *) omni_strdup(text);
if (msg.body == NULL) {
if (kcp_client_send_business_preflight(client) != 0) {
return -1;
}
msg.body_len = strlen((const char *) msg.body);
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_A_APP_PREP_BEGIN, &msg);
if (kcp_conn_send(client->conn, &msg) != 0) {
protocol_message_clear(&msg);
return -1;
}
protocol_message_clear(&msg);
return 0;
return kcp_client_send_text_internal(client, to, text, 1);
}
int kcp_client_send_binary(kcp_client_t *client, const char *to, const void *data, size_t data_len) {
@@ -175,8 +455,13 @@ int kcp_client_send_binary(kcp_client_t *client, const char *to, const void *dat
errno = EINVAL;
return -1;
}
if (kcp_client_send_business_preflight(client) != 0) {
return -1;
}
protocol_message_init(&msg);
kcp_client_next_message_id(client, &id);
if (kcp_client_next_message_id(client, &id) != 0) {
return -1;
}
msg.type = MSG_TYPE_BINARY;
msg.id = id;
snprintf(msg.from, sizeof(msg.from), "%s", client->id);
@@ -209,11 +494,17 @@ int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char *
errno = EINVAL;
return -1;
}
if (kcp_client_send_business_preflight(client) != 0) {
return -1;
}
if (omni_read_file(path, &body, &body_len) != 0) {
return -1;
}
protocol_message_init(&msg);
kcp_client_next_message_id(client, &id);
if (kcp_client_next_message_id(client, &id) != 0) {
free(body);
return -1;
}
msg.type = MSG_TYPE_FILE;
msg.id = id;
snprintf(msg.from, sizeof(msg.from), "%s", client->id);
@@ -231,18 +522,11 @@ int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char *
}
int kcp_client_receive_timed(kcp_client_t *client, message_t *out_msg, int timeout_ms) {
int rc;
if (client == NULL || out_msg == NULL) {
errno = EINVAL;
return -1;
}
rc = kcp_conn_receive_timed(client->conn, out_msg, timeout_ms);
if (rc != 0) {
return rc;
}
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_B_APP_RECV, out_msg);
return 0;
return kcp_client_receive_business_timed(client, out_msg, timeout_ms);
}
int kcp_client_receive(kcp_client_t *client, message_t *out_msg) {
@@ -264,6 +548,7 @@ int kcp_client_receive_binary_into(kcp_client_t *client, void *buffer, size_t bu
protocol_message_init(&msg);
rc = kcp_client_receive_timed(client, &msg, timeout_ms);
if (rc != 0) {
protocol_message_clear(&msg);
return rc;
}
@@ -294,6 +579,27 @@ int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const
return 0;
}
void kcp_client_state_snapshot(kcp_client_t *client, kcp_client_state_t *out_state) {
kcp_runtime_stats_t runtime_stats;
if (out_state == NULL) {
return;
}
memset(out_state, 0, sizeof(*out_state));
if (client == NULL) {
return;
}
memset(&runtime_stats, 0, sizeof(runtime_stats));
if (client->conn != NULL) {
kcp_conn_runtime_stats_snapshot(client->conn, &runtime_stats);
out_state->connected = runtime_stats.connected;
}
pthread_mutex_lock(&client->state_mu);
out_state->registered = client->registered;
snprintf(out_state->last_server_error, sizeof(out_state->last_server_error), "%s", client->last_server_error);
pthread_mutex_unlock(&client->state_mu);
}
void kcp_client_runtime_stats_snapshot(kcp_client_t *client, kcp_runtime_stats_t *out_stats) {
if (out_stats == NULL) {
return;
@@ -307,7 +613,11 @@ void kcp_client_runtime_stats_snapshot(kcp_client_t *client, kcp_runtime_stats_t
}
int kcp_client_close(kcp_client_t *client) {
return client == NULL ? 0 : kcp_conn_close(client->conn);
if (client == NULL) {
return 0;
}
kcp_client_set_registered(client, 0);
return kcp_conn_close(client->conn);
}
void kcp_client_free(kcp_client_t *client) {
@@ -315,6 +625,6 @@ void kcp_client_free(kcp_client_t *client) {
return;
}
kcp_conn_free(client->conn);
pthread_mutex_destroy(&client->id_mu);
pthread_mutex_destroy(&client->state_mu);
free(client);
}

View File

@@ -3,17 +3,29 @@
#include "cJSON.h"
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define KCP_RELAY_MAX_DATAGRAM_SIZE (60 * 1024)
#define KCP_HUB_MAINTENANCE_INTERVAL_MS 250
#define KCP_HUB_DEFAULT_TELEMETRY_INTERVAL_MS 500
#define KCP_HUB_DEFAULT_HEARTBEAT_INTERVAL_MS 1000
#define KCP_HUB_DEFAULT_LEASE_TIMEOUT_MS 4000
#define KCP_HUB_TELEMETRY_NODE_ID "hub-telemetry"
#define KCP_HUB_DEFAULT_NODE_ID "hub"
#define KCP_HUB_CTRL_REGISTER_OK "{\"type\":\"server_register_ok\"}"
#define KCP_HUB_CTRL_PEER_REPLACED "{\"type\":\"server_peer_replaced\",\"reason\":\"new_instance_wins\"}"
#define KCP_HUB_CTRL_HEARTBEAT "{\"type\":\"server_heartbeat\"}"
#define KCP_HUB_CTRL_HEARTBEAT_ACK "{\"type\":\"server_heartbeat_ack\"}"
typedef struct kcp_peer_entry {
struct kcp_peer_entry *next;
char peer_id[OMNI_MAX_PEER_ID];
kcp_conn_t *conn;
uint32_t last_seen_ms;
uint32_t last_heartbeat_sent_ms;
} kcp_peer_entry_t;
typedef struct kcp_session_thread_ctx {
@@ -21,6 +33,12 @@ typedef struct kcp_session_thread_ctx {
kcp_conn_t *conn;
} kcp_session_thread_ctx_t;
typedef struct kcp_hub_pending_action {
struct kcp_hub_pending_action *next;
char peer_id[OMNI_MAX_PEER_ID];
kcp_conn_t *conn;
} kcp_hub_pending_action_t;
struct kcp_hub {
pthread_rwlock_t lock;
kcp_peer_entry_t *peers;
@@ -29,6 +47,8 @@ struct kcp_hub {
int stats_interval_ms;
char telemetry_peer_id[OMNI_MAX_PEER_ID];
int telemetry_interval_ms;
int heartbeat_interval_ms;
int lease_timeout_ms;
pthread_t telemetry_thread;
int telemetry_thread_started;
int relay_fd;
@@ -41,11 +61,65 @@ struct kcp_hub {
static int kcp_hub_peer_id_has_suffix(const char *peer_id, const char *suffix);
static int kcp_hub_deliver_to_local_peer(kcp_hub_t *hub, const message_t *msg);
static int kcp_hub_send_server_text(kcp_conn_t *conn, const char *to, const char *payload);
static void kcp_hub_touch_peer(kcp_hub_t *hub, const char *peer_id, kcp_conn_t *conn);
static void kcp_hub_run_maintenance(kcp_hub_t *hub);
static uint32_t kcp_hub_now_ms(void) {
return omni_now_millis32();
}
static uint32_t kcp_hub_elapsed_ms(uint32_t now_ms, uint32_t then_ms) {
return now_ms - then_ms;
}
static int kcp_hub_text_body_equals(const message_t *msg, const char *payload) {
size_t expected_len;
if (msg == NULL || payload == NULL) {
return 0;
}
expected_len = strlen(payload);
return msg->body_len == expected_len && msg->body != NULL && memcmp(msg->body, payload, expected_len) == 0;
}
static int kcp_hub_append_pending_action(kcp_hub_pending_action_t **head, const char *peer_id, kcp_conn_t *conn) {
kcp_hub_pending_action_t *action;
if (head == NULL || peer_id == NULL || conn == NULL) {
errno = EINVAL;
return -1;
}
action = (kcp_hub_pending_action_t *) calloc(1, sizeof(*action));
if (action == NULL) {
return -1;
}
snprintf(action->peer_id, sizeof(action->peer_id), "%s", peer_id);
action->conn = conn;
action->next = *head;
*head = action;
return 0;
}
static void kcp_hub_free_pending_actions(kcp_hub_pending_action_t *head) {
while (head != NULL) {
kcp_hub_pending_action_t *next = head->next;
free(head);
head = next;
}
}
static int kcp_hub_peer_is_telemetry(const char *peer_id) {
return kcp_hub_peer_id_has_suffix(peer_id, "-telemetry");
}
static int kcp_hub_peer_uses_server_lease(const char *peer_id) {
if (peer_id == NULL || peer_id[0] == '\0') {
return 0;
}
return kcp_hub_peer_id_has_suffix(peer_id, "-ctrl") || kcp_hub_peer_is_telemetry(peer_id);
}
static const char *kcp_hub_peer_node_id(const char *peer_id) {
return kcp_hub_peer_is_telemetry(peer_id) ? KCP_HUB_TELEMETRY_NODE_ID : KCP_HUB_DEFAULT_NODE_ID;
}
@@ -118,6 +192,27 @@ static int kcp_hub_configure_peer_transport(kcp_conn_t *conn, const char *peer_i
return 0;
}
static void kcp_hub_touch_peer_locked(kcp_hub_t *hub, const char *peer_id, kcp_conn_t *conn) {
kcp_peer_entry_t *entry;
if (hub == NULL || peer_id == NULL || peer_id[0] == '\0') {
return;
}
entry = kcp_hub_find_peer(hub, peer_id);
if (entry != NULL && (conn == NULL || entry->conn == conn)) {
entry->last_seen_ms = kcp_hub_now_ms();
}
}
static void kcp_hub_touch_peer(kcp_hub_t *hub, const char *peer_id, kcp_conn_t *conn) {
if (hub == NULL || peer_id == NULL || peer_id[0] == '\0') {
return;
}
pthread_rwlock_wrlock(&hub->lock);
kcp_hub_touch_peer_locked(hub, peer_id, conn);
pthread_rwlock_unlock(&hub->lock);
}
static int kcp_hub_add_runtime_stats_json(cJSON *object, const kcp_runtime_stats_t *stats) {
if (object == NULL || stats == NULL) {
errno = EINVAL;
@@ -287,25 +382,53 @@ static int kcp_hub_push_telemetry_snapshot(kcp_hub_t *hub) {
static void *kcp_hub_telemetry_thread_main(void *arg) {
kcp_hub_t *hub = (kcp_hub_t *) arg;
uint32_t last_telemetry_push_ms = 0;
while (!atomic_load(&hub->closed)) {
int interval_ms = KCP_HUB_DEFAULT_TELEMETRY_INTERVAL_MS;
uint32_t now_ms = kcp_hub_now_ms();
int telemetry_enabled = 0;
pthread_rwlock_rdlock(&hub->lock);
if (hub->telemetry_interval_ms > 0) {
telemetry_enabled = hub->telemetry_peer_id[0] != '\0';
if (telemetry_enabled && hub->telemetry_interval_ms > 0) {
interval_ms = hub->telemetry_interval_ms;
}
pthread_rwlock_unlock(&hub->lock);
(void) kcp_hub_push_telemetry_snapshot(hub);
if (telemetry_enabled && (last_telemetry_push_ms == 0 || kcp_hub_elapsed_ms(now_ms, last_telemetry_push_ms) >= (uint32_t) interval_ms)) {
(void) kcp_hub_push_telemetry_snapshot(hub);
last_telemetry_push_ms = now_ms;
}
kcp_hub_run_maintenance(hub);
if (atomic_load(&hub->closed)) {
break;
}
usleep((useconds_t) interval_ms * 1000U);
usleep((useconds_t) KCP_HUB_MAINTENANCE_INTERVAL_MS * 1000U);
}
return NULL;
}
static int kcp_hub_send_server_text(kcp_conn_t *conn, const char *to, const char *payload) {
message_t msg;
protocol_message_init(&msg);
msg.type = MSG_TYPE_TEXT;
snprintf(msg.from, sizeof(msg.from), "%s", SERVER_PEER_ID);
snprintf(msg.to, sizeof(msg.to), "%s", (to == NULL || to[0] == '\0') ? "unknown" : to);
msg.body = (uint8_t *) omni_strdup(payload == NULL ? "" : payload);
if (msg.body == NULL) {
return -1;
}
msg.body_len = strlen((const char *) msg.body);
if (kcp_conn_send(conn, &msg) != 0) {
protocol_message_clear(&msg);
return -1;
}
protocol_message_clear(&msg);
return 0;
}
static int kcp_hub_send_server_error(kcp_conn_t *conn, const char *to, const char *message) {
message_t msg;
protocol_message_init(&msg);
@@ -495,8 +618,56 @@ static int kcp_hub_handle_peer_message(kcp_hub_t *hub, const char *peer_id, kcp_
char *error_text = NULL;
int relay_status = 0;
kcp_hub_touch_peer(hub, peer_id, conn);
switch (msg->type) {
case MSG_TYPE_TEXT:
if (strcmp(msg->to, SERVER_PEER_ID) == 0) {
if (kcp_hub_text_body_equals(msg, KCP_HUB_CTRL_HEARTBEAT_ACK)) {
return 0;
}
if (kcp_hub_send_server_error(conn, peer_id, "unsupported server control message") != 0) {
return -1;
}
errno = EPROTO;
return -1;
}
snprintf(msg->from, sizeof(msg->from), "%s", peer_id);
if (kcp_hub_deliver_to_local_peer(hub, msg) == 0) {
return 0;
}
if (errno != ENOENT) {
error_text = omni_strdup_printf("failed to forward to %s", msg->to);
if (error_text == NULL) {
return -1;
}
if (kcp_hub_send_server_error(conn, peer_id, error_text) != 0) {
free(error_text);
return -1;
}
free(error_text);
return 0;
}
if (kcp_hub_forward_to_relay(hub, msg, &relay_status) == 0) {
return 0;
}
if (relay_status == 1) {
error_text = omni_strdup_printf("unknown target: %s", msg->to);
} else if (relay_status == 2) {
error_text = omni_strdup("failed to relay to remote peer");
} else if (relay_status == 3) {
error_text = omni_strdup("message too large for relay udp");
} else {
error_text = omni_strdup("failed to relay to remote peer");
}
if (error_text == NULL) {
return -1;
}
if (kcp_hub_send_server_error(conn, peer_id, error_text) != 0) {
free(error_text);
return -1;
}
free(error_text);
return 0;
case MSG_TYPE_FILE:
case MSG_TYPE_BINARY:
snprintf(msg->from, sizeof(msg->from), "%s", peer_id);
@@ -558,9 +729,55 @@ static int kcp_hub_handle_peer_message(kcp_hub_t *hub, const char *peer_id, kcp_
}
}
static int kcp_hub_commit_registered_conn(
kcp_hub_t *hub,
const char *peer_id,
kcp_conn_t *conn,
uint32_t now_ms,
kcp_conn_t **out_old_conn
) {
kcp_peer_entry_t *entry;
if (hub == NULL || peer_id == NULL || peer_id[0] == '\0' || conn == NULL) {
errno = EINVAL;
return -1;
}
if (out_old_conn != NULL) {
*out_old_conn = NULL;
}
pthread_rwlock_wrlock(&hub->lock);
entry = kcp_hub_find_peer(hub, peer_id);
if (entry != NULL) {
if (out_old_conn != NULL) {
*out_old_conn = entry->conn;
}
entry->conn = conn;
entry->last_seen_ms = now_ms;
entry->last_heartbeat_sent_ms = 0;
pthread_rwlock_unlock(&hub->lock);
return 0;
}
entry = (kcp_peer_entry_t *) calloc(1, sizeof(*entry));
if (entry == NULL) {
pthread_rwlock_unlock(&hub->lock);
return -1;
}
snprintf(entry->peer_id, sizeof(entry->peer_id), "%s", peer_id);
entry->conn = conn;
entry->last_seen_ms = now_ms;
entry->last_heartbeat_sent_ms = 0;
entry->next = hub->peers;
hub->peers = entry;
pthread_rwlock_unlock(&hub->lock);
return 0;
}
static int kcp_hub_register_conn(kcp_hub_t *hub, kcp_conn_t *conn, char *peer_id, size_t peer_id_len) {
message_t msg;
kcp_peer_entry_t *entry;
kcp_conn_t *old_conn = NULL;
uint32_t now_ms;
protocol_message_init(&msg);
if (kcp_conn_receive(conn, &msg) != 0) {
@@ -574,34 +791,22 @@ static int kcp_hub_register_conn(kcp_hub_t *hub, kcp_conn_t *conn, char *peer_id
return -1;
}
pthread_rwlock_wrlock(&hub->lock);
entry = kcp_hub_find_peer(hub, msg.from);
if (entry != NULL) {
char *error_text;
pthread_rwlock_unlock(&hub->lock);
error_text = omni_strdup_printf("duplicate peer id: %s", msg.from);
if (error_text != NULL) {
(void) kcp_hub_send_server_error(conn, msg.from, error_text);
free(error_text);
}
protocol_message_clear(&msg);
errno = EEXIST;
return -1;
}
entry = (kcp_peer_entry_t *) calloc(1, sizeof(*entry));
if (entry == NULL) {
pthread_rwlock_unlock(&hub->lock);
protocol_message_clear(&msg);
return -1;
}
snprintf(entry->peer_id, sizeof(entry->peer_id), "%s", msg.from);
entry->conn = conn;
entry->next = hub->peers;
hub->peers = entry;
pthread_rwlock_unlock(&hub->lock);
snprintf(peer_id, peer_id_len, "%s", msg.from);
if (kcp_hub_send_server_text(conn, msg.from, KCP_HUB_CTRL_REGISTER_OK) != 0) {
protocol_message_clear(&msg);
return -1;
}
now_ms = kcp_hub_now_ms();
if (kcp_hub_commit_registered_conn(hub, msg.from, conn, now_ms, &old_conn) != 0) {
protocol_message_clear(&msg);
return -1;
}
if (old_conn != NULL && old_conn != conn) {
(void) kcp_hub_send_server_text(old_conn, msg.from, KCP_HUB_CTRL_PEER_REPLACED);
kcp_conn_close(old_conn);
}
protocol_message_clear(&msg);
return 0;
}
@@ -623,8 +828,16 @@ kcp_hub_t *kcp_hub_new(latency_logger_t *logger, kcp_session_stats_logger_t *sta
hub->stats_logger = stats_logger;
hub->stats_interval_ms = stats_interval_ms > 0 ? stats_interval_ms : KCP_DEFAULT_STATS_INTERVAL_MS;
hub->telemetry_interval_ms = KCP_HUB_DEFAULT_TELEMETRY_INTERVAL_MS;
hub->heartbeat_interval_ms = KCP_HUB_DEFAULT_HEARTBEAT_INTERVAL_MS;
hub->lease_timeout_ms = KCP_HUB_DEFAULT_LEASE_TIMEOUT_MS;
hub->relay_fd = -1;
atomic_init(&hub->closed, 0);
if (pthread_create(&hub->telemetry_thread, NULL, kcp_hub_telemetry_thread_main, hub) != 0) {
pthread_rwlock_destroy(&hub->lock);
free(hub);
return NULL;
}
hub->telemetry_thread_started = 1;
return hub;
}
@@ -732,8 +945,6 @@ int kcp_hub_set_relay(kcp_hub_t *hub, int relay_fd, const struct sockaddr *peer_
}
int kcp_hub_set_telemetry(kcp_hub_t *hub, const char *peer_id, int interval_ms) {
int start_thread = 0;
if (hub == NULL || peer_id == NULL) {
errno = EINVAL;
return -1;
@@ -741,22 +952,92 @@ int kcp_hub_set_telemetry(kcp_hub_t *hub, const char *peer_id, int interval_ms)
pthread_rwlock_wrlock(&hub->lock);
snprintf(hub->telemetry_peer_id, sizeof(hub->telemetry_peer_id), "%s", peer_id);
hub->telemetry_interval_ms = interval_ms > 0 ? interval_ms : KCP_HUB_DEFAULT_TELEMETRY_INTERVAL_MS;
if (!hub->telemetry_thread_started && hub->telemetry_peer_id[0] != '\0') {
start_thread = 1;
hub->telemetry_thread_started = 1;
pthread_rwlock_unlock(&hub->lock);
return 0;
}
static void kcp_hub_run_maintenance(kcp_hub_t *hub) {
kcp_hub_pending_action_t *heartbeat_actions = NULL;
kcp_hub_pending_action_t *close_actions = NULL;
uint32_t now_ms;
int heartbeat_interval_ms;
int lease_timeout_ms;
if (hub == NULL) {
return;
}
now_ms = kcp_hub_now_ms();
heartbeat_interval_ms = KCP_HUB_DEFAULT_HEARTBEAT_INTERVAL_MS;
lease_timeout_ms = KCP_HUB_DEFAULT_LEASE_TIMEOUT_MS;
pthread_rwlock_wrlock(&hub->lock);
if (hub->heartbeat_interval_ms > 0) {
heartbeat_interval_ms = hub->heartbeat_interval_ms;
}
if (hub->lease_timeout_ms > 0) {
lease_timeout_ms = hub->lease_timeout_ms;
}
{
kcp_peer_entry_t *prev = NULL;
kcp_peer_entry_t *entry = hub->peers;
while (entry != NULL) {
kcp_peer_entry_t *next = entry->next;
uint32_t idle_ms = kcp_hub_elapsed_ms(now_ms, entry->last_seen_ms);
int uses_server_lease = kcp_hub_peer_uses_server_lease(entry->peer_id);
if (entry->conn == NULL || entry->peer_id[0] == '\0') {
prev = entry;
entry = next;
continue;
}
if (uses_server_lease && lease_timeout_ms > 0 && idle_ms >= (uint32_t) lease_timeout_ms) {
if (prev == NULL) {
hub->peers = next;
} else {
prev->next = next;
}
(void) kcp_hub_append_pending_action(&close_actions, entry->peer_id, entry->conn);
free(entry);
entry = next;
continue;
}
if (
uses_server_lease
&&
heartbeat_interval_ms > 0
&& idle_ms >= (uint32_t) heartbeat_interval_ms
&& (entry->last_heartbeat_sent_ms == 0 || kcp_hub_elapsed_ms(now_ms, entry->last_heartbeat_sent_ms) >= (uint32_t) heartbeat_interval_ms)
) {
entry->last_heartbeat_sent_ms = now_ms;
(void) kcp_hub_append_pending_action(&heartbeat_actions, entry->peer_id, entry->conn);
}
prev = entry;
entry = next;
}
}
pthread_rwlock_unlock(&hub->lock);
if (start_thread) {
if (pthread_create(&hub->telemetry_thread, NULL, kcp_hub_telemetry_thread_main, hub) != 0) {
pthread_rwlock_wrlock(&hub->lock);
hub->telemetry_thread_started = 0;
hub->telemetry_peer_id[0] = '\0';
pthread_rwlock_unlock(&hub->lock);
return -1;
while (heartbeat_actions != NULL) {
kcp_hub_pending_action_t *next = heartbeat_actions->next;
if (kcp_hub_send_server_text(heartbeat_actions->conn, heartbeat_actions->peer_id, KCP_HUB_CTRL_HEARTBEAT) != 0) {
kcp_hub_unregister(hub, heartbeat_actions->peer_id, heartbeat_actions->conn);
kcp_conn_close(heartbeat_actions->conn);
}
free(heartbeat_actions);
heartbeat_actions = next;
}
return 0;
while (close_actions != NULL) {
kcp_hub_pending_action_t *next = close_actions->next;
kcp_conn_close(close_actions->conn);
free(close_actions);
close_actions = next;
}
kcp_hub_free_pending_actions(heartbeat_actions);
kcp_hub_free_pending_actions(close_actions);
}
int kcp_hub_serve_relay(kcp_hub_t *hub) {