848 lines
27 KiB
C
848 lines
27 KiB
C
#include "server_kcp_hub.h"
|
|
|
|
#include "cJSON.h"
|
|
|
|
#include <stdatomic.h>
|
|
#include <unistd.h>
|
|
|
|
#define KCP_RELAY_MAX_DATAGRAM_SIZE (60 * 1024)
|
|
#define KCP_HUB_DEFAULT_TELEMETRY_INTERVAL_MS 500
|
|
#define KCP_HUB_TELEMETRY_NODE_ID "hub-telemetry"
|
|
#define KCP_HUB_DEFAULT_NODE_ID "hub"
|
|
|
|
typedef struct kcp_peer_entry {
|
|
struct kcp_peer_entry *next;
|
|
char peer_id[OMNI_MAX_PEER_ID];
|
|
kcp_conn_t *conn;
|
|
} kcp_peer_entry_t;
|
|
|
|
typedef struct kcp_session_thread_ctx {
|
|
kcp_hub_t *hub;
|
|
kcp_conn_t *conn;
|
|
} kcp_session_thread_ctx_t;
|
|
|
|
struct kcp_hub {
|
|
pthread_rwlock_t lock;
|
|
kcp_peer_entry_t *peers;
|
|
latency_logger_t *logger;
|
|
kcp_session_stats_logger_t *stats_logger;
|
|
int stats_interval_ms;
|
|
char telemetry_peer_id[OMNI_MAX_PEER_ID];
|
|
int telemetry_interval_ms;
|
|
pthread_t telemetry_thread;
|
|
int telemetry_thread_started;
|
|
int relay_fd;
|
|
int relay_configured;
|
|
int relay_learn_peer;
|
|
struct sockaddr_storage relay_peer_addr;
|
|
socklen_t relay_peer_addr_len;
|
|
atomic_int closed;
|
|
};
|
|
|
|
static int kcp_hub_peer_id_has_suffix(const char *peer_id, const char *suffix);
|
|
static int kcp_hub_deliver_to_local_peer(kcp_hub_t *hub, const message_t *msg);
|
|
|
|
static int kcp_hub_peer_is_telemetry(const char *peer_id) {
|
|
return kcp_hub_peer_id_has_suffix(peer_id, "-telemetry");
|
|
}
|
|
|
|
static const char *kcp_hub_peer_node_id(const char *peer_id) {
|
|
return kcp_hub_peer_is_telemetry(peer_id) ? KCP_HUB_TELEMETRY_NODE_ID : KCP_HUB_DEFAULT_NODE_ID;
|
|
}
|
|
|
|
static void kcp_hub_unregister(kcp_hub_t *hub, const char *peer_id, kcp_conn_t *conn) {
|
|
kcp_peer_entry_t *prev = NULL;
|
|
kcp_peer_entry_t *entry;
|
|
|
|
if (hub == NULL || peer_id == NULL || peer_id[0] == '\0') {
|
|
return;
|
|
}
|
|
|
|
pthread_rwlock_wrlock(&hub->lock);
|
|
for (entry = hub->peers; entry != NULL; entry = entry->next) {
|
|
if (strcmp(entry->peer_id, peer_id) == 0 && entry->conn == conn) {
|
|
if (prev == NULL) {
|
|
hub->peers = entry->next;
|
|
} else {
|
|
prev->next = entry->next;
|
|
}
|
|
free(entry);
|
|
break;
|
|
}
|
|
prev = entry;
|
|
}
|
|
pthread_rwlock_unlock(&hub->lock);
|
|
}
|
|
|
|
static kcp_peer_entry_t *kcp_hub_find_peer(kcp_hub_t *hub, const char *peer_id) {
|
|
kcp_peer_entry_t *entry;
|
|
for (entry = hub->peers; entry != NULL; entry = entry->next) {
|
|
if (strcmp(entry->peer_id, peer_id) == 0) {
|
|
return entry;
|
|
}
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
static int kcp_hub_peer_id_has_suffix(const char *peer_id, const char *suffix) {
|
|
size_t peer_len;
|
|
size_t suffix_len;
|
|
|
|
if (peer_id == NULL || suffix == NULL) {
|
|
return 0;
|
|
}
|
|
peer_len = strlen(peer_id);
|
|
suffix_len = strlen(suffix);
|
|
return peer_len >= suffix_len && strcmp(peer_id + peer_len - suffix_len, suffix) == 0;
|
|
}
|
|
|
|
static int kcp_hub_configure_peer_transport(kcp_conn_t *conn, const char *peer_id) {
|
|
kcp_conn_options_t options;
|
|
|
|
if (conn == NULL || peer_id == NULL) {
|
|
errno = EINVAL;
|
|
return -1;
|
|
}
|
|
if (kcp_hub_peer_id_has_suffix(peer_id, "-ctrl")) {
|
|
kcp_conn_options_set_control_defaults(&options);
|
|
return kcp_conn_apply_options(conn, &options);
|
|
}
|
|
if (kcp_hub_peer_id_has_suffix(peer_id, "-video")) {
|
|
kcp_conn_options_set_video_defaults(&options);
|
|
return kcp_conn_apply_options(conn, &options);
|
|
}
|
|
if (kcp_hub_peer_is_telemetry(peer_id)) {
|
|
kcp_conn_options_set_telemetry_defaults(&options);
|
|
return kcp_conn_apply_options(conn, &options);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static int kcp_hub_add_runtime_stats_json(cJSON *object, const kcp_runtime_stats_t *stats) {
|
|
if (object == NULL || stats == NULL) {
|
|
errno = EINVAL;
|
|
return -1;
|
|
}
|
|
if (cJSON_AddNumberToObject(object, "connected", stats->connected) == NULL ||
|
|
cJSON_AddNumberToObject(object, "conv", (double) stats->conv) == NULL ||
|
|
cJSON_AddNumberToObject(object, "rto_ms", (double) stats->rto_ms) == NULL ||
|
|
cJSON_AddNumberToObject(object, "srtt_ms", (double) stats->srtt_ms) == NULL ||
|
|
cJSON_AddNumberToObject(object, "srttvar_ms", (double) stats->srttvar_ms) == NULL ||
|
|
cJSON_AddNumberToObject(object, "snd_wnd", (double) stats->snd_wnd) == NULL ||
|
|
cJSON_AddNumberToObject(object, "rmt_wnd", (double) stats->rmt_wnd) == NULL ||
|
|
cJSON_AddNumberToObject(object, "inflight", (double) stats->inflight) == NULL ||
|
|
cJSON_AddNumberToObject(object, "window_limit", (double) stats->window_limit) == NULL ||
|
|
cJSON_AddNumberToObject(object, "window_pressure_pct", stats->window_pressure_pct) == NULL ||
|
|
cJSON_AddNumberToObject(object, "snd_queue", (double) stats->snd_queue) == NULL ||
|
|
cJSON_AddNumberToObject(object, "rcv_queue", (double) stats->rcv_queue) == NULL ||
|
|
cJSON_AddNumberToObject(object, "snd_buffer", (double) stats->snd_buffer) == NULL ||
|
|
cJSON_AddNumberToObject(object, "out_segs_total", (double) stats->out_segs_total) == NULL ||
|
|
cJSON_AddNumberToObject(object, "retrans_total", (double) stats->retrans_total) == NULL ||
|
|
cJSON_AddNumberToObject(object, "fast_retrans_total", (double) stats->fast_retrans_total) == NULL ||
|
|
cJSON_AddNumberToObject(object, "lost_total", (double) stats->lost_total) == NULL ||
|
|
cJSON_AddNumberToObject(object, "repeat_total", (double) stats->repeat_total) == NULL ||
|
|
cJSON_AddNumberToObject(object, "xmit_total", (double) stats->xmit_total) == NULL) {
|
|
errno = ENOMEM;
|
|
return -1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static int kcp_hub_build_telemetry_payload_locked(kcp_hub_t *hub, char **out_payload) {
|
|
cJSON *root = NULL;
|
|
cJSON *sessions = NULL;
|
|
char *ts_unix_nano_text = NULL;
|
|
char *payload = NULL;
|
|
kcp_peer_entry_t *entry;
|
|
|
|
if (hub == NULL || out_payload == NULL) {
|
|
errno = EINVAL;
|
|
return -1;
|
|
}
|
|
*out_payload = NULL;
|
|
|
|
root = cJSON_CreateObject();
|
|
if (root == NULL) {
|
|
errno = ENOMEM;
|
|
return -1;
|
|
}
|
|
sessions = cJSON_AddArrayToObject(root, "sessions");
|
|
if (sessions == NULL) {
|
|
cJSON_Delete(root);
|
|
errno = ENOMEM;
|
|
return -1;
|
|
}
|
|
|
|
ts_unix_nano_text = omni_strdup_printf("%" PRId64, omni_now_unix_nano());
|
|
if (ts_unix_nano_text == NULL) {
|
|
cJSON_Delete(root);
|
|
return -1;
|
|
}
|
|
if (cJSON_AddStringToObject(root, "type", "hub_kcp_snapshot") == NULL ||
|
|
cJSON_AddStringToObject(root, "ts_unix_nano", ts_unix_nano_text) == NULL ||
|
|
cJSON_AddStringToObject(root, "node_id", KCP_HUB_DEFAULT_NODE_ID) == NULL) {
|
|
free(ts_unix_nano_text);
|
|
cJSON_Delete(root);
|
|
errno = ENOMEM;
|
|
return -1;
|
|
}
|
|
free(ts_unix_nano_text);
|
|
|
|
for (entry = hub->peers; entry != NULL; entry = entry->next) {
|
|
cJSON *session = NULL;
|
|
kcp_runtime_stats_t stats;
|
|
struct sockaddr_storage local_addr;
|
|
struct sockaddr_storage remote_addr;
|
|
socklen_t local_len = sizeof(local_addr);
|
|
socklen_t remote_len = sizeof(remote_addr);
|
|
char local_text[OMNI_MAX_ADDR_TEXT] = "";
|
|
char remote_text[OMNI_MAX_ADDR_TEXT] = "";
|
|
|
|
if (entry->conn == NULL || entry->peer_id[0] == '\0' || kcp_hub_peer_is_telemetry(entry->peer_id)) {
|
|
continue;
|
|
}
|
|
|
|
memset(&stats, 0, sizeof(stats));
|
|
kcp_conn_runtime_stats_snapshot(entry->conn, &stats);
|
|
if (kcp_conn_local_addr(entry->conn, &local_addr, &local_len) != 0) {
|
|
local_len = 0;
|
|
}
|
|
if (kcp_conn_remote_addr(entry->conn, &remote_addr, &remote_len) != 0) {
|
|
remote_len = 0;
|
|
}
|
|
if (local_len > 0) {
|
|
omni_sockaddr_to_string((const struct sockaddr *) &local_addr, local_len, local_text, sizeof(local_text));
|
|
}
|
|
if (remote_len > 0) {
|
|
omni_sockaddr_to_string((const struct sockaddr *) &remote_addr, remote_len, remote_text, sizeof(remote_text));
|
|
}
|
|
|
|
session = cJSON_CreateObject();
|
|
if (session == NULL) {
|
|
cJSON_Delete(root);
|
|
errno = ENOMEM;
|
|
return -1;
|
|
}
|
|
cJSON_AddItemToArray(sessions, session);
|
|
if (cJSON_AddStringToObject(session, "peer_id", entry->peer_id) == NULL ||
|
|
cJSON_AddStringToObject(session, "local_addr", local_text) == NULL ||
|
|
cJSON_AddStringToObject(session, "remote_addr", remote_text) == NULL ||
|
|
kcp_hub_add_runtime_stats_json(session, &stats) != 0) {
|
|
cJSON_Delete(root);
|
|
errno = ENOMEM;
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
payload = cJSON_PrintUnformatted(root);
|
|
cJSON_Delete(root);
|
|
if (payload == NULL) {
|
|
errno = ENOMEM;
|
|
return -1;
|
|
}
|
|
*out_payload = payload;
|
|
return 0;
|
|
}
|
|
|
|
static int kcp_hub_push_telemetry_snapshot(kcp_hub_t *hub) {
|
|
message_t msg;
|
|
char *payload = NULL;
|
|
char telemetry_peer_id[OMNI_MAX_PEER_ID];
|
|
int rc;
|
|
|
|
if (hub == NULL) {
|
|
errno = EINVAL;
|
|
return -1;
|
|
}
|
|
|
|
pthread_rwlock_rdlock(&hub->lock);
|
|
if (hub->telemetry_peer_id[0] == '\0' || kcp_hub_find_peer(hub, hub->telemetry_peer_id) == NULL) {
|
|
pthread_rwlock_unlock(&hub->lock);
|
|
return 0;
|
|
}
|
|
snprintf(telemetry_peer_id, sizeof(telemetry_peer_id), "%s", hub->telemetry_peer_id);
|
|
rc = kcp_hub_build_telemetry_payload_locked(hub, &payload);
|
|
pthread_rwlock_unlock(&hub->lock);
|
|
if (rc != 0) {
|
|
return -1;
|
|
}
|
|
|
|
protocol_message_init(&msg);
|
|
msg.type = MSG_TYPE_TEXT;
|
|
snprintf(msg.from, sizeof(msg.from), "%s", SERVER_PEER_ID);
|
|
snprintf(msg.to, sizeof(msg.to), "%s", telemetry_peer_id);
|
|
msg.body = (uint8_t *) omni_strdup(payload == NULL ? "" : payload);
|
|
cJSON_free(payload);
|
|
if (msg.body == NULL) {
|
|
return -1;
|
|
}
|
|
msg.body_len = strlen((const char *) msg.body);
|
|
rc = kcp_hub_deliver_to_local_peer(hub, &msg);
|
|
protocol_message_clear(&msg);
|
|
if (rc != 0 && errno == ENOENT) {
|
|
return 0;
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
static void *kcp_hub_telemetry_thread_main(void *arg) {
|
|
kcp_hub_t *hub = (kcp_hub_t *) arg;
|
|
|
|
while (!atomic_load(&hub->closed)) {
|
|
int interval_ms = KCP_HUB_DEFAULT_TELEMETRY_INTERVAL_MS;
|
|
|
|
pthread_rwlock_rdlock(&hub->lock);
|
|
if (hub->telemetry_interval_ms > 0) {
|
|
interval_ms = hub->telemetry_interval_ms;
|
|
}
|
|
pthread_rwlock_unlock(&hub->lock);
|
|
|
|
(void) kcp_hub_push_telemetry_snapshot(hub);
|
|
if (atomic_load(&hub->closed)) {
|
|
break;
|
|
}
|
|
usleep((useconds_t) interval_ms * 1000U);
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
static int kcp_hub_send_server_error(kcp_conn_t *conn, const char *to, const char *message) {
|
|
message_t msg;
|
|
protocol_message_init(&msg);
|
|
msg.type = MSG_TYPE_ERROR;
|
|
snprintf(msg.from, sizeof(msg.from), "%s", SERVER_PEER_ID);
|
|
snprintf(msg.to, sizeof(msg.to), "%s", (to == NULL || to[0] == '\0') ? "unknown" : to);
|
|
msg.body = (uint8_t *) omni_strdup(message == NULL ? "" : message);
|
|
if (msg.body == NULL) {
|
|
return -1;
|
|
}
|
|
msg.body_len = strlen((const char *) msg.body);
|
|
if (kcp_conn_send(conn, &msg) != 0) {
|
|
protocol_message_clear(&msg);
|
|
return -1;
|
|
}
|
|
protocol_message_clear(&msg);
|
|
return 0;
|
|
}
|
|
|
|
static int kcp_hub_sockaddr_equal(const struct sockaddr *left, socklen_t left_len, const struct sockaddr *right, socklen_t right_len) {
|
|
char left_text[OMNI_MAX_ADDR_TEXT];
|
|
char right_text[OMNI_MAX_ADDR_TEXT];
|
|
|
|
if (left == NULL || right == NULL) {
|
|
return left == right;
|
|
}
|
|
return strcmp(
|
|
omni_sockaddr_to_string(left, left_len, left_text, sizeof(left_text)),
|
|
omni_sockaddr_to_string(right, right_len, right_text, sizeof(right_text))
|
|
) == 0;
|
|
}
|
|
|
|
static int kcp_hub_accept_relay_peer(kcp_hub_t *hub, const struct sockaddr *addr, socklen_t addr_len) {
|
|
int accepted = 0;
|
|
|
|
pthread_rwlock_wrlock(&hub->lock);
|
|
if (hub->relay_peer_addr_len == 0 && hub->relay_learn_peer) {
|
|
omni_clone_sockaddr(addr, addr_len, &hub->relay_peer_addr, &hub->relay_peer_addr_len);
|
|
accepted = 1;
|
|
} else if (hub->relay_peer_addr_len == 0) {
|
|
accepted = 1;
|
|
} else {
|
|
accepted = kcp_hub_sockaddr_equal((const struct sockaddr *) &hub->relay_peer_addr, hub->relay_peer_addr_len, addr, addr_len);
|
|
}
|
|
pthread_rwlock_unlock(&hub->lock);
|
|
return accepted;
|
|
}
|
|
|
|
static int kcp_hub_forward_to_relay(kcp_hub_t *hub, const message_t *msg, int *relay_status) {
|
|
uint8_t *payload = NULL;
|
|
size_t payload_len = 0;
|
|
struct sockaddr_storage relay_addr;
|
|
socklen_t relay_addr_len = 0;
|
|
int relay_fd = -1;
|
|
int relay_configured = 0;
|
|
|
|
if (relay_status != NULL) {
|
|
*relay_status = 0;
|
|
}
|
|
if (protocol_encode_message_datagram(msg, &payload, &payload_len) != 0) {
|
|
return -1;
|
|
}
|
|
if (payload_len > KCP_RELAY_MAX_DATAGRAM_SIZE) {
|
|
free(payload);
|
|
errno = EMSGSIZE;
|
|
if (relay_status != NULL) {
|
|
*relay_status = 3;
|
|
}
|
|
return -1;
|
|
}
|
|
|
|
pthread_rwlock_rdlock(&hub->lock);
|
|
relay_fd = hub->relay_fd;
|
|
relay_configured = hub->relay_configured;
|
|
if (hub->relay_peer_addr_len > 0) {
|
|
omni_clone_sockaddr((const struct sockaddr *) &hub->relay_peer_addr, hub->relay_peer_addr_len, &relay_addr, &relay_addr_len);
|
|
}
|
|
pthread_rwlock_unlock(&hub->lock);
|
|
|
|
if (!relay_configured || relay_fd < 0) {
|
|
free(payload);
|
|
errno = ENOTCONN;
|
|
if (relay_status != NULL) {
|
|
*relay_status = 1;
|
|
}
|
|
return -1;
|
|
}
|
|
if (relay_addr_len == 0) {
|
|
free(payload);
|
|
errno = EDESTADDRREQ;
|
|
if (relay_status != NULL) {
|
|
*relay_status = 2;
|
|
}
|
|
return -1;
|
|
}
|
|
if (sendto(relay_fd, payload, payload_len, 0, (struct sockaddr *) &relay_addr, relay_addr_len) < 0) {
|
|
free(payload);
|
|
return -1;
|
|
}
|
|
free(payload);
|
|
return 0;
|
|
}
|
|
|
|
static int kcp_hub_forward_relay_server_error(kcp_hub_t *hub, const char *to, const char *message) {
|
|
message_t msg;
|
|
int rc;
|
|
|
|
protocol_message_init(&msg);
|
|
msg.type = MSG_TYPE_ERROR;
|
|
snprintf(msg.from, sizeof(msg.from), "%s", SERVER_PEER_ID);
|
|
snprintf(msg.to, sizeof(msg.to), "%s", (to == NULL || to[0] == '\0') ? "unknown" : to);
|
|
msg.body = (uint8_t *) omni_strdup(message == NULL ? "" : message);
|
|
if (msg.body == NULL) {
|
|
return -1;
|
|
}
|
|
msg.body_len = strlen((const char *) msg.body);
|
|
rc = kcp_hub_forward_to_relay(hub, &msg, NULL);
|
|
protocol_message_clear(&msg);
|
|
return rc;
|
|
}
|
|
|
|
static int kcp_hub_deliver_to_local_peer(kcp_hub_t *hub, const message_t *msg) {
|
|
kcp_conn_t *target_conn = NULL;
|
|
int rc;
|
|
|
|
pthread_rwlock_rdlock(&hub->lock);
|
|
{
|
|
kcp_peer_entry_t *entry = kcp_hub_find_peer(hub, msg->to);
|
|
if (entry != NULL) {
|
|
target_conn = entry->conn;
|
|
}
|
|
}
|
|
pthread_rwlock_unlock(&hub->lock);
|
|
|
|
if (target_conn == NULL) {
|
|
errno = ENOENT;
|
|
return -1;
|
|
}
|
|
rc = kcp_conn_send(target_conn, msg);
|
|
if (rc != 0) {
|
|
kcp_hub_unregister(hub, msg->to, target_conn);
|
|
kcp_conn_close(target_conn);
|
|
return -1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static int kcp_hub_deliver_relayed_message(kcp_hub_t *hub, const message_t *msg) {
|
|
char *error_text;
|
|
|
|
if (kcp_hub_deliver_to_local_peer(hub, msg) == 0) {
|
|
return 0;
|
|
}
|
|
if (errno != ENOENT) {
|
|
if (msg->type == MSG_TYPE_ERROR) {
|
|
return 0;
|
|
}
|
|
error_text = omni_strdup_printf("failed to forward to %s", msg->to);
|
|
if (error_text == NULL) {
|
|
return -1;
|
|
}
|
|
if (kcp_hub_forward_relay_server_error(hub, msg->from, error_text) != 0) {
|
|
free(error_text);
|
|
return -1;
|
|
}
|
|
free(error_text);
|
|
return 0;
|
|
}
|
|
|
|
if (msg->type == MSG_TYPE_ERROR) {
|
|
return 0;
|
|
}
|
|
|
|
error_text = omni_strdup_printf("unknown target: %s", msg->to);
|
|
if (error_text == NULL) {
|
|
return -1;
|
|
}
|
|
if (kcp_hub_forward_relay_server_error(hub, msg->from, error_text) != 0) {
|
|
free(error_text);
|
|
return -1;
|
|
}
|
|
free(error_text);
|
|
return 0;
|
|
}
|
|
|
|
static int kcp_hub_handle_peer_message(kcp_hub_t *hub, const char *peer_id, kcp_conn_t *conn, message_t *msg) {
|
|
char *error_text = NULL;
|
|
int relay_status = 0;
|
|
|
|
switch (msg->type) {
|
|
case MSG_TYPE_TEXT:
|
|
case MSG_TYPE_FILE:
|
|
case MSG_TYPE_BINARY:
|
|
snprintf(msg->from, sizeof(msg->from), "%s", peer_id);
|
|
if (kcp_hub_deliver_to_local_peer(hub, msg) == 0) {
|
|
return 0;
|
|
}
|
|
if (errno != ENOENT) {
|
|
error_text = omni_strdup_printf("failed to forward to %s", msg->to);
|
|
if (error_text == NULL) {
|
|
return -1;
|
|
}
|
|
if (kcp_hub_send_server_error(conn, peer_id, error_text) != 0) {
|
|
free(error_text);
|
|
return -1;
|
|
}
|
|
free(error_text);
|
|
return 0;
|
|
}
|
|
if (kcp_hub_forward_to_relay(hub, msg, &relay_status) == 0) {
|
|
return 0;
|
|
}
|
|
if (relay_status == 1) {
|
|
error_text = omni_strdup_printf("unknown target: %s", msg->to);
|
|
} else if (relay_status == 2) {
|
|
error_text = omni_strdup("failed to relay to remote peer");
|
|
} else if (relay_status == 3) {
|
|
error_text = omni_strdup("message too large for relay udp");
|
|
} else {
|
|
error_text = omni_strdup("failed to relay to remote peer");
|
|
}
|
|
if (error_text == NULL) {
|
|
return -1;
|
|
}
|
|
if (kcp_hub_send_server_error(conn, peer_id, error_text) != 0) {
|
|
free(error_text);
|
|
return -1;
|
|
}
|
|
free(error_text);
|
|
return 0;
|
|
case MSG_TYPE_REGISTER:
|
|
case MSG_TYPE_ERROR:
|
|
if (kcp_hub_send_server_error(conn, peer_id, "registered peers can only send text, file, or binary messages") != 0) {
|
|
return -1;
|
|
}
|
|
errno = EPROTO;
|
|
return -1;
|
|
default:
|
|
error_text = omni_strdup_printf("unsupported message type: %s", protocol_message_type_name(msg->type));
|
|
if (error_text == NULL) {
|
|
return -1;
|
|
}
|
|
if (kcp_hub_send_server_error(conn, peer_id, error_text) != 0) {
|
|
free(error_text);
|
|
return -1;
|
|
}
|
|
free(error_text);
|
|
errno = EPROTO;
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
static int kcp_hub_register_conn(kcp_hub_t *hub, kcp_conn_t *conn, char *peer_id, size_t peer_id_len) {
|
|
message_t msg;
|
|
kcp_peer_entry_t *entry;
|
|
|
|
protocol_message_init(&msg);
|
|
if (kcp_conn_receive(conn, &msg) != 0) {
|
|
protocol_message_clear(&msg);
|
|
return -1;
|
|
}
|
|
if (msg.type != MSG_TYPE_REGISTER) {
|
|
kcp_hub_send_server_error(conn, msg.from, "first message must be register");
|
|
protocol_message_clear(&msg);
|
|
errno = EPROTO;
|
|
return -1;
|
|
}
|
|
|
|
pthread_rwlock_wrlock(&hub->lock);
|
|
entry = kcp_hub_find_peer(hub, msg.from);
|
|
if (entry != NULL) {
|
|
char *error_text;
|
|
pthread_rwlock_unlock(&hub->lock);
|
|
error_text = omni_strdup_printf("duplicate peer id: %s", msg.from);
|
|
if (error_text != NULL) {
|
|
(void) kcp_hub_send_server_error(conn, msg.from, error_text);
|
|
free(error_text);
|
|
}
|
|
protocol_message_clear(&msg);
|
|
errno = EEXIST;
|
|
return -1;
|
|
}
|
|
|
|
entry = (kcp_peer_entry_t *) calloc(1, sizeof(*entry));
|
|
if (entry == NULL) {
|
|
pthread_rwlock_unlock(&hub->lock);
|
|
protocol_message_clear(&msg);
|
|
return -1;
|
|
}
|
|
snprintf(entry->peer_id, sizeof(entry->peer_id), "%s", msg.from);
|
|
entry->conn = conn;
|
|
entry->next = hub->peers;
|
|
hub->peers = entry;
|
|
pthread_rwlock_unlock(&hub->lock);
|
|
|
|
snprintf(peer_id, peer_id_len, "%s", msg.from);
|
|
protocol_message_clear(&msg);
|
|
return 0;
|
|
}
|
|
|
|
static void *kcp_hub_session_thread_main(void *arg) {
|
|
kcp_session_thread_ctx_t *ctx = (kcp_session_thread_ctx_t *) arg;
|
|
kcp_hub_serve_session(ctx->hub, ctx->conn);
|
|
free(ctx);
|
|
return NULL;
|
|
}
|
|
|
|
kcp_hub_t *kcp_hub_new(latency_logger_t *logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
|
|
kcp_hub_t *hub = (kcp_hub_t *) calloc(1, sizeof(*hub));
|
|
if (hub == NULL) {
|
|
return NULL;
|
|
}
|
|
pthread_rwlock_init(&hub->lock, NULL);
|
|
hub->logger = logger;
|
|
hub->stats_logger = stats_logger;
|
|
hub->stats_interval_ms = stats_interval_ms > 0 ? stats_interval_ms : KCP_DEFAULT_STATS_INTERVAL_MS;
|
|
hub->telemetry_interval_ms = KCP_HUB_DEFAULT_TELEMETRY_INTERVAL_MS;
|
|
hub->relay_fd = -1;
|
|
atomic_init(&hub->closed, 0);
|
|
return hub;
|
|
}
|
|
|
|
int kcp_hub_serve_listener(kcp_hub_t *hub, kcp_listener_t *listener) {
|
|
if (hub == NULL || listener == NULL) {
|
|
errno = EINVAL;
|
|
return -1;
|
|
}
|
|
while (!atomic_load(&hub->closed)) {
|
|
kcp_conn_t *conn = kcp_listener_accept(listener);
|
|
kcp_session_thread_ctx_t *ctx;
|
|
pthread_t thread;
|
|
|
|
if (conn == NULL) {
|
|
if (atomic_load(&hub->closed)) {
|
|
return 0;
|
|
}
|
|
return -1;
|
|
}
|
|
ctx = (kcp_session_thread_ctx_t *) calloc(1, sizeof(*ctx));
|
|
if (ctx == NULL) {
|
|
kcp_conn_close(conn);
|
|
kcp_conn_free(conn);
|
|
return -1;
|
|
}
|
|
ctx->hub = hub;
|
|
ctx->conn = conn;
|
|
if (pthread_create(&thread, NULL, kcp_hub_session_thread_main, ctx) != 0) {
|
|
free(ctx);
|
|
kcp_conn_close(conn);
|
|
kcp_conn_free(conn);
|
|
return -1;
|
|
}
|
|
pthread_detach(thread);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
int kcp_hub_serve_session(kcp_hub_t *hub, kcp_conn_t *conn) {
|
|
char peer_id[OMNI_MAX_PEER_ID];
|
|
const char *node_id;
|
|
int rc = 0;
|
|
|
|
if (hub == NULL || conn == NULL) {
|
|
errno = EINVAL;
|
|
return -1;
|
|
}
|
|
peer_id[0] = '\0';
|
|
if (kcp_hub_register_conn(hub, conn, peer_id, sizeof(peer_id)) != 0) {
|
|
kcp_conn_close(conn);
|
|
kcp_conn_free(conn);
|
|
return -1;
|
|
}
|
|
if (kcp_hub_configure_peer_transport(conn, peer_id) != 0) {
|
|
kcp_hub_unregister(hub, peer_id, conn);
|
|
kcp_conn_close(conn);
|
|
kcp_conn_free(conn);
|
|
return -1;
|
|
}
|
|
node_id = kcp_hub_peer_node_id(peer_id);
|
|
if (kcp_conn_configure_runtime(conn, hub->logger, OMNI_NODE_ROLE_SERVER, node_id, hub->stats_logger, hub->stats_interval_ms) != 0) {
|
|
kcp_hub_unregister(hub, peer_id, conn);
|
|
kcp_conn_close(conn);
|
|
kcp_conn_free(conn);
|
|
return -1;
|
|
}
|
|
|
|
for (;;) {
|
|
message_t msg;
|
|
protocol_message_init(&msg);
|
|
if (kcp_conn_receive(conn, &msg) != 0) {
|
|
protocol_message_clear(&msg);
|
|
rc = -1;
|
|
break;
|
|
}
|
|
if (kcp_hub_handle_peer_message(hub, peer_id, conn, &msg) != 0) {
|
|
protocol_message_clear(&msg);
|
|
rc = -1;
|
|
break;
|
|
}
|
|
protocol_message_clear(&msg);
|
|
}
|
|
|
|
kcp_hub_unregister(hub, peer_id, conn);
|
|
kcp_conn_close(conn);
|
|
kcp_conn_free(conn);
|
|
return rc;
|
|
}
|
|
|
|
int kcp_hub_set_relay(kcp_hub_t *hub, int relay_fd, const struct sockaddr *peer_addr, socklen_t peer_addr_len, int learn_peer) {
|
|
if (hub == NULL || relay_fd < 0) {
|
|
errno = EINVAL;
|
|
return -1;
|
|
}
|
|
pthread_rwlock_wrlock(&hub->lock);
|
|
hub->relay_fd = relay_fd;
|
|
hub->relay_configured = 1;
|
|
hub->relay_learn_peer = learn_peer;
|
|
hub->relay_peer_addr_len = 0;
|
|
if (peer_addr != NULL && peer_addr_len > 0) {
|
|
omni_clone_sockaddr(peer_addr, peer_addr_len, &hub->relay_peer_addr, &hub->relay_peer_addr_len);
|
|
}
|
|
pthread_rwlock_unlock(&hub->lock);
|
|
return 0;
|
|
}
|
|
|
|
int kcp_hub_set_telemetry(kcp_hub_t *hub, const char *peer_id, int interval_ms) {
|
|
int start_thread = 0;
|
|
|
|
if (hub == NULL || peer_id == NULL) {
|
|
errno = EINVAL;
|
|
return -1;
|
|
}
|
|
pthread_rwlock_wrlock(&hub->lock);
|
|
snprintf(hub->telemetry_peer_id, sizeof(hub->telemetry_peer_id), "%s", peer_id);
|
|
hub->telemetry_interval_ms = interval_ms > 0 ? interval_ms : KCP_HUB_DEFAULT_TELEMETRY_INTERVAL_MS;
|
|
if (!hub->telemetry_thread_started && hub->telemetry_peer_id[0] != '\0') {
|
|
start_thread = 1;
|
|
hub->telemetry_thread_started = 1;
|
|
}
|
|
pthread_rwlock_unlock(&hub->lock);
|
|
|
|
if (start_thread) {
|
|
if (pthread_create(&hub->telemetry_thread, NULL, kcp_hub_telemetry_thread_main, hub) != 0) {
|
|
pthread_rwlock_wrlock(&hub->lock);
|
|
hub->telemetry_thread_started = 0;
|
|
hub->telemetry_peer_id[0] = '\0';
|
|
pthread_rwlock_unlock(&hub->lock);
|
|
return -1;
|
|
}
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
int kcp_hub_serve_relay(kcp_hub_t *hub) {
|
|
uint8_t buffer[KCP_RELAY_MAX_DATAGRAM_SIZE];
|
|
|
|
if (hub == NULL) {
|
|
errno = EINVAL;
|
|
return -1;
|
|
}
|
|
while (!atomic_load(&hub->closed)) {
|
|
struct sockaddr_storage source;
|
|
socklen_t source_len = sizeof(source);
|
|
ssize_t n;
|
|
message_t msg;
|
|
char err[128];
|
|
int relay_fd;
|
|
|
|
pthread_rwlock_rdlock(&hub->lock);
|
|
relay_fd = hub->relay_fd;
|
|
pthread_rwlock_unlock(&hub->lock);
|
|
if (relay_fd < 0) {
|
|
errno = ENOTCONN;
|
|
return -1;
|
|
}
|
|
|
|
n = recvfrom(relay_fd, buffer, sizeof(buffer), 0, (struct sockaddr *) &source, &source_len);
|
|
if (n < 0) {
|
|
if (atomic_load(&hub->closed)) {
|
|
return 0;
|
|
}
|
|
if (errno == EINTR) {
|
|
continue;
|
|
}
|
|
return -1;
|
|
}
|
|
if (!kcp_hub_accept_relay_peer(hub, (struct sockaddr *) &source, source_len)) {
|
|
continue;
|
|
}
|
|
|
|
protocol_message_init(&msg);
|
|
if (protocol_decode_message_datagram(buffer, (size_t) n, &msg, err, sizeof(err)) != 0) {
|
|
protocol_message_clear(&msg);
|
|
continue;
|
|
}
|
|
if (msg.type != MSG_TYPE_TEXT && msg.type != MSG_TYPE_FILE && msg.type != MSG_TYPE_BINARY && msg.type != MSG_TYPE_ERROR) {
|
|
protocol_message_clear(&msg);
|
|
continue;
|
|
}
|
|
(void) kcp_hub_deliver_relayed_message(hub, &msg);
|
|
protocol_message_clear(&msg);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
int kcp_hub_close(kcp_hub_t *hub) {
|
|
if (hub == NULL) {
|
|
return 0;
|
|
}
|
|
if (!atomic_exchange(&hub->closed, 1)) {
|
|
if (hub->relay_fd >= 0) {
|
|
close(hub->relay_fd);
|
|
hub->relay_fd = -1;
|
|
}
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
void kcp_hub_free(kcp_hub_t *hub) {
|
|
kcp_peer_entry_t *entry;
|
|
kcp_peer_entry_t *next;
|
|
|
|
if (hub == NULL) {
|
|
return;
|
|
}
|
|
kcp_hub_close(hub);
|
|
if (hub->telemetry_thread_started) {
|
|
pthread_join(hub->telemetry_thread, NULL);
|
|
}
|
|
for (entry = hub->peers; entry != NULL; entry = next) {
|
|
next = entry->next;
|
|
if (entry->conn != NULL) {
|
|
kcp_conn_close(entry->conn);
|
|
}
|
|
free(entry);
|
|
}
|
|
pthread_rwlock_destroy(&hub->lock);
|
|
free(hub);
|
|
}
|