Files
OmniSocketGo/src/transport_kcp.c

1886 lines
68 KiB
C

#include "transport_kcp.h"
#include "ikcp.h"
#include "linux_timestamping.h"
#include <arpa/inet.h>
#include <netdb.h>
#include <stdatomic.h>
#include <unistd.h>
#define KCP_RECV_CHUNK_SIZE (32U * 1024U)
typedef struct kcp_packet_debug_pending {
struct kcp_packet_debug_pending *next;
uint32_t tx_id;
struct sockaddr_storage remote_addr;
socklen_t remote_addr_len;
int packet_bytes;
int has_conv;
uint32_t conv;
kcp_packet_debug_segment_t *segments;
size_t segment_count;
int saw_sched;
int saw_software;
} kcp_packet_debug_pending_t;
typedef struct kcp_socket_debug_state {
int fd;
char node_role[OMNI_MAX_NODE_ROLE];
char node_id[OMNI_MAX_PEER_ID];
kcp_packet_debug_logger_t *logger;
pthread_mutex_t write_mu;
pthread_mutex_t pending_mu;
pthread_t errqueue_thread;
int errqueue_thread_started;
uint32_t next_tx_id;
kcp_packet_debug_pending_t *pending_head;
atomic_int closed;
atomic_int last_send_errno;
} kcp_socket_debug_state_t;
typedef struct kcp_session_entry kcp_session_entry_t;
typedef struct kcp_process_sampler kcp_process_sampler_t;
struct kcp_conn {
ikcpcb *kcp;
int fd;
int is_client;
int owns_socket;
int socket_closed;
atomic_int closed;
struct sockaddr_storage remote_addr;
socklen_t remote_addr_len;
pthread_mutex_t kcp_mu;
pthread_mutex_t close_mu;
pthread_cond_t rx_cond;
pthread_t recv_thread;
int recv_thread_started;
pthread_t update_thread;
int update_thread_started;
pthread_t stats_thread;
int stats_thread_started;
kcp_conn_options_t options;
int update_interval_ms;
uint64_t pending_bytes_sent;
uint64_t pending_bytes_received;
uint64_t pending_in_pkts;
uint64_t pending_out_pkts;
uint64_t pending_in_segs;
uint64_t pending_out_segs;
uint64_t pending_in_errs;
uint64_t pending_kcp_in_errs;
protocol_frame_decoder_t decoder;
uint8_t scratch[KCP_RECV_CHUNK_SIZE];
latency_logger_t *logger;
char node_role[OMNI_MAX_NODE_ROLE];
char node_id[OMNI_MAX_PEER_ID];
kcp_session_stats_logger_t *stats_logger;
int stats_interval_ms;
kcp_process_sampler_t *process_sampler;
kcp_socket_debug_state_t *sock_state;
struct kcp_listener *listener;
struct kcp_conn *accept_next;
struct kcp_conn *process_next;
};
struct kcp_listener {
int fd;
int closed;
pthread_mutex_t lock;
pthread_mutex_t accept_mu;
pthread_cond_t accept_cond;
pthread_t recv_thread;
int recv_thread_started;
kcp_session_entry_t *sessions;
kcp_conn_t *accept_head;
kcp_conn_t *accept_tail;
kcp_socket_debug_state_t sock_state;
};
struct kcp_session_entry {
uint32_t conv;
kcp_conn_t *conn;
kcp_session_entry_t *next;
};
struct kcp_process_sampler {
kcp_process_sampler_t *next;
kcp_session_stats_logger_t *logger;
char node_role[OMNI_MAX_NODE_ROLE];
char node_id[OMNI_MAX_PEER_ID];
int stats_interval_ms;
pthread_mutex_t lock;
pthread_cond_t cond;
pthread_t thread;
int thread_started;
int stopped;
int refcount;
int request_pending;
uint64_t pending_request_id;
uint64_t completed_request_id;
char pending_reason[32];
kcp_conn_t *members;
uint64_t prev_bytes_sent;
uint64_t prev_bytes_received;
uint64_t prev_in_pkts;
uint64_t prev_out_pkts;
uint64_t prev_in_segs;
uint64_t prev_out_segs;
uint64_t prev_in_errs;
uint64_t prev_kcp_in_errs;
atomic_uint_fast64_t bytes_sent;
atomic_uint_fast64_t bytes_received;
atomic_uint_fast64_t in_pkts;
atomic_uint_fast64_t out_pkts;
atomic_uint_fast64_t in_segs;
atomic_uint_fast64_t out_segs;
atomic_uint_fast64_t in_errs;
atomic_uint_fast64_t kcp_in_errs;
atomic_uint_fast64_t curr_estab;
};
static pthread_mutex_t g_kcp_process_sampler_mu = PTHREAD_MUTEX_INITIALIZER;
static kcp_process_sampler_t *g_kcp_process_samplers = NULL;
void kcp_conn_options_init(kcp_conn_options_t *options) {
if (options == NULL) {
return;
}
memset(options, 0, sizeof(*options));
options->nodelay = KCP_DEFAULT_NODELAY;
options->interval_ms = KCP_DEFAULT_INTERVAL_MS;
options->resend = KCP_DEFAULT_RESEND;
options->nc = KCP_DEFAULT_NC;
options->sndwnd = KCP_DEFAULT_SND_WND;
options->rcvwnd = KCP_DEFAULT_RCV_WND;
options->mtu = KCP_DEFAULT_MTU;
}
void kcp_conn_options_set_control_defaults(kcp_conn_options_t *options) {
if (options == NULL) {
return;
}
memset(options, 0, sizeof(*options));
options->nodelay = KCP_CONTROL_NODELAY;
options->interval_ms = KCP_CONTROL_INTERVAL_MS;
options->resend = KCP_CONTROL_RESEND;
options->nc = KCP_CONTROL_NC;
options->sndwnd = KCP_CONTROL_SND_WND;
options->rcvwnd = KCP_CONTROL_RCV_WND;
options->mtu = KCP_CONTROL_MTU;
}
void kcp_conn_options_set_video_defaults(kcp_conn_options_t *options) {
if (options == NULL) {
return;
}
memset(options, 0, sizeof(*options));
options->nodelay = KCP_VIDEO_NODELAY;
options->interval_ms = KCP_VIDEO_INTERVAL_MS;
options->resend = KCP_VIDEO_RESEND;
options->nc = KCP_VIDEO_NC;
options->sndwnd = KCP_VIDEO_SND_WND;
options->rcvwnd = KCP_VIDEO_RCV_WND;
options->mtu = KCP_VIDEO_MTU;
}
static int kcp_conn_validate_options(const kcp_conn_options_t *options) {
if (options == NULL) {
errno = EINVAL;
return -1;
}
if (options->interval_ms <= 0 || options->sndwnd <= 0 || options->rcvwnd <= 0 || options->mtu <= 0) {
errno = EINVAL;
return -1;
}
return 0;
}
static int kcp_conn_apply_options_locked(kcp_conn_t *conn, const kcp_conn_options_t *options) {
if (conn == NULL || conn->kcp == NULL || kcp_conn_validate_options(options) != 0) {
return -1;
}
if (ikcp_wndsize(conn->kcp, options->sndwnd, options->rcvwnd) != 0) {
errno = EINVAL;
return -1;
}
if (ikcp_setmtu(conn->kcp, options->mtu) != 0) {
errno = EINVAL;
return -1;
}
if (ikcp_nodelay(conn->kcp, options->nodelay, options->interval_ms, options->resend, options->nc) != 0) {
errno = EINVAL;
return -1;
}
conn->kcp->stream = 1;
conn->options = *options;
conn->update_interval_ms = options->interval_ms;
return 0;
}
static void kcp_parse_packet_segments(const uint8_t *packet, size_t len, uint32_t *conv, kcp_packet_debug_segment_t **segments, size_t *segment_count) {
size_t offset = 0;
size_t count = 0;
kcp_packet_debug_segment_t *items = NULL;
if (conv != NULL) {
*conv = 0;
}
if (segments != NULL) {
*segments = NULL;
}
if (segment_count != NULL) {
*segment_count = 0;
}
if (len < 4) {
return;
}
if (conv != NULL) {
*conv = (uint32_t) ((unsigned char) packet[0] |
((unsigned char) packet[1] << 8) |
((unsigned char) packet[2] << 16) |
((unsigned char) packet[3] << 24));
}
while (offset + 24U <= len) {
uint32_t seg_len = (uint32_t) ((unsigned char) packet[offset + 20] |
((unsigned char) packet[offset + 21] << 8) |
((unsigned char) packet[offset + 22] << 16) |
((unsigned char) packet[offset + 23] << 24));
if (offset + 24U + seg_len > len) {
free(items);
return;
}
if (segments != NULL) {
kcp_packet_debug_segment_t *next = (kcp_packet_debug_segment_t *) realloc(items, (count + 1U) * sizeof(*items));
if (next == NULL) {
free(items);
return;
}
items = next;
items[count].cmd = packet[offset + 4];
items[count].frg = packet[offset + 5];
items[count].wnd = (uint16_t) ((unsigned char) packet[offset + 6] | ((unsigned char) packet[offset + 7] << 8));
items[count].sn = (uint32_t) ((unsigned char) packet[offset + 12] |
((unsigned char) packet[offset + 13] << 8) |
((unsigned char) packet[offset + 14] << 16) |
((unsigned char) packet[offset + 15] << 24));
items[count].una = (uint32_t) ((unsigned char) packet[offset + 16] |
((unsigned char) packet[offset + 17] << 8) |
((unsigned char) packet[offset + 18] << 16) |
((unsigned char) packet[offset + 19] << 24));
items[count].len = seg_len;
}
count++;
offset += 24U + seg_len;
}
if (segments != NULL) {
*segments = items;
} else {
free(items);
}
if (segment_count != NULL) {
*segment_count = count;
}
}
static uint64_t kcp_counter_diff(uint64_t previous, uint64_t current) {
return current < previous ? 0 : current - previous;
}
static int kcp_process_sampler_matches(const kcp_process_sampler_t *sampler, kcp_session_stats_logger_t *logger, const char *node_role, const char *node_id, int stats_interval_ms) {
if (sampler == NULL) {
return 0;
}
return sampler->logger == logger &&
sampler->stats_interval_ms == stats_interval_ms &&
strcmp(sampler->node_role, node_role == NULL ? "" : node_role) == 0 &&
strcmp(sampler->node_id, node_id == NULL ? "" : node_id) == 0;
}
static void kcp_process_sampler_record_send(kcp_process_sampler_t *sampler, int packet_bytes, size_t segments) {
if (sampler == NULL) {
return;
}
atomic_fetch_add_explicit(&sampler->bytes_sent, (uint64_t) packet_bytes, memory_order_relaxed);
atomic_fetch_add_explicit(&sampler->out_pkts, 1, memory_order_relaxed);
atomic_fetch_add_explicit(&sampler->out_segs, (uint64_t) segments, memory_order_relaxed);
}
static void kcp_process_sampler_record_input(kcp_process_sampler_t *sampler, int packet_bytes, size_t segments) {
if (sampler == NULL) {
return;
}
atomic_fetch_add_explicit(&sampler->bytes_received, (uint64_t) packet_bytes, memory_order_relaxed);
atomic_fetch_add_explicit(&sampler->in_pkts, 1, memory_order_relaxed);
atomic_fetch_add_explicit(&sampler->in_segs, (uint64_t) segments, memory_order_relaxed);
}
static void kcp_process_sampler_record_error(kcp_process_sampler_t *sampler) {
if (sampler == NULL) {
return;
}
atomic_fetch_add_explicit(&sampler->in_errs, 1, memory_order_relaxed);
atomic_fetch_add_explicit(&sampler->kcp_in_errs, 1, memory_order_relaxed);
}
static void kcp_conn_record_send(kcp_conn_t *conn, int packet_bytes, size_t segments) {
if (conn == NULL) {
return;
}
if (conn->process_sampler != NULL) {
kcp_process_sampler_record_send(conn->process_sampler, packet_bytes, segments);
return;
}
conn->pending_bytes_sent += (uint64_t) packet_bytes;
conn->pending_out_pkts += 1;
conn->pending_out_segs += (uint64_t) segments;
}
static void kcp_conn_record_input(kcp_conn_t *conn, int packet_bytes, size_t segments) {
if (conn == NULL) {
return;
}
if (conn->process_sampler != NULL) {
kcp_process_sampler_record_input(conn->process_sampler, packet_bytes, segments);
return;
}
conn->pending_bytes_received += (uint64_t) packet_bytes;
conn->pending_in_pkts += 1;
conn->pending_in_segs += (uint64_t) segments;
}
static void kcp_conn_record_error(kcp_conn_t *conn) {
if (conn == NULL) {
return;
}
if (conn->process_sampler != NULL) {
kcp_process_sampler_record_error(conn->process_sampler);
return;
}
conn->pending_in_errs += 1;
conn->pending_kcp_in_errs += 1;
}
static void kcp_process_sampler_curr_estab_inc(kcp_process_sampler_t *sampler) {
if (sampler == NULL) {
return;
}
atomic_fetch_add_explicit(&sampler->curr_estab, 1, memory_order_relaxed);
}
static void kcp_process_sampler_curr_estab_dec(kcp_process_sampler_t *sampler) {
uint_fast64_t current;
if (sampler == NULL) {
return;
}
current = atomic_load_explicit(&sampler->curr_estab, memory_order_relaxed);
while (current > 0) {
if (atomic_compare_exchange_weak_explicit(&sampler->curr_estab, &current, current - 1U, memory_order_relaxed, memory_order_relaxed)) {
return;
}
}
}
static void kcp_process_sampler_add_conn(kcp_process_sampler_t *sampler, kcp_conn_t *conn) {
if (sampler == NULL || conn == NULL) {
return;
}
pthread_mutex_lock(&sampler->lock);
conn->process_next = sampler->members;
sampler->members = conn;
pthread_mutex_unlock(&sampler->lock);
kcp_process_sampler_curr_estab_inc(sampler);
}
static void kcp_process_sampler_remove_conn(kcp_process_sampler_t *sampler, kcp_conn_t *conn) {
kcp_conn_t *prev = NULL;
kcp_conn_t *cur;
if (sampler == NULL || conn == NULL) {
return;
}
pthread_mutex_lock(&sampler->lock);
for (cur = sampler->members; cur != NULL; cur = cur->process_next) {
if (cur == conn) {
if (prev == NULL) {
sampler->members = cur->process_next;
} else {
prev->process_next = cur->process_next;
}
conn->process_next = NULL;
break;
}
prev = cur;
}
pthread_mutex_unlock(&sampler->lock);
if (cur == conn) {
kcp_process_sampler_curr_estab_dec(sampler);
}
}
static void kcp_process_sampler_collect_gauges(kcp_process_sampler_t *sampler, uint64_t *snd_queue, uint64_t *rcv_queue, uint64_t *snd_buffer) {
kcp_conn_t *conn;
if (snd_queue != NULL) {
*snd_queue = 0;
}
if (rcv_queue != NULL) {
*rcv_queue = 0;
}
if (snd_buffer != NULL) {
*snd_buffer = 0;
}
if (sampler == NULL) {
return;
}
pthread_mutex_lock(&sampler->lock);
for (conn = sampler->members; conn != NULL; conn = conn->process_next) {
pthread_mutex_lock(&conn->kcp_mu);
if (conn->kcp != NULL) {
if (snd_queue != NULL) {
*snd_queue += conn->kcp->nsnd_que;
}
if (rcv_queue != NULL) {
*rcv_queue += conn->kcp->nrcv_que;
}
if (snd_buffer != NULL) {
*snd_buffer += conn->kcp->nsnd_buf;
}
}
pthread_mutex_unlock(&conn->kcp_mu);
}
pthread_mutex_unlock(&sampler->lock);
}
static void kcp_process_sampler_log_snapshot(kcp_process_sampler_t *sampler, const char *reason) {
kcp_session_stats_record_t record;
uint64_t bytes_sent;
uint64_t bytes_received;
uint64_t in_pkts;
uint64_t out_pkts;
uint64_t in_segs;
uint64_t out_segs;
uint64_t in_errs;
uint64_t kcp_in_errs;
uint64_t snd_queue = 0;
uint64_t rcv_queue = 0;
uint64_t snd_buffer = 0;
if (sampler == NULL || sampler->logger == NULL) {
return;
}
bytes_sent = atomic_load_explicit(&sampler->bytes_sent, memory_order_relaxed);
bytes_received = atomic_load_explicit(&sampler->bytes_received, memory_order_relaxed);
in_pkts = atomic_load_explicit(&sampler->in_pkts, memory_order_relaxed);
out_pkts = atomic_load_explicit(&sampler->out_pkts, memory_order_relaxed);
in_segs = atomic_load_explicit(&sampler->in_segs, memory_order_relaxed);
out_segs = atomic_load_explicit(&sampler->out_segs, memory_order_relaxed);
in_errs = atomic_load_explicit(&sampler->in_errs, memory_order_relaxed);
kcp_in_errs = atomic_load_explicit(&sampler->kcp_in_errs, memory_order_relaxed);
kcp_process_sampler_collect_gauges(sampler, &snd_queue, &rcv_queue, &snd_buffer);
memset(&record, 0, sizeof(record));
snprintf(record.record_type, sizeof(record.record_type), "%s", KCP_SESSION_STATS_RECORD_PROCESS_SAMPLE);
snprintf(record.node_role, sizeof(record.node_role), "%s", sampler->node_role);
snprintf(record.node_id, sizeof(record.node_id), "%s", sampler->node_id);
snprintf(record.sample_reason, sizeof(record.sample_reason), "%s", reason == NULL ? "" : reason);
record.ts_unix_nano = omni_now_unix_nano();
record.has_bytes_sent = 1;
record.bytes_sent = kcp_counter_diff(sampler->prev_bytes_sent, bytes_sent);
record.has_bytes_received = 1;
record.bytes_received = kcp_counter_diff(sampler->prev_bytes_received, bytes_received);
record.has_in_pkts = 1;
record.in_pkts = kcp_counter_diff(sampler->prev_in_pkts, in_pkts);
record.has_out_pkts = 1;
record.out_pkts = kcp_counter_diff(sampler->prev_out_pkts, out_pkts);
record.has_in_segs = 1;
record.in_segs = kcp_counter_diff(sampler->prev_in_segs, in_segs);
record.has_out_segs = 1;
record.out_segs = kcp_counter_diff(sampler->prev_out_segs, out_segs);
record.has_in_errs = 1;
record.in_errs = kcp_counter_diff(sampler->prev_in_errs, in_errs);
record.has_kcp_in_errs = 1;
record.kcp_in_errs = kcp_counter_diff(sampler->prev_kcp_in_errs, kcp_in_errs);
record.has_ring_buffer_snd_queue = 1;
record.ring_buffer_snd_queue = snd_queue;
record.has_ring_buffer_rcv_queue = 1;
record.ring_buffer_rcv_queue = rcv_queue;
record.has_ring_buffer_snd_buffer = 1;
record.ring_buffer_snd_buffer = snd_buffer;
record.has_curr_estab = 1;
record.curr_estab = atomic_load_explicit(&sampler->curr_estab, memory_order_relaxed);
sampler->prev_bytes_sent = bytes_sent;
sampler->prev_bytes_received = bytes_received;
sampler->prev_in_pkts = in_pkts;
sampler->prev_out_pkts = out_pkts;
sampler->prev_in_segs = in_segs;
sampler->prev_out_segs = out_segs;
sampler->prev_in_errs = in_errs;
sampler->prev_kcp_in_errs = kcp_in_errs;
(void) kcp_session_stats_log(sampler->logger, &record);
}
static void *kcp_process_sampler_thread_main(void *arg) {
kcp_process_sampler_t *sampler = (kcp_process_sampler_t *) arg;
for (;;) {
int has_request = 0;
uint64_t request_id = 0;
char reason[32];
struct timespec deadline;
clock_gettime(CLOCK_REALTIME, &deadline);
deadline.tv_sec += sampler->stats_interval_ms / 1000;
deadline.tv_nsec += (long) (sampler->stats_interval_ms % 1000) * 1000000L;
if (deadline.tv_nsec >= 1000000000L) {
deadline.tv_sec += 1;
deadline.tv_nsec -= 1000000000L;
}
pthread_mutex_lock(&sampler->lock);
while (!sampler->stopped && !sampler->request_pending) {
int wait_rc = pthread_cond_timedwait(&sampler->cond, &sampler->lock, &deadline);
if (wait_rc == ETIMEDOUT) {
break;
}
}
if (sampler->stopped) {
pthread_mutex_unlock(&sampler->lock);
return NULL;
}
if (sampler->request_pending) {
has_request = 1;
request_id = sampler->pending_request_id;
snprintf(reason, sizeof(reason), "%s", sampler->pending_reason);
sampler->request_pending = 0;
} else {
snprintf(reason, sizeof(reason), "%s", "periodic");
}
pthread_mutex_unlock(&sampler->lock);
kcp_process_sampler_log_snapshot(sampler, reason);
if (has_request) {
pthread_mutex_lock(&sampler->lock);
if (request_id > sampler->completed_request_id) {
sampler->completed_request_id = request_id;
}
pthread_cond_broadcast(&sampler->cond);
pthread_mutex_unlock(&sampler->lock);
}
}
}
static kcp_process_sampler_t *kcp_process_sampler_acquire(kcp_session_stats_logger_t *logger, const char *node_role, const char *node_id, int stats_interval_ms) {
kcp_process_sampler_t *sampler;
if (logger == NULL) {
return NULL;
}
pthread_mutex_lock(&g_kcp_process_sampler_mu);
for (sampler = g_kcp_process_samplers; sampler != NULL; sampler = sampler->next) {
if (kcp_process_sampler_matches(sampler, logger, node_role, node_id, stats_interval_ms)) {
sampler->refcount++;
pthread_mutex_unlock(&g_kcp_process_sampler_mu);
return sampler;
}
}
sampler = (kcp_process_sampler_t *) calloc(1, sizeof(*sampler));
if (sampler == NULL) {
pthread_mutex_unlock(&g_kcp_process_sampler_mu);
return NULL;
}
sampler->logger = logger;
sampler->stats_interval_ms = stats_interval_ms > 0 ? stats_interval_ms : KCP_DEFAULT_STATS_INTERVAL_MS;
sampler->refcount = 1;
snprintf(sampler->node_role, sizeof(sampler->node_role), "%s", node_role == NULL ? "" : node_role);
snprintf(sampler->node_id, sizeof(sampler->node_id), "%s", node_id == NULL ? "" : node_id);
pthread_mutex_init(&sampler->lock, NULL);
pthread_cond_init(&sampler->cond, NULL);
if (pthread_create(&sampler->thread, NULL, kcp_process_sampler_thread_main, sampler) != 0) {
pthread_cond_destroy(&sampler->cond);
pthread_mutex_destroy(&sampler->lock);
free(sampler);
pthread_mutex_unlock(&g_kcp_process_sampler_mu);
return NULL;
}
sampler->thread_started = 1;
sampler->next = g_kcp_process_samplers;
g_kcp_process_samplers = sampler;
pthread_mutex_unlock(&g_kcp_process_sampler_mu);
return sampler;
}
static void kcp_process_sampler_release(kcp_process_sampler_t *sampler) {
kcp_process_sampler_t **cursor;
if (sampler == NULL) {
return;
}
pthread_mutex_lock(&g_kcp_process_sampler_mu);
sampler->refcount--;
if (sampler->refcount > 0) {
pthread_mutex_unlock(&g_kcp_process_sampler_mu);
return;
}
for (cursor = &g_kcp_process_samplers; *cursor != NULL; cursor = &(*cursor)->next) {
if (*cursor == sampler) {
*cursor = sampler->next;
break;
}
}
pthread_mutex_unlock(&g_kcp_process_sampler_mu);
pthread_mutex_lock(&sampler->lock);
sampler->stopped = 1;
pthread_cond_broadcast(&sampler->cond);
pthread_mutex_unlock(&sampler->lock);
if (sampler->thread_started) {
pthread_join(sampler->thread, NULL);
}
pthread_cond_destroy(&sampler->cond);
pthread_mutex_destroy(&sampler->lock);
free(sampler);
}
static void kcp_process_sampler_request_sample(kcp_process_sampler_t *sampler, const char *reason) {
if (sampler == NULL) {
return;
}
pthread_mutex_lock(&sampler->lock);
if (!sampler->stopped && !sampler->request_pending) {
sampler->request_pending = 1;
sampler->pending_request_id++;
snprintf(sampler->pending_reason, sizeof(sampler->pending_reason), "%s", reason == NULL ? "" : reason);
pthread_cond_broadcast(&sampler->cond);
}
pthread_mutex_unlock(&sampler->lock);
}
static void kcp_process_sampler_request_sample_and_wait(kcp_process_sampler_t *sampler, const char *reason) {
uint64_t request_id;
if (sampler == NULL) {
return;
}
pthread_mutex_lock(&sampler->lock);
if (sampler->stopped) {
pthread_mutex_unlock(&sampler->lock);
return;
}
sampler->request_pending = 1;
request_id = ++sampler->pending_request_id;
snprintf(sampler->pending_reason, sizeof(sampler->pending_reason), "%s", reason == NULL ? "" : reason);
pthread_cond_broadcast(&sampler->cond);
while (!sampler->stopped && sampler->completed_request_id < request_id) {
pthread_cond_wait(&sampler->cond, &sampler->lock);
}
pthread_mutex_unlock(&sampler->lock);
}
static int kcp_socket_debug_log_record(kcp_socket_debug_state_t *state, const char *event_name, const struct sockaddr_storage *remote_addr, socklen_t remote_addr_len, int packet_bytes, int has_tx_id, uint32_t tx_id, int has_conv, uint32_t conv, const kcp_packet_debug_segment_t *segments, size_t segment_count, int64_t ts_unix_nano) {
char local_addr_text[OMNI_MAX_ADDR_TEXT];
char remote_addr_text[OMNI_MAX_ADDR_TEXT];
struct sockaddr_storage local_addr;
socklen_t local_addr_len = sizeof(local_addr);
kcp_packet_debug_record_t record;
if (state->logger == NULL) {
return 0;
}
memset(&record, 0, sizeof(record));
getsockname(state->fd, (struct sockaddr *) &local_addr, &local_addr_len);
omni_sockaddr_to_string((struct sockaddr *) &local_addr, local_addr_len, local_addr_text, sizeof(local_addr_text));
omni_sockaddr_to_string((const struct sockaddr *) remote_addr, remote_addr_len, remote_addr_text, sizeof(remote_addr_text));
snprintf(record.event, sizeof(record.event), "%s", event_name);
snprintf(record.node_role, sizeof(record.node_role), "%s", state->node_role);
snprintf(record.node_id, sizeof(record.node_id), "%s", state->node_id);
snprintf(record.local_addr, sizeof(record.local_addr), "%s", local_addr_text);
snprintf(record.remote_addr, sizeof(record.remote_addr), "%s", remote_addr_text);
record.packet_bytes = packet_bytes;
record.has_udp_tx_id = has_tx_id;
record.udp_tx_id = tx_id;
record.has_kcp_conv = has_conv;
record.kcp_conv = conv;
record.ts_unix_nano = ts_unix_nano;
if (segment_count > 0) {
record.segments = (kcp_packet_debug_segment_t *) calloc(segment_count, sizeof(*record.segments));
if (record.segments == NULL) {
return -1;
}
memcpy(record.segments, segments, segment_count * sizeof(*segments));
record.segment_count = segment_count;
}
kcp_packet_debug_log(state->logger, &record);
kcp_packet_debug_record_clear(&record);
return 0;
}
static void kcp_socket_debug_pending_free(kcp_packet_debug_pending_t *pending) {
while (pending != NULL) {
kcp_packet_debug_pending_t *next = pending->next;
free(pending->segments);
free(pending);
pending = next;
}
}
static int kcp_socket_debug_reserve_tx(kcp_socket_debug_state_t *state, const struct sockaddr_storage *remote_addr, socklen_t remote_addr_len, const uint8_t *packet, size_t packet_len, uint32_t *out_tx_id) {
kcp_packet_debug_pending_t *pending;
if (state->logger == NULL) {
*out_tx_id = 0;
return 0;
}
pending = (kcp_packet_debug_pending_t *) calloc(1, sizeof(*pending));
if (pending == NULL) {
return -1;
}
pending->tx_id = state->next_tx_id++;
pending->packet_bytes = (int) packet_len;
memcpy(&pending->remote_addr, remote_addr, sizeof(*remote_addr));
pending->remote_addr_len = remote_addr_len;
kcp_parse_packet_segments(packet, packet_len, &pending->conv, &pending->segments, &pending->segment_count);
pending->has_conv = packet_len >= 4;
pthread_mutex_lock(&state->pending_mu);
pending->next = state->pending_head;
state->pending_head = pending;
pthread_mutex_unlock(&state->pending_mu);
*out_tx_id = pending->tx_id;
return 0;
}
static void kcp_socket_debug_rollback_tx(kcp_socket_debug_state_t *state, uint32_t tx_id) {
kcp_packet_debug_pending_t *prev = NULL;
kcp_packet_debug_pending_t *cur;
pthread_mutex_lock(&state->pending_mu);
for (cur = state->pending_head; cur != NULL; cur = cur->next) {
if (cur->tx_id == tx_id) {
if (prev == NULL) {
state->pending_head = cur->next;
} else {
prev->next = cur->next;
}
free(cur->segments);
free(cur);
break;
}
prev = cur;
}
pthread_mutex_unlock(&state->pending_mu);
}
static void *kcp_socket_debug_errqueue_thread(void *arg) {
kcp_socket_debug_state_t *state = (kcp_socket_debug_state_t *) arg;
uint8_t control[512];
uint8_t dummy = 0;
struct iovec iov;
struct msghdr msg;
while (!atomic_load(&state->closed)) {
ssize_t rc;
omni_tx_timestamp_event_t event;
kcp_packet_debug_pending_t *prev = NULL;
kcp_packet_debug_pending_t *cur = NULL;
memset(&msg, 0, sizeof(msg));
iov.iov_base = &dummy;
iov.iov_len = sizeof(dummy);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = control;
msg.msg_controllen = sizeof(control);
rc = recvmsg(state->fd, &msg, MSG_ERRQUEUE | MSG_DONTWAIT);
if (rc < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
usleep(10000);
continue;
}
if (atomic_load(&state->closed)) {
return NULL;
}
usleep(10000);
continue;
}
if (linux_timestamping_parse_tx_timestamp(&msg, &event) != 0) {
continue;
}
pthread_mutex_lock(&state->pending_mu);
for (cur = state->pending_head; cur != NULL; cur = cur->next) {
if (cur->tx_id == event.ee_data) {
break;
}
prev = cur;
}
if (cur != NULL) {
if (strcmp(event.event_name, EVENT_A_TX_SCHED) == 0) {
cur->saw_sched = 1;
} else if (strcmp(event.event_name, EVENT_A_TX_SOFTWARE) == 0) {
cur->saw_software = 1;
}
kcp_socket_debug_log_record(state, event.event_name, &cur->remote_addr, cur->remote_addr_len, cur->packet_bytes, 1, cur->tx_id, cur->has_conv, cur->conv, cur->segments, cur->segment_count, event.ts_unix_nano);
if (cur->saw_sched && cur->saw_software) {
if (prev == NULL) {
state->pending_head = cur->next;
} else {
prev->next = cur->next;
}
free(cur->segments);
free(cur);
}
}
pthread_mutex_unlock(&state->pending_mu);
}
return NULL;
}
static int kcp_socket_debug_init(kcp_socket_debug_state_t *state, int fd, kcp_packet_debug_logger_t *logger, const char *node_role, const char *node_id) {
int thread_rc;
memset(state, 0, sizeof(*state));
state->fd = fd;
state->logger = logger;
snprintf(state->node_role, sizeof(state->node_role), "%s", node_role == NULL ? "" : node_role);
snprintf(state->node_id, sizeof(state->node_id), "%s", node_id == NULL ? "" : node_id);
pthread_mutex_init(&state->write_mu, NULL);
pthread_mutex_init(&state->pending_mu, NULL);
if (logger != NULL) {
if (linux_timestamping_enable_udp_socket(fd, 1) != 0) {
pthread_mutex_destroy(&state->write_mu);
pthread_mutex_destroy(&state->pending_mu);
return -1;
}
thread_rc = pthread_create(&state->errqueue_thread, NULL, kcp_socket_debug_errqueue_thread, state);
if (thread_rc != 0) {
errno = thread_rc;
pthread_mutex_destroy(&state->write_mu);
pthread_mutex_destroy(&state->pending_mu);
return -1;
}
state->errqueue_thread_started = 1;
}
return 0;
}
static void kcp_socket_debug_destroy(kcp_socket_debug_state_t *state) {
atomic_store(&state->closed, 1);
if (state->errqueue_thread_started) {
pthread_join(state->errqueue_thread, NULL);
}
kcp_socket_debug_pending_free(state->pending_head);
pthread_mutex_destroy(&state->write_mu);
pthread_mutex_destroy(&state->pending_mu);
}
static int kcp_socket_send_packet(kcp_socket_debug_state_t *state, const struct sockaddr_storage *remote_addr, socklen_t remote_addr_len, const uint8_t *packet, size_t packet_len) {
uint32_t tx_id = 0;
ssize_t rc;
if (state->logger != NULL && kcp_socket_debug_reserve_tx(state, remote_addr, remote_addr_len, packet, packet_len, &tx_id) != 0) {
atomic_store(&state->last_send_errno, errno != 0 ? errno : EIO);
return -1;
}
pthread_mutex_lock(&state->write_mu);
rc = sendto(state->fd, packet, packet_len, 0, (const struct sockaddr *) remote_addr, remote_addr_len);
pthread_mutex_unlock(&state->write_mu);
if (rc < 0 || (size_t) rc != packet_len) {
if (rc >= 0 && (size_t) rc != packet_len && errno == 0) {
errno = EIO;
}
atomic_store(&state->last_send_errno, errno != 0 ? errno : EIO);
if (state->logger != NULL) {
kcp_socket_debug_rollback_tx(state, tx_id);
}
return -1;
}
atomic_store(&state->last_send_errno, 0);
return 0;
}
static int kcp_output_callback_impl(const char *buf, int len, struct IKCPCB *kcp, void *user) {
kcp_conn_t *conn = (kcp_conn_t *) user;
size_t segment_count = 0;
(void) kcp;
if (conn == NULL || atomic_load(&conn->closed)) {
return -1;
}
kcp_parse_packet_segments((const uint8_t *) buf, (size_t) len, NULL, NULL, &segment_count);
if (kcp_socket_send_packet(conn->sock_state, &conn->remote_addr, conn->remote_addr_len, (const uint8_t *) buf, (size_t) len) != 0) {
return -1;
}
kcp_conn_record_send(conn, len, segment_count);
return len;
}
static int kcp_conn_attach_process_sampler(kcp_conn_t *conn) {
kcp_process_sampler_t *next_sampler;
kcp_process_sampler_t *previous_sampler;
uint64_t pending_bytes_sent = 0;
uint64_t pending_bytes_received = 0;
uint64_t pending_in_pkts = 0;
uint64_t pending_out_pkts = 0;
uint64_t pending_in_segs = 0;
uint64_t pending_out_segs = 0;
uint64_t pending_in_errs = 0;
uint64_t pending_kcp_in_errs = 0;
if (conn == NULL) {
errno = EINVAL;
return -1;
}
next_sampler = kcp_process_sampler_acquire(conn->stats_logger, conn->node_role, conn->node_id, conn->stats_interval_ms);
if (conn->stats_logger != NULL && next_sampler == NULL) {
return -1;
}
previous_sampler = conn->process_sampler;
if (previous_sampler == next_sampler) {
return 0;
}
if (next_sampler != NULL) {
kcp_process_sampler_add_conn(next_sampler, conn);
}
pthread_mutex_lock(&conn->kcp_mu);
previous_sampler = conn->process_sampler;
conn->process_sampler = next_sampler;
pending_bytes_sent = conn->pending_bytes_sent;
pending_bytes_received = conn->pending_bytes_received;
pending_in_pkts = conn->pending_in_pkts;
pending_out_pkts = conn->pending_out_pkts;
pending_in_segs = conn->pending_in_segs;
pending_out_segs = conn->pending_out_segs;
pending_in_errs = conn->pending_in_errs;
pending_kcp_in_errs = conn->pending_kcp_in_errs;
conn->pending_bytes_sent = 0;
conn->pending_bytes_received = 0;
conn->pending_in_pkts = 0;
conn->pending_out_pkts = 0;
conn->pending_in_segs = 0;
conn->pending_out_segs = 0;
conn->pending_in_errs = 0;
conn->pending_kcp_in_errs = 0;
pthread_mutex_unlock(&conn->kcp_mu);
if (next_sampler != NULL) {
atomic_fetch_add_explicit(&next_sampler->bytes_sent, pending_bytes_sent, memory_order_relaxed);
atomic_fetch_add_explicit(&next_sampler->bytes_received, pending_bytes_received, memory_order_relaxed);
atomic_fetch_add_explicit(&next_sampler->in_pkts, pending_in_pkts, memory_order_relaxed);
atomic_fetch_add_explicit(&next_sampler->out_pkts, pending_out_pkts, memory_order_relaxed);
atomic_fetch_add_explicit(&next_sampler->in_segs, pending_in_segs, memory_order_relaxed);
atomic_fetch_add_explicit(&next_sampler->out_segs, pending_out_segs, memory_order_relaxed);
atomic_fetch_add_explicit(&next_sampler->in_errs, pending_in_errs, memory_order_relaxed);
atomic_fetch_add_explicit(&next_sampler->kcp_in_errs, pending_kcp_in_errs, memory_order_relaxed);
}
if (previous_sampler != NULL) {
kcp_process_sampler_remove_conn(previous_sampler, conn);
kcp_process_sampler_release(previous_sampler);
}
return 0;
}
static void kcp_conn_detach_process_sampler(kcp_conn_t *conn) {
kcp_process_sampler_t *sampler;
if (conn == NULL || conn->process_sampler == NULL) {
return;
}
sampler = conn->process_sampler;
conn->process_sampler = NULL;
kcp_process_sampler_remove_conn(sampler, conn);
kcp_process_sampler_release(sampler);
}
static void kcp_log_session_snapshot(kcp_conn_t *conn, const char *reason) {
kcp_session_stats_record_t record;
struct sockaddr_storage local_addr;
socklen_t local_len = sizeof(local_addr);
char local_text[OMNI_MAX_ADDR_TEXT];
char remote_text[OMNI_MAX_ADDR_TEXT];
if (conn == NULL || conn->stats_logger == NULL || conn->sock_state == NULL || conn->kcp == NULL) {
return;
}
memset(&record, 0, sizeof(record));
snprintf(record.record_type, sizeof(record.record_type), "%s", KCP_SESSION_STATS_RECORD_SESSION_SAMPLE);
snprintf(record.node_role, sizeof(record.node_role), "%s", conn->node_role);
snprintf(record.node_id, sizeof(record.node_id), "%s", conn->node_id);
getsockname(conn->sock_state->fd, (struct sockaddr *) &local_addr, &local_len);
omni_sockaddr_to_string((struct sockaddr *) &local_addr, local_len, local_text, sizeof(local_text));
omni_sockaddr_to_string((struct sockaddr *) &conn->remote_addr, conn->remote_addr_len, remote_text, sizeof(remote_text));
snprintf(record.local_addr, sizeof(record.local_addr), "%s", local_text);
snprintf(record.remote_addr, sizeof(record.remote_addr), "%s", remote_text);
record.has_conv = 1;
record.conv = conn->kcp->conv;
record.ts_unix_nano = omni_now_unix_nano();
snprintf(record.sample_reason, sizeof(record.sample_reason), "%s", reason);
pthread_mutex_lock(&conn->kcp_mu);
record.has_rto_ms = 1;
record.rto_ms = conn->kcp->rx_rto;
record.has_srtt_ms = 1;
record.srtt_ms = conn->kcp->rx_srtt;
record.has_srttvar_ms = 1;
record.srttvar_ms = conn->kcp->rx_rttval;
pthread_mutex_unlock(&conn->kcp_mu);
(void) kcp_session_stats_log(conn->stats_logger, &record);
}
static void *kcp_stats_thread_main(void *arg) {
kcp_conn_t *conn = (kcp_conn_t *) arg;
while (!atomic_load(&conn->closed)) {
usleep((useconds_t) conn->stats_interval_ms * 1000U);
if (!atomic_load(&conn->closed)) {
kcp_log_session_snapshot(conn, "periodic");
}
}
return NULL;
}
static int kcp_socket_open_bound(const char *listen_addr, const char *bind_device, struct sockaddr_storage *local_addr, socklen_t *local_len) {
int family;
int fd;
if (omni_parse_sockaddr(listen_addr, 1, local_addr, local_len, &family) != 0) {
return -1;
}
fd = socket(family, SOCK_DGRAM, 0);
if (fd < 0) {
return -1;
}
if (bind_device != NULL && bind_device[0] != '\0' && omni_bind_device(fd, bind_device) != 0) {
close(fd);
return -1;
}
if (bind(fd, (struct sockaddr *) local_addr, *local_len) != 0) {
close(fd);
return -1;
}
return fd;
}
static int kcp_socket_open_dial(const char *server_addr, const char *bind_ip, const char *bind_device, struct sockaddr_storage *remote_addr, socklen_t *remote_len, int *family_out) {
int family;
struct sockaddr_storage local_addr;
socklen_t local_len;
int fd;
if (omni_parse_sockaddr(server_addr, 0, remote_addr, remote_len, &family) != 0) {
return -1;
}
fd = socket(family, SOCK_DGRAM, 0);
if (fd < 0) {
return -1;
}
if (bind_device != NULL && bind_device[0] != '\0' && omni_bind_device(fd, bind_device) != 0) {
close(fd);
return -1;
}
if (bind_ip != NULL && bind_ip[0] != '\0') {
struct addrinfo hints;
struct addrinfo *result = NULL;
memset(&hints, 0, sizeof(hints));
hints.ai_family = family;
hints.ai_socktype = SOCK_DGRAM;
if (getaddrinfo(bind_ip, "0", &hints, &result) != 0 || result == NULL) {
close(fd);
errno = EINVAL;
return -1;
}
memcpy(&local_addr, result->ai_addr, result->ai_addrlen);
local_len = (socklen_t) result->ai_addrlen;
freeaddrinfo(result);
if (bind(fd, (struct sockaddr *) &local_addr, local_len) != 0) {
close(fd);
return -1;
}
}
if (family_out != NULL) {
*family_out = family;
}
return fd;
}
static int kcp_sockaddr_equal(const struct sockaddr_storage *left, socklen_t left_len, const struct sockaddr_storage *right, socklen_t right_len) {
char left_text[OMNI_MAX_ADDR_TEXT];
char right_text[OMNI_MAX_ADDR_TEXT];
if (left == NULL || right == NULL) {
return left == right;
}
return strcmp(
omni_sockaddr_to_string((const struct sockaddr *) left, left_len, left_text, sizeof(left_text)),
omni_sockaddr_to_string((const struct sockaddr *) right, right_len, right_text, sizeof(right_text))
) == 0;
}
static void *kcp_client_recv_thread_main(void *arg) {
kcp_conn_t *conn = (kcp_conn_t *) arg;
uint8_t buffer[64 * 1024];
uint8_t control[512];
struct sockaddr_storage source;
struct iovec iov;
struct msghdr msg;
ssize_t n;
uint32_t conv = 0;
kcp_packet_debug_segment_t *segments = NULL;
size_t segment_count = 0;
int64_t rx_ts;
while (!atomic_load(&conn->closed)) {
memset(&msg, 0, sizeof(msg));
memset(&source, 0, sizeof(source));
iov.iov_base = buffer;
iov.iov_len = sizeof(buffer);
msg.msg_name = &source;
msg.msg_namelen = sizeof(source);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
if (conn->sock_state->logger != NULL) {
msg.msg_control = control;
msg.msg_controllen = sizeof(control);
}
n = recvmsg(conn->fd, &msg, 0);
if (n < 0) {
if (errno == EINTR) {
continue;
}
if (atomic_load(&conn->closed)) {
return NULL;
}
return NULL;
}
kcp_parse_packet_segments(buffer, (size_t) n, &conv, &segments, &segment_count);
rx_ts = conn->sock_state->logger != NULL ? linux_timestamping_parse_rx_timestamp(&msg) : 0;
if (rx_ts > 0) {
kcp_socket_debug_log_record(conn->sock_state, EVENT_B_RX_SOFTWARE, &source, msg.msg_namelen, (int) n, 0, 0, 1, conv, segments, segment_count, rx_ts);
}
if (!kcp_sockaddr_equal(&source, msg.msg_namelen, &conn->remote_addr, conn->remote_addr_len)) {
free(segments);
segments = NULL;
segment_count = 0;
continue;
}
pthread_mutex_lock(&conn->kcp_mu);
conn->kcp->current = omni_now_millis32();
if (ikcp_input(conn->kcp, (const char *) buffer, n) != 0) {
kcp_conn_record_error(conn);
} else {
kcp_conn_record_input(conn, (int) n, segment_count);
}
pthread_mutex_unlock(&conn->kcp_mu);
pthread_cond_broadcast(&conn->rx_cond);
free(segments);
segments = NULL;
segment_count = 0;
}
return NULL;
}
static void *kcp_update_thread_main(void *arg) {
kcp_conn_t *conn = (kcp_conn_t *) arg;
while (!atomic_load(&conn->closed)) {
int interval_ms;
pthread_mutex_lock(&conn->kcp_mu);
ikcp_update(conn->kcp, omni_now_millis32());
interval_ms = conn->update_interval_ms > 0 ? conn->update_interval_ms : KCP_DEFAULT_INTERVAL_MS;
pthread_mutex_unlock(&conn->kcp_mu);
usleep((useconds_t) interval_ms * 1000U);
}
return NULL;
}
static int kcp_conn_start_stats_thread(kcp_conn_t *conn) {
int thread_rc;
if (conn == NULL || conn->stats_logger == NULL || conn->stats_thread_started) {
return 0;
}
thread_rc = pthread_create(&conn->stats_thread, NULL, kcp_stats_thread_main, conn);
if (thread_rc != 0) {
errno = thread_rc;
return -1;
}
conn->stats_thread_started = 1;
return 0;
}
static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *remote_addr, socklen_t remote_addr_len, const kcp_conn_options_t *options, kcp_socket_debug_state_t *sock_state, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
kcp_conn_t *conn = (kcp_conn_t *) calloc(1, sizeof(*conn));
uint32_t conv;
int thread_rc;
kcp_conn_options_t effective_options;
if (conn == NULL) {
errno = ENOMEM;
return NULL;
}
conn->fd = fd;
memcpy(&conn->remote_addr, remote_addr, sizeof(*remote_addr));
conn->remote_addr_len = remote_addr_len;
pthread_mutex_init(&conn->kcp_mu, NULL);
pthread_mutex_init(&conn->close_mu, NULL);
pthread_cond_init(&conn->rx_cond, NULL);
protocol_frame_decoder_init(&conn->decoder);
conn->logger = logger;
snprintf(conn->node_role, sizeof(conn->node_role), "%s", node_role == NULL ? "" : node_role);
snprintf(conn->node_id, sizeof(conn->node_id), "%s", node_id == NULL ? "" : node_id);
conn->stats_logger = stats_logger;
conn->stats_interval_ms = stats_interval_ms > 0 ? stats_interval_ms : KCP_DEFAULT_STATS_INTERVAL_MS;
kcp_conn_options_init(&effective_options);
if (options != NULL) {
effective_options = *options;
}
conn->options = effective_options;
conn->update_interval_ms = effective_options.interval_ms;
conn->sock_state = sock_state;
if (omni_random_u32(&conv) != 0) {
protocol_frame_decoder_destroy(&conn->decoder);
pthread_cond_destroy(&conn->rx_cond);
pthread_mutex_destroy(&conn->kcp_mu);
pthread_mutex_destroy(&conn->close_mu);
free(conn);
return NULL;
}
conn->kcp = ikcp_create(conv, conn);
if (conn->kcp == NULL) {
errno = ENOMEM;
protocol_frame_decoder_destroy(&conn->decoder);
pthread_cond_destroy(&conn->rx_cond);
pthread_mutex_destroy(&conn->kcp_mu);
pthread_mutex_destroy(&conn->close_mu);
free(conn);
return NULL;
}
ikcp_setoutput(conn->kcp, kcp_output_callback_impl);
if (kcp_conn_apply_options_locked(conn, &effective_options) != 0) {
ikcp_release(conn->kcp);
protocol_frame_decoder_destroy(&conn->decoder);
pthread_cond_destroy(&conn->rx_cond);
pthread_mutex_destroy(&conn->kcp_mu);
pthread_mutex_destroy(&conn->close_mu);
free(conn);
return NULL;
}
if (kcp_conn_attach_process_sampler(conn) != 0) {
ikcp_release(conn->kcp);
protocol_frame_decoder_destroy(&conn->decoder);
pthread_cond_destroy(&conn->rx_cond);
pthread_mutex_destroy(&conn->kcp_mu);
pthread_mutex_destroy(&conn->close_mu);
free(conn);
return NULL;
}
if (kcp_conn_start_stats_thread(conn) != 0) {
kcp_conn_detach_process_sampler(conn);
ikcp_release(conn->kcp);
protocol_frame_decoder_destroy(&conn->decoder);
pthread_cond_destroy(&conn->rx_cond);
pthread_mutex_destroy(&conn->kcp_mu);
pthread_mutex_destroy(&conn->close_mu);
free(conn);
return NULL;
}
thread_rc = pthread_create(&conn->update_thread, NULL, kcp_update_thread_main, conn);
if (thread_rc != 0) {
errno = thread_rc;
if (conn->stats_thread_started) {
atomic_store(&conn->closed, 1);
pthread_join(conn->stats_thread, NULL);
}
kcp_conn_detach_process_sampler(conn);
ikcp_release(conn->kcp);
protocol_frame_decoder_destroy(&conn->decoder);
pthread_cond_destroy(&conn->rx_cond);
pthread_mutex_destroy(&conn->kcp_mu);
pthread_mutex_destroy(&conn->close_mu);
free(conn);
return NULL;
}
conn->update_thread_started = 1;
return conn;
}
kcp_conn_t *kcp_conn_dial_with_options(const char *server_addr, const char *bind_ip, const char *bind_device, const kcp_conn_options_t *options, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
struct sockaddr_storage remote_addr;
socklen_t remote_len;
int family;
int fd = kcp_socket_open_dial(server_addr, bind_ip, bind_device, &remote_addr, &remote_len, &family);
kcp_conn_t *conn;
kcp_socket_debug_state_t *sock_state;
int thread_rc;
(void) family;
if (fd < 0) {
return NULL;
}
sock_state = (kcp_socket_debug_state_t *) calloc(1, sizeof(*sock_state));
if (sock_state == NULL) {
errno = ENOMEM;
close(fd);
return NULL;
}
if (kcp_socket_debug_init(sock_state, fd, packet_logger, node_role, node_id) != 0) {
free(sock_state);
close(fd);
return NULL;
}
conn = kcp_conn_alloc_common(fd, &remote_addr, remote_len, options, sock_state, logger, node_role, node_id, stats_logger, stats_interval_ms);
if (conn == NULL) {
kcp_socket_debug_destroy(sock_state);
free(sock_state);
close(fd);
return NULL;
}
conn->is_client = 1;
conn->owns_socket = 1;
thread_rc = pthread_create(&conn->recv_thread, NULL, kcp_client_recv_thread_main, conn);
if (thread_rc != 0) {
errno = thread_rc;
kcp_conn_free(conn);
return NULL;
}
conn->recv_thread_started = 1;
return conn;
}
kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
return kcp_conn_dial_with_options(server_addr, bind_ip, bind_device, NULL, packet_logger, logger, node_role, node_id, stats_logger, stats_interval_ms);
}
static void kcp_listener_enqueue_accept(kcp_listener_t *listener, kcp_conn_t *conn) {
pthread_mutex_lock(&listener->accept_mu);
if (listener->accept_tail == NULL) {
listener->accept_head = conn;
} else {
listener->accept_tail->accept_next = conn;
}
listener->accept_tail = conn;
conn->accept_next = NULL;
pthread_cond_signal(&listener->accept_cond);
pthread_mutex_unlock(&listener->accept_mu);
}
static kcp_conn_t *kcp_listener_find_session(kcp_listener_t *listener, uint32_t conv) {
kcp_session_entry_t *entry;
for (entry = listener->sessions; entry != NULL; entry = entry->next) {
if (entry->conv == conv) {
return entry->conn;
}
}
return NULL;
}
static int kcp_listener_add_session(kcp_listener_t *listener, uint32_t conv, kcp_conn_t *conn) {
kcp_session_entry_t *entry = (kcp_session_entry_t *) calloc(1, sizeof(*entry));
if (entry == NULL) {
return -1;
}
entry->conv = conv;
entry->conn = conn;
entry->next = listener->sessions;
listener->sessions = entry;
return 0;
}
static void kcp_listener_remove_session(kcp_listener_t *listener, kcp_conn_t *conn) {
kcp_session_entry_t *prev_entry = NULL;
kcp_session_entry_t *entry;
kcp_conn_t *prev_accept = NULL;
kcp_conn_t *accept;
if (listener == NULL || conn == NULL) {
return;
}
pthread_mutex_lock(&listener->lock);
for (entry = listener->sessions; entry != NULL; entry = entry->next) {
if (entry->conn == conn) {
if (prev_entry == NULL) {
listener->sessions = entry->next;
} else {
prev_entry->next = entry->next;
}
free(entry);
break;
}
prev_entry = entry;
}
pthread_mutex_unlock(&listener->lock);
pthread_mutex_lock(&listener->accept_mu);
for (accept = listener->accept_head; accept != NULL; accept = accept->accept_next) {
if (accept == conn) {
if (prev_accept == NULL) {
listener->accept_head = accept->accept_next;
} else {
prev_accept->accept_next = accept->accept_next;
}
if (listener->accept_tail == conn) {
listener->accept_tail = prev_accept;
}
conn->accept_next = NULL;
break;
}
prev_accept = accept;
}
pthread_mutex_unlock(&listener->accept_mu);
}
static void *kcp_listener_recv_thread_main(void *arg) {
kcp_listener_t *listener = (kcp_listener_t *) arg;
uint8_t buffer[64 * 1024];
uint8_t control[512];
struct sockaddr_storage source;
struct iovec iov;
struct msghdr msg;
ssize_t n;
uint32_t conv;
kcp_packet_debug_segment_t *segments = NULL;
size_t segment_count = 0;
int64_t rx_ts;
while (!listener->closed) {
memset(&msg, 0, sizeof(msg));
memset(&source, 0, sizeof(source));
iov.iov_base = buffer;
iov.iov_len = sizeof(buffer);
msg.msg_name = &source;
msg.msg_namelen = sizeof(source);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
if (listener->sock_state.logger != NULL) {
msg.msg_control = control;
msg.msg_controllen = sizeof(control);
}
n = recvmsg(listener->fd, &msg, 0);
if (n < 0) {
if (errno == EINTR) {
continue;
}
if (listener->closed) {
return NULL;
}
return NULL;
}
kcp_parse_packet_segments(buffer, (size_t) n, &conv, &segments, &segment_count);
rx_ts = listener->sock_state.logger != NULL ? linux_timestamping_parse_rx_timestamp(&msg) : 0;
if (rx_ts > 0) {
kcp_socket_debug_log_record(&listener->sock_state, EVENT_B_RX_SOFTWARE, &source, msg.msg_namelen, (int) n, 0, 0, 1, conv, segments, segment_count, rx_ts);
}
pthread_mutex_lock(&listener->lock);
{
kcp_conn_t *conn = kcp_listener_find_session(listener, conv);
if (conn == NULL) {
conn = (kcp_conn_t *) calloc(1, sizeof(*conn));
if (conn != NULL) {
kcp_conn_options_t accepted_options;
conn->fd = listener->fd;
memcpy(&conn->remote_addr, &source, sizeof(source));
conn->remote_addr_len = msg.msg_namelen;
pthread_mutex_init(&conn->kcp_mu, NULL);
pthread_mutex_init(&conn->close_mu, NULL);
pthread_cond_init(&conn->rx_cond, NULL);
protocol_frame_decoder_init(&conn->decoder);
snprintf(conn->node_role, sizeof(conn->node_role), "%s", listener->sock_state.node_role);
snprintf(conn->node_id, sizeof(conn->node_id), "%s", listener->sock_state.node_id);
conn->stats_interval_ms = KCP_DEFAULT_STATS_INTERVAL_MS;
conn->sock_state = &listener->sock_state;
conn->listener = listener;
kcp_conn_options_init(&accepted_options);
conn->options = accepted_options;
conn->update_interval_ms = accepted_options.interval_ms;
conn->kcp = ikcp_create(conv, conn);
if (conn->kcp != NULL) {
int update_started = 0;
ikcp_setoutput(conn->kcp, kcp_output_callback_impl);
if (kcp_conn_apply_options_locked(conn, &accepted_options) == 0 &&
pthread_create(&conn->update_thread, NULL, kcp_update_thread_main, conn) == 0) {
update_started = 1;
}
if (update_started && kcp_listener_add_session(listener, conv, conn) == 0) {
conn->update_thread_started = 1;
kcp_listener_enqueue_accept(listener, conn);
} else {
atomic_store(&conn->closed, 1);
if (update_started) {
pthread_join(conn->update_thread, NULL);
}
ikcp_release(conn->kcp);
protocol_frame_decoder_destroy(&conn->decoder);
pthread_cond_destroy(&conn->rx_cond);
pthread_mutex_destroy(&conn->kcp_mu);
pthread_mutex_destroy(&conn->close_mu);
free(conn);
conn = NULL;
}
} else {
protocol_frame_decoder_destroy(&conn->decoder);
pthread_cond_destroy(&conn->rx_cond);
pthread_mutex_destroy(&conn->kcp_mu);
pthread_mutex_destroy(&conn->close_mu);
free(conn);
conn = NULL;
}
}
}
if (conn != NULL && conn->kcp != NULL) {
pthread_mutex_lock(&conn->kcp_mu);
conn->kcp->current = omni_now_millis32();
if (ikcp_input(conn->kcp, (const char *) buffer, n) != 0) {
kcp_conn_record_error(conn);
} else {
kcp_conn_record_input(conn, (int) n, segment_count);
}
pthread_mutex_unlock(&conn->kcp_mu);
pthread_cond_broadcast(&conn->rx_cond);
}
}
pthread_mutex_unlock(&listener->lock);
free(segments);
segments = NULL;
segment_count = 0;
}
return NULL;
}
kcp_listener_t *kcp_listener_listen(const char *listen_addr, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, const char *node_role, const char *node_id) {
struct sockaddr_storage local_addr;
socklen_t local_len;
int fd = kcp_socket_open_bound(listen_addr, bind_device, &local_addr, &local_len);
kcp_listener_t *listener;
if (fd < 0) {
return NULL;
}
listener = (kcp_listener_t *) calloc(1, sizeof(*listener));
if (listener == NULL) {
close(fd);
return NULL;
}
listener->fd = fd;
pthread_mutex_init(&listener->lock, NULL);
pthread_mutex_init(&listener->accept_mu, NULL);
pthread_cond_init(&listener->accept_cond, NULL);
if (kcp_socket_debug_init(&listener->sock_state, fd, packet_logger, node_role, node_id) != 0) {
kcp_listener_free(listener);
return NULL;
}
if (pthread_create(&listener->recv_thread, NULL, kcp_listener_recv_thread_main, listener) != 0) {
kcp_listener_free(listener);
return NULL;
}
listener->recv_thread_started = 1;
return listener;
}
kcp_conn_t *kcp_listener_accept(kcp_listener_t *listener) {
kcp_conn_t *conn;
if (listener == NULL) {
errno = EINVAL;
return NULL;
}
pthread_mutex_lock(&listener->accept_mu);
while (!listener->closed && listener->accept_head == NULL) {
pthread_cond_wait(&listener->accept_cond, &listener->accept_mu);
}
if (listener->closed) {
pthread_mutex_unlock(&listener->accept_mu);
errno = ECANCELED;
return NULL;
}
conn = listener->accept_head;
listener->accept_head = conn->accept_next;
if (listener->accept_head == NULL) {
listener->accept_tail = NULL;
}
conn->accept_next = NULL;
pthread_mutex_unlock(&listener->accept_mu);
return conn;
}
int kcp_conn_configure_runtime(kcp_conn_t *conn, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
if (conn == NULL) {
errno = EINVAL;
return -1;
}
pthread_mutex_lock(&conn->close_mu);
conn->logger = logger;
if (node_role != NULL) {
snprintf(conn->node_role, sizeof(conn->node_role), "%s", node_role);
}
if (node_id != NULL) {
snprintf(conn->node_id, sizeof(conn->node_id), "%s", node_id);
}
conn->stats_logger = stats_logger;
if (stats_interval_ms > 0) {
conn->stats_interval_ms = stats_interval_ms;
} else if (conn->stats_interval_ms <= 0) {
conn->stats_interval_ms = KCP_DEFAULT_STATS_INTERVAL_MS;
}
if (kcp_conn_attach_process_sampler(conn) != 0) {
pthread_mutex_unlock(&conn->close_mu);
return -1;
}
pthread_mutex_unlock(&conn->close_mu);
if (kcp_conn_start_stats_thread(conn) != 0) {
pthread_mutex_lock(&conn->close_mu);
kcp_conn_detach_process_sampler(conn);
pthread_mutex_unlock(&conn->close_mu);
return -1;
}
return 0;
}
int kcp_conn_apply_options(kcp_conn_t *conn, const kcp_conn_options_t *options) {
int rc;
if (conn == NULL || options == NULL) {
errno = EINVAL;
return -1;
}
pthread_mutex_lock(&conn->kcp_mu);
rc = kcp_conn_apply_options_locked(conn, options);
pthread_mutex_unlock(&conn->kcp_mu);
return rc;
}
int kcp_conn_send(kcp_conn_t *conn, const message_t *msg) {
uint8_t *frame = NULL;
size_t frame_len = 0;
int send_errno = 0;
int kcp_send_rc = 0;
if (conn == NULL || msg == NULL) {
errno = EINVAL;
return -1;
}
if (protocol_encode_message_stream(msg, &frame, &frame_len) != 0) {
return -1;
}
latencylog_log_message_event(conn->logger, conn->node_role, conn->node_id, EVENT_SEND_HANDOFF_BEGIN, msg);
kcp_log_session_snapshot(conn, "send_handoff_begin");
kcp_process_sampler_request_sample(conn->process_sampler, "send_handoff_begin");
pthread_mutex_lock(&conn->kcp_mu);
atomic_store(&conn->sock_state->last_send_errno, 0);
conn->kcp->current = omni_now_millis32();
kcp_send_rc = ikcp_send(conn->kcp, (const char *) frame, (int) frame_len);
if (kcp_send_rc < 0) {
pthread_mutex_unlock(&conn->kcp_mu);
errno = kcp_send_rc == -2 ? EMSGSIZE : EINVAL;
free(frame);
return -1;
}
ikcp_flush(conn->kcp);
send_errno = atomic_load(&conn->sock_state->last_send_errno);
pthread_mutex_unlock(&conn->kcp_mu);
if (send_errno != 0) {
errno = send_errno;
free(frame);
return -1;
}
kcp_log_session_snapshot(conn, "send_handoff_end");
kcp_process_sampler_request_sample(conn->process_sampler, "send_handoff_end");
latencylog_log_message_event(conn->logger, conn->node_role, conn->node_id, EVENT_SEND_HANDOFF_END, msg);
free(frame);
return 0;
}
static void kcp_timespec_deadline_after_ms(struct timespec *deadline, int timeout_ms) {
clock_gettime(CLOCK_REALTIME, deadline);
deadline->tv_sec += timeout_ms / 1000;
deadline->tv_nsec += (long) (timeout_ms % 1000) * 1000000L;
if (deadline->tv_nsec >= 1000000000L) {
deadline->tv_sec += 1;
deadline->tv_nsec -= 1000000000L;
}
}
int kcp_conn_receive_timed(kcp_conn_t *conn, message_t *out_msg, int timeout_ms) {
uint8_t *frame = NULL;
size_t frame_len = 0;
char err[128];
int next_rc;
struct timespec deadline;
int use_deadline = timeout_ms > 0;
if (conn == NULL || out_msg == NULL) {
errno = EINVAL;
return -1;
}
if (use_deadline) {
kcp_timespec_deadline_after_ms(&deadline, timeout_ms);
}
for (;;) {
next_rc = protocol_frame_decoder_next(&conn->decoder, &frame, &frame_len);
if (next_rc < 0) {
return -1;
}
if (next_rc == 1) {
if (protocol_decode_message_stream_payload(frame, frame_len, out_msg, err, sizeof(err)) != 0) {
free(frame);
errno = EPROTO;
return -1;
}
free(frame);
kcp_log_session_snapshot(conn, "receive");
kcp_process_sampler_request_sample(conn->process_sampler, "receive");
return 0;
}
pthread_mutex_lock(&conn->kcp_mu);
{
int n = ikcp_recv(conn->kcp, (char *) conn->scratch, (int) sizeof(conn->scratch));
if (n > 0) {
if (protocol_frame_decoder_feed(&conn->decoder, conn->scratch, (size_t) n) != 0) {
pthread_mutex_unlock(&conn->kcp_mu);
return -1;
}
pthread_mutex_unlock(&conn->kcp_mu);
continue;
}
if (atomic_load(&conn->closed)) {
pthread_mutex_unlock(&conn->kcp_mu);
errno = ECANCELED;
return -1;
}
if (timeout_ms == 0) {
pthread_mutex_unlock(&conn->kcp_mu);
return 1;
}
if (timeout_ms < 0) {
pthread_cond_wait(&conn->rx_cond, &conn->kcp_mu);
} else {
int wait_rc = pthread_cond_timedwait(&conn->rx_cond, &conn->kcp_mu, &deadline);
if (wait_rc == ETIMEDOUT) {
pthread_mutex_unlock(&conn->kcp_mu);
return 1;
}
if (wait_rc != 0) {
pthread_mutex_unlock(&conn->kcp_mu);
errno = wait_rc;
return -1;
}
}
}
pthread_mutex_unlock(&conn->kcp_mu);
}
}
int kcp_conn_receive(kcp_conn_t *conn, message_t *out_msg) {
return kcp_conn_receive_timed(conn, out_msg, -1);
}
uint32_t kcp_conn_conv(const kcp_conn_t *conn) {
return conn == NULL || conn->kcp == NULL ? 0 : conn->kcp->conv;
}
int kcp_conn_local_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, socklen_t *addr_len) {
socklen_t len = sizeof(*addr);
if (conn == NULL || addr == NULL || addr_len == NULL || conn->sock_state == NULL) {
errno = EINVAL;
return -1;
}
if (getsockname(conn->sock_state->fd, (struct sockaddr *) addr, &len) != 0) {
return -1;
}
*addr_len = len;
return 0;
}
int kcp_conn_close(kcp_conn_t *conn) {
if (conn == NULL) {
return 0;
}
pthread_mutex_lock(&conn->close_mu);
if (!atomic_load(&conn->closed)) {
kcp_log_session_snapshot(conn, "close");
kcp_process_sampler_request_sample_and_wait(conn->process_sampler, "close");
pthread_mutex_lock(&conn->kcp_mu);
atomic_store(&conn->closed, 1);
if (conn->owns_socket && !conn->socket_closed) {
close(conn->fd);
conn->socket_closed = 1;
}
pthread_cond_broadcast(&conn->rx_cond);
pthread_mutex_unlock(&conn->kcp_mu);
}
pthread_mutex_unlock(&conn->close_mu);
return 0;
}
void kcp_conn_free(kcp_conn_t *conn) {
if (conn == NULL) {
return;
}
kcp_conn_close(conn);
if (conn->recv_thread_started) {
pthread_join(conn->recv_thread, NULL);
}
if (conn->update_thread_started) {
pthread_join(conn->update_thread, NULL);
}
if (conn->stats_thread_started) {
pthread_join(conn->stats_thread, NULL);
}
if (conn->listener != NULL && !conn->listener->closed) {
kcp_listener_remove_session(conn->listener, conn);
}
kcp_conn_detach_process_sampler(conn);
if (conn->owns_socket && conn->sock_state != NULL) {
if (!conn->socket_closed) {
close(conn->fd);
conn->socket_closed = 1;
}
kcp_socket_debug_destroy(conn->sock_state);
free(conn->sock_state);
}
if (conn->kcp != NULL) {
ikcp_release(conn->kcp);
}
protocol_frame_decoder_destroy(&conn->decoder);
pthread_cond_destroy(&conn->rx_cond);
pthread_mutex_destroy(&conn->kcp_mu);
pthread_mutex_destroy(&conn->close_mu);
free(conn);
}
int kcp_listener_close(kcp_listener_t *listener) {
if (listener == NULL) {
return 0;
}
if (!listener->closed) {
listener->closed = 1;
close(listener->fd);
pthread_cond_broadcast(&listener->accept_cond);
}
return 0;
}
void kcp_listener_free(kcp_listener_t *listener) {
kcp_session_entry_t *entry;
kcp_session_entry_t *next;
if (listener == NULL) {
return;
}
kcp_listener_close(listener);
if (listener->recv_thread_started) {
pthread_join(listener->recv_thread, NULL);
}
for (entry = listener->sessions; entry != NULL; entry = next) {
next = entry->next;
entry->conn->listener = NULL;
kcp_conn_free(entry->conn);
free(entry);
}
kcp_socket_debug_destroy(&listener->sock_state);
pthread_mutex_destroy(&listener->lock);
pthread_mutex_destroy(&listener->accept_mu);
pthread_cond_destroy(&listener->accept_cond);
free(listener);
}
int kcp_session_stats_parse_interval_ms(const char *raw, int *out_ms) {
return omni_parse_duration_ms(raw, KCP_DEFAULT_STATS_INTERVAL_MS, out_ms);
}