#include "transport_kcp.h"
#include "ikcp.h"
#include "linux_timestamping.h"

/*
 * NOTE(review): the names of the four system headers were lost in this copy
 * of the file (the directives read as bare "#include"). The list below covers
 * the standard/POSIX facilities this translation unit visibly uses; confirm
 * against version control.
 */
#include <errno.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <time.h>
#include <unistd.h>

#define KCP_RECV_CHUNK_SIZE (32U * 1024U)

/* One transmitted datagram awaiting its TX timestamp events (singly linked). */
typedef struct kcp_packet_debug_pending {
    struct kcp_packet_debug_pending *next;
    uint32_t tx_id;
    struct sockaddr_storage remote_addr;
    socklen_t remote_addr_len;
    int packet_bytes;
    int has_conv;
    uint32_t conv;
    kcp_packet_debug_segment_t *segments; /* heap array, owned by this node */
    size_t segment_count;
    int saw_sched;
    int saw_software;
} kcp_packet_debug_pending_t;

/* Per-socket send/debug state; may be embedded in a listener or pointed to by a conn. */
typedef struct kcp_socket_debug_state {
    int fd;
    char node_role[OMNI_MAX_NODE_ROLE];
    char node_id[OMNI_MAX_PEER_ID];
    kcp_packet_debug_logger_t *logger; /* NULL disables packet debug logging */
    pthread_mutex_t write_mu;          /* serialises sendto() on fd */
    pthread_mutex_t pending_mu;        /* guards pending_head and next_tx_id */
    pthread_t errqueue_thread;
    int errqueue_thread_started;
    uint32_t next_tx_id;
    kcp_packet_debug_pending_t *pending_head;
    atomic_int closed;
    atomic_int last_send_errno;
} kcp_socket_debug_state_t;

typedef struct kcp_session_entry kcp_session_entry_t;
typedef struct kcp_process_sampler kcp_process_sampler_t;

struct kcp_conn {
    ikcpcb *kcp;
    int fd;
    int is_client;
    int owns_socket;
    int socket_closed;
    atomic_int closed;
    struct sockaddr_storage remote_addr;
    socklen_t remote_addr_len;
    pthread_mutex_t kcp_mu;
    pthread_mutex_t close_mu;
    pthread_cond_t rx_cond;
    pthread_t recv_thread;
    int recv_thread_started;
    pthread_t update_thread;
    int update_thread_started;
    pthread_t stats_thread;
    int stats_thread_started;
    kcp_conn_options_t options;
    int update_interval_ms;
    atomic_uint_fast64_t total_out_segs;
    /* Counters accumulated while no process sampler is attached. */
    uint64_t pending_bytes_sent;
    uint64_t pending_bytes_received;
    uint64_t pending_in_pkts;
    uint64_t pending_out_pkts;
    uint64_t pending_in_segs;
    uint64_t pending_out_segs;
    uint64_t pending_in_errs;
    uint64_t pending_kcp_in_errs;
    protocol_frame_decoder_t decoder;
    uint8_t scratch[KCP_RECV_CHUNK_SIZE];
    latency_logger_t *logger;
    char node_role[OMNI_MAX_NODE_ROLE];
    char node_id[OMNI_MAX_PEER_ID];
    kcp_session_stats_logger_t *stats_logger;
    int stats_interval_ms;
    kcp_process_sampler_t *process_sampler;
    kcp_socket_debug_state_t *sock_state;
    struct kcp_listener *listener;
    struct kcp_conn *accept_next;
    struct kcp_conn *process_next; /* link in kcp_process_sampler.members */
};

struct kcp_listener {
    int fd;
    int closed;
    pthread_mutex_t lock;
    pthread_mutex_t accept_mu;
    pthread_cond_t accept_cond;
    pthread_t recv_thread;
    int recv_thread_started;
    kcp_session_entry_t *sessions;
    kcp_conn_t *accept_head;
    kcp_conn_t *accept_tail;
    kcp_socket_debug_state_t sock_state;
};

struct kcp_session_entry {
    uint32_t conv;
    kcp_conn_t *conn;
    kcp_session_entry_t *next;
};

/* Shared, reference-counted aggregator of per-process KCP statistics. */
struct kcp_process_sampler {
    kcp_process_sampler_t *next;
    kcp_session_stats_logger_t *logger;
    char node_role[OMNI_MAX_NODE_ROLE];
    char node_id[OMNI_MAX_PEER_ID];
    int stats_interval_ms;
    pthread_mutex_t lock;
    pthread_cond_t cond;
    pthread_t thread;
    int thread_started;
    int stopped;
    int refcount; /* guarded by g_kcp_process_sampler_mu */
    int request_pending;
    uint64_t pending_request_id;
    uint64_t completed_request_id;
    char pending_reason[32];
    kcp_conn_t *members;
    /* Totals observed at the previous snapshot; used to log deltas. */
    uint64_t prev_bytes_sent;
    uint64_t prev_bytes_received;
    uint64_t prev_in_pkts;
    uint64_t prev_out_pkts;
    uint64_t prev_in_segs;
    uint64_t prev_out_segs;
    uint64_t prev_in_errs;
    uint64_t prev_kcp_in_errs;
    uint64_t prev_retrans_segs;
    uint64_t prev_fast_retrans_segs;
    uint64_t prev_lost_segs;
    uint64_t prev_repeat_segs;
    /* Running totals updated from the data path. */
    atomic_uint_fast64_t bytes_sent;
    atomic_uint_fast64_t bytes_received;
    atomic_uint_fast64_t in_pkts;
    atomic_uint_fast64_t out_pkts;
    atomic_uint_fast64_t in_segs;
    atomic_uint_fast64_t out_segs;
    atomic_uint_fast64_t in_errs;
    atomic_uint_fast64_t kcp_in_errs;
    atomic_uint_fast64_t curr_estab;
};

static pthread_mutex_t g_kcp_process_sampler_mu = PTHREAD_MUTEX_INITIALIZER;
static kcp_process_sampler_t *g_kcp_process_samplers = NULL;

/* Zero *options and fill it with the KCP_DEFAULT_* tuning values. NULL is ignored. */
void kcp_conn_options_init(kcp_conn_options_t *options)
{
    if (options == NULL) {
        return;
    }
    memset(options, 0, sizeof(*options));
    options->nodelay = KCP_DEFAULT_NODELAY;
    options->interval_ms = KCP_DEFAULT_INTERVAL_MS;
    options->resend = KCP_DEFAULT_RESEND;
    options->nc = KCP_DEFAULT_NC;
    options->sndwnd = KCP_DEFAULT_SND_WND;
    options->rcvwnd = KCP_DEFAULT_RCV_WND;
    options->mtu = KCP_DEFAULT_MTU;
}

/* Zero *options and fill it with the KCP_CONTROL_* tuning values. NULL is ignored. */
void kcp_conn_options_set_control_defaults(kcp_conn_options_t *options)
{
    if (options == NULL) {
        return;
    }
    memset(options, 0, sizeof(*options));
    options->nodelay = KCP_CONTROL_NODELAY;
    options->interval_ms = KCP_CONTROL_INTERVAL_MS;
    options->resend = KCP_CONTROL_RESEND;
    options->nc = KCP_CONTROL_NC;
    options->sndwnd = KCP_CONTROL_SND_WND;
    options->rcvwnd = KCP_CONTROL_RCV_WND;
    options->mtu = KCP_CONTROL_MTU;
}

/* Zero *options and fill it with the KCP_VIDEO_* tuning values. NULL is ignored. */
void kcp_conn_options_set_video_defaults(kcp_conn_options_t *options)
{
    if (options == NULL) {
        return;
    }
    memset(options, 0, sizeof(*options));
    options->nodelay = KCP_VIDEO_NODELAY;
    options->interval_ms = KCP_VIDEO_INTERVAL_MS;
    options->resend = KCP_VIDEO_RESEND;
    options->nc = KCP_VIDEO_NC;
    options->sndwnd = KCP_VIDEO_SND_WND;
    options->rcvwnd = KCP_VIDEO_RCV_WND;
    options->mtu = KCP_VIDEO_MTU;
}

/* Zero *options and fill it with the KCP_TELEMETRY_* tuning values. NULL is ignored. */
void kcp_conn_options_set_telemetry_defaults(kcp_conn_options_t *options)
{
    if (options == NULL) {
        return;
    }
    memset(options, 0, sizeof(*options));
    options->nodelay = KCP_TELEMETRY_NODELAY;
    options->interval_ms = KCP_TELEMETRY_INTERVAL_MS;
    options->resend = KCP_TELEMETRY_RESEND;
    options->nc = KCP_TELEMETRY_NC;
    options->sndwnd = KCP_TELEMETRY_SND_WND;
    options->rcvwnd = KCP_TELEMETRY_RCV_WND;
    options->mtu = KCP_TELEMETRY_MTU;
}

/*
 * Returns 0 when options is non-NULL and interval_ms, sndwnd, rcvwnd and mtu
 * are all positive; otherwise sets errno to EINVAL and returns -1.
 */
static int kcp_conn_validate_options(const kcp_conn_options_t *options)
{
    if (options == NULL) {
        errno = EINVAL;
        return -1;
    }
    if (options->interval_ms <= 0 || options->sndwnd <= 0 || options->rcvwnd <= 0 ||
        options->mtu <= 0) {
        errno = EINVAL;
        return -1;
    }
    return 0;
}

/*
 * Pushes window size, MTU and nodelay settings into conn->kcp, enables stream
 * mode and records the options on the connection. Returns 0 on success, -1
 * on failure. The "_locked" suffix suggests the caller holds conn->kcp_mu;
 * this function does not take it itself.
 */
static int kcp_conn_apply_options_locked(kcp_conn_t *conn, const kcp_conn_options_t *options)
{
    if (conn == NULL || conn->kcp == NULL || kcp_conn_validate_options(options) != 0) {
        return -1;
    }
    if (ikcp_wndsize(conn->kcp, options->sndwnd, options->rcvwnd) != 0) {
        errno = EINVAL;
        return -1;
    }
    if (ikcp_setmtu(conn->kcp, options->mtu) != 0) {
        errno = EINVAL;
        return -1;
    }
    if (ikcp_nodelay(conn->kcp, options->nodelay, options->interval_ms, options->resend,
                     options->nc) != 0) {
        errno = EINVAL;
        return -1;
    }
    conn->kcp->stream = 1;
    conn->options = *options;
    conn->update_interval_ms = options->interval_ms;
    return 0;
}

/*
 * Walks the 24-byte-header segments packed into one datagram.
 *
 * conv          - optional; receives the little-endian conversation id from
 *                 the first four bytes (0 if the packet is shorter than 4).
 * segments      - optional; receives a heap array describing each segment
 *                 (caller frees). Left NULL on malformed input or OOM.
 * segment_count - optional; receives the number of segments parsed. Left 0
 *                 when a segment's declared length overruns the packet or an
 *                 allocation fails.
 *
 * Bytes are widened to uint32_t before shifting so that a high byte >= 0x80
 * is never shifted as a signed int.
 */
static void kcp_parse_packet_segments(const uint8_t *packet, size_t len, uint32_t *conv,
                                      kcp_packet_debug_segment_t **segments,
                                      size_t *segment_count)
{
    size_t offset = 0;
    size_t count = 0;
    kcp_packet_debug_segment_t *items = NULL;

    if (conv != NULL) {
        *conv = 0;
    }
    if (segments != NULL) {
        *segments = NULL;
    }
    if (segment_count != NULL) {
        *segment_count = 0;
    }
    if (packet == NULL || len < 4) {
        return;
    }
    if (conv != NULL) {
        *conv = (uint32_t) packet[0] | ((uint32_t) packet[1] << 8) |
                ((uint32_t) packet[2] << 16) | ((uint32_t) packet[3] << 24);
    }
    /* offset never exceeds len, so the subtractions below cannot wrap. */
    while (len - offset >= 24U) {
        const uint8_t *hdr = packet + offset;
        uint32_t seg_len = (uint32_t) hdr[20] | ((uint32_t) hdr[21] << 8) |
                           ((uint32_t) hdr[22] << 16) | ((uint32_t) hdr[23] << 24);

        if (seg_len > len - offset - 24U) {
            /* Declared payload runs past the datagram: report nothing. */
            free(items);
            return;
        }
        if (segments != NULL) {
            kcp_packet_debug_segment_t *next =
                (kcp_packet_debug_segment_t *) realloc(items, (count + 1U) * sizeof(*items));
            if (next == NULL) {
                free(items);
                return;
            }
            items = next;
            items[count].cmd = hdr[4];
            items[count].frg = hdr[5];
            items[count].wnd = (uint16_t) ((uint32_t) hdr[6] | ((uint32_t) hdr[7] << 8));
            items[count].sn = (uint32_t) hdr[12] | ((uint32_t) hdr[13] << 8) |
                              ((uint32_t) hdr[14] << 16) | ((uint32_t) hdr[15] << 24);
            items[count].una = (uint32_t) hdr[16] | ((uint32_t) hdr[17] << 8) |
                               ((uint32_t) hdr[18] << 16) | ((uint32_t) hdr[19] << 24);
            items[count].len = seg_len;
        }
        count++;
        offset += 24U + seg_len;
    }
    if (segments != NULL) {
        *segments = items;
    } else {
        free(items);
    }
    if (segment_count != NULL) {
        *segment_count = count;
    }
}

/* current - previous, clamped to 0 if the counter went backwards. */
static uint64_t kcp_counter_diff(uint64_t previous, uint64_t current)
{
    return current < previous ? 0 : current - previous;
}

/* Non-zero when sampler has the same logger, interval, role and id (NULL strings compare as ""). */
static int kcp_process_sampler_matches(const kcp_process_sampler_t *sampler,
                                       kcp_session_stats_logger_t *logger,
                                       const char *node_role, const char *node_id,
                                       int stats_interval_ms)
{
    if (sampler == NULL) {
        return 0;
    }
    return sampler->logger == logger && sampler->stats_interval_ms == stats_interval_ms &&
           strcmp(sampler->node_role, node_role == NULL ? "" : node_role) == 0 &&
           strcmp(sampler->node_id, node_id == NULL ? "" : node_id) == 0;
}

/* Adds one outgoing packet (bytes, segments) to the sampler totals. */
static void kcp_process_sampler_record_send(kcp_process_sampler_t *sampler, int packet_bytes,
                                            size_t segments)
{
    if (sampler == NULL) {
        return;
    }
    atomic_fetch_add_explicit(&sampler->bytes_sent, (uint64_t) packet_bytes,
                              memory_order_relaxed);
    atomic_fetch_add_explicit(&sampler->out_pkts, 1, memory_order_relaxed);
    atomic_fetch_add_explicit(&sampler->out_segs, (uint64_t) segments, memory_order_relaxed);
}

/* Adds one incoming packet (bytes, segments) to the sampler totals. */
static void kcp_process_sampler_record_input(kcp_process_sampler_t *sampler, int packet_bytes,
                                             size_t segments)
{
    if (sampler == NULL) {
        return;
    }
    atomic_fetch_add_explicit(&sampler->bytes_received, (uint64_t) packet_bytes,
                              memory_order_relaxed);
    atomic_fetch_add_explicit(&sampler->in_pkts, 1, memory_order_relaxed);
    atomic_fetch_add_explicit(&sampler->in_segs, (uint64_t) segments, memory_order_relaxed);
}

/* Counts one input error in both in_errs and kcp_in_errs. */
static void kcp_process_sampler_record_error(kcp_process_sampler_t *sampler)
{
    if (sampler == NULL) {
        return;
    }
    atomic_fetch_add_explicit(&sampler->in_errs, 1, memory_order_relaxed);
    atomic_fetch_add_explicit(&sampler->kcp_in_errs, 1, memory_order_relaxed);
}

/*
 * Accounts one sent packet: always bumps conn->total_out_segs, then either
 * forwards to the attached sampler or accumulates in the conn's pending_*.
 */
static void kcp_conn_record_send(kcp_conn_t *conn, int packet_bytes, size_t segments)
{
    if (conn == NULL) {
        return;
    }
    atomic_fetch_add_explicit(&conn->total_out_segs, (uint64_t) segments, memory_order_relaxed);
    if (conn->process_sampler != NULL) {
        kcp_process_sampler_record_send(conn->process_sampler, packet_bytes, segments);
        return;
    }
    conn->pending_bytes_sent += (uint64_t) packet_bytes;
    conn->pending_out_pkts += 1;
    conn->pending_out_segs += (uint64_t) segments;
}

/* Accounts one received packet, in the sampler if attached, else in pending_*. */
static void kcp_conn_record_input(kcp_conn_t *conn, int packet_bytes, size_t segments)
{
    if (conn == NULL) {
        return;
    }
    if (conn->process_sampler != NULL) {
        kcp_process_sampler_record_input(conn->process_sampler, packet_bytes, segments);
        return;
    }
    conn->pending_bytes_received += (uint64_t) packet_bytes;
    conn->pending_in_pkts += 1;
    conn->pending_in_segs += (uint64_t) segments;
}

/* Accounts one input error, in the sampler if attached, else in pending_*. */
static void kcp_conn_record_error(kcp_conn_t *conn)
{
    if (conn == NULL) {
        return;
    }
    if (conn->process_sampler != NULL) {
        kcp_process_sampler_record_error(conn->process_sampler);
        return;
    }
    conn->pending_in_errs += 1;
    conn->pending_kcp_in_errs += 1;
}

/* Increments the sampler's established-connection gauge. */
static void kcp_process_sampler_curr_estab_inc(kcp_process_sampler_t *sampler)
{
    if (sampler == NULL) {
        return;
    }
    atomic_fetch_add_explicit(&sampler->curr_estab, 1, memory_order_relaxed);
}

/* Decrements the established-connection gauge, never going below zero. */
static void kcp_process_sampler_curr_estab_dec(kcp_process_sampler_t *sampler)
{
    uint_fast64_t current;

    if (sampler == NULL) {
        return;
    }
    current = atomic_load_explicit(&sampler->curr_estab, memory_order_relaxed);
    while (current > 0) {
        /* On failure the CAS reloads `current`, so the loop re-tests against zero. */
        if (atomic_compare_exchange_weak_explicit(&sampler->curr_estab, &current, current - 1U,
                                                  memory_order_relaxed, memory_order_relaxed)) {
            return;
        }
    }
}

/* Pushes conn onto the sampler's member list and bumps the gauge. */
static void kcp_process_sampler_add_conn(kcp_process_sampler_t *sampler, kcp_conn_t *conn)
{
    if (sampler == NULL || conn == NULL) {
        return;
    }
    pthread_mutex_lock(&sampler->lock);
    conn->process_next = sampler->members;
    sampler->members = conn;
    pthread_mutex_unlock(&sampler->lock);
    kcp_process_sampler_curr_estab_inc(sampler);
}

/* Unlinks conn from the member list; the gauge is decremented only if it was found. */
static void kcp_process_sampler_remove_conn(kcp_process_sampler_t *sampler, kcp_conn_t *conn)
{
    kcp_conn_t *prev = NULL;
    kcp_conn_t *cur;

    if (sampler == NULL || conn == NULL) {
        return;
    }
    pthread_mutex_lock(&sampler->lock);
    for (cur = sampler->members; cur != NULL; cur = cur->process_next) {
        if (cur == conn) {
            if (prev == NULL) {
                sampler->members = cur->process_next;
            } else {
                prev->process_next = cur->process_next;
            }
            conn->process_next = NULL;
            break;
        }
        prev = cur;
    }
    pthread_mutex_unlock(&sampler->lock);
    if (cur == conn) {
        kcp_process_sampler_curr_estab_dec(sampler);
    }
}

/*
 * Sums queue depths and retransmission totals over every member connection.
 * Each output pointer is optional and is zeroed first. Takes sampler->lock,
 * then each conn->kcp_mu in turn. retrans_segs is timeout + fast retransmits;
 * lost_segs is the timeout retransmit total.
 */
static void kcp_process_sampler_collect_gauges(kcp_process_sampler_t *sampler,
                                               uint64_t *snd_queue, uint64_t *rcv_queue,
                                               uint64_t *snd_buffer, uint64_t *retrans_segs,
                                               uint64_t *fast_retrans_segs,
                                               uint64_t *lost_segs, uint64_t *repeat_segs)
{
    kcp_conn_t *conn;

    if (snd_queue != NULL) {
        *snd_queue = 0;
    }
    if (rcv_queue != NULL) {
        *rcv_queue = 0;
    }
    if (snd_buffer != NULL) {
        *snd_buffer = 0;
    }
    if (retrans_segs != NULL) {
        *retrans_segs = 0;
    }
    if (fast_retrans_segs != NULL) {
        *fast_retrans_segs = 0;
    }
    if (lost_segs != NULL) {
        *lost_segs = 0;
    }
    if (repeat_segs != NULL) {
        *repeat_segs = 0;
    }
    if (sampler == NULL) {
        return;
    }
    pthread_mutex_lock(&sampler->lock);
    for (conn = sampler->members; conn != NULL; conn = conn->process_next) {
        pthread_mutex_lock(&conn->kcp_mu);
        if (conn->kcp != NULL) {
            if (snd_queue != NULL) {
                *snd_queue += conn->kcp->nsnd_que;
            }
            if (rcv_queue != NULL) {
                *rcv_queue += conn->kcp->nrcv_que;
            }
            if (snd_buffer != NULL) {
                *snd_buffer += conn->kcp->nsnd_buf;
            }
            if (lost_segs != NULL) {
                *lost_segs += conn->kcp->timeout_retrans_total;
            }
            if (fast_retrans_segs != NULL) {
                *fast_retrans_segs += conn->kcp->fast_retrans_total;
            }
            if (retrans_segs != NULL) {
                *retrans_segs +=
                    conn->kcp->timeout_retrans_total + conn->kcp->fast_retrans_total;
            }
            if (repeat_segs != NULL) {
                *repeat_segs += conn->kcp->duplicate_recv_total;
            }
        }
        pthread_mutex_unlock(&conn->kcp_mu);
    }
    pthread_mutex_unlock(&sampler->lock);
}

/*
 * Emits one process-level stats record: counter fields are deltas since the
 * previous snapshot, queue/estab fields are current values. Updates the
 * sampler's prev_* baselines. No-op without a sampler or logger.
 */
static void kcp_process_sampler_log_snapshot(kcp_process_sampler_t *sampler, const char *reason)
{
    kcp_session_stats_record_t record;
    uint64_t bytes_sent;
    uint64_t bytes_received;
    uint64_t in_pkts;
    uint64_t out_pkts;
    uint64_t in_segs;
    uint64_t out_segs;
    uint64_t in_errs;
    uint64_t kcp_in_errs;
    uint64_t snd_queue = 0;
    uint64_t rcv_queue = 0;
    uint64_t snd_buffer = 0;
    uint64_t retrans_segs = 0;
    uint64_t fast_retrans_segs = 0;
    uint64_t lost_segs = 0;
    uint64_t repeat_segs = 0;

    if (sampler == NULL || sampler->logger == NULL) {
        return;
    }

    bytes_sent = atomic_load_explicit(&sampler->bytes_sent, memory_order_relaxed);
    bytes_received = atomic_load_explicit(&sampler->bytes_received, memory_order_relaxed);
    in_pkts = atomic_load_explicit(&sampler->in_pkts, memory_order_relaxed);
    out_pkts = atomic_load_explicit(&sampler->out_pkts, memory_order_relaxed);
    in_segs = atomic_load_explicit(&sampler->in_segs, memory_order_relaxed);
    out_segs = atomic_load_explicit(&sampler->out_segs, memory_order_relaxed);
    in_errs = atomic_load_explicit(&sampler->in_errs, memory_order_relaxed);
    kcp_in_errs = atomic_load_explicit(&sampler->kcp_in_errs, memory_order_relaxed);
    kcp_process_sampler_collect_gauges(sampler, &snd_queue, &rcv_queue, &snd_buffer,
                                       &retrans_segs, &fast_retrans_segs, &lost_segs,
                                       &repeat_segs);

    memset(&record, 0, sizeof(record));
    snprintf(record.record_type, sizeof(record.record_type), "%s",
             KCP_SESSION_STATS_RECORD_PROCESS_SAMPLE);
    snprintf(record.node_role, sizeof(record.node_role), "%s", sampler->node_role);
    snprintf(record.node_id, sizeof(record.node_id), "%s", sampler->node_id);
    snprintf(record.sample_reason, sizeof(record.sample_reason), "%s",
             reason == NULL ? "" : reason);
    record.ts_unix_nano = omni_now_unix_nano();

    record.has_bytes_sent = 1;
    record.bytes_sent = kcp_counter_diff(sampler->prev_bytes_sent, bytes_sent);
    record.has_bytes_received = 1;
    record.bytes_received = kcp_counter_diff(sampler->prev_bytes_received, bytes_received);
    record.has_in_pkts = 1;
    record.in_pkts = kcp_counter_diff(sampler->prev_in_pkts, in_pkts);
    record.has_out_pkts = 1;
    record.out_pkts = kcp_counter_diff(sampler->prev_out_pkts, out_pkts);
    record.has_in_segs = 1;
    record.in_segs = kcp_counter_diff(sampler->prev_in_segs, in_segs);
    record.has_out_segs = 1;
    record.out_segs = kcp_counter_diff(sampler->prev_out_segs, out_segs);
    record.has_retrans_segs = 1;
    record.retrans_segs = kcp_counter_diff(sampler->prev_retrans_segs, retrans_segs);
    record.has_fast_retrans_segs = 1;
    record.fast_retrans_segs =
        kcp_counter_diff(sampler->prev_fast_retrans_segs, fast_retrans_segs);
    record.has_lost_segs = 1;
    record.lost_segs = kcp_counter_diff(sampler->prev_lost_segs, lost_segs);
    record.has_repeat_segs = 1;
    record.repeat_segs = kcp_counter_diff(sampler->prev_repeat_segs, repeat_segs);
    record.has_in_errs = 1;
    record.in_errs = kcp_counter_diff(sampler->prev_in_errs, in_errs);
    record.has_kcp_in_errs = 1;
    record.kcp_in_errs = kcp_counter_diff(sampler->prev_kcp_in_errs, kcp_in_errs);
    record.has_ring_buffer_snd_queue = 1;
    record.ring_buffer_snd_queue = snd_queue;
    record.has_ring_buffer_rcv_queue = 1;
    record.ring_buffer_rcv_queue = rcv_queue;
    record.has_ring_buffer_snd_buffer = 1;
    record.ring_buffer_snd_buffer = snd_buffer;
    record.has_curr_estab = 1;
    record.curr_estab = atomic_load_explicit(&sampler->curr_estab, memory_order_relaxed);

    sampler->prev_bytes_sent = bytes_sent;
    sampler->prev_bytes_received = bytes_received;
    sampler->prev_in_pkts = in_pkts;
    sampler->prev_out_pkts = out_pkts;
    sampler->prev_in_segs = in_segs;
    sampler->prev_out_segs = out_segs;
    sampler->prev_retrans_segs = retrans_segs;
    sampler->prev_fast_retrans_segs = fast_retrans_segs;
    sampler->prev_lost_segs = lost_segs;
    sampler->prev_repeat_segs = repeat_segs;
    sampler->prev_in_errs = in_errs;
    sampler->prev_kcp_in_errs = kcp_in_errs;

    (void) kcp_session_stats_log(sampler->logger, &record);
}

/*
 * Sampler thread body: each iteration waits until the interval deadline, an
 * explicit request, or stop; logs one snapshot ("periodic" or the requested
 * reason); then publishes the completed request id to waiters.
 */
static void *kcp_process_sampler_thread_main(void *arg)
{
    kcp_process_sampler_t *sampler = (kcp_process_sampler_t *) arg;

    for (;;) {
        int has_request = 0;
        uint64_t request_id = 0;
        char reason[32];
        struct timespec deadline;

        clock_gettime(CLOCK_REALTIME, &deadline);
        deadline.tv_sec += sampler->stats_interval_ms / 1000;
        deadline.tv_nsec += (long) (sampler->stats_interval_ms % 1000) * 1000000L;
        if (deadline.tv_nsec >= 1000000000L) {
            deadline.tv_sec += 1;
            deadline.tv_nsec -= 1000000000L;
        }

        pthread_mutex_lock(&sampler->lock);
        while (!sampler->stopped && !sampler->request_pending) {
            int wait_rc = pthread_cond_timedwait(&sampler->cond, &sampler->lock, &deadline);
            if (wait_rc == ETIMEDOUT) {
                break;
            }
        }
        if (sampler->stopped) {
            pthread_mutex_unlock(&sampler->lock);
            return NULL;
        }
        if (sampler->request_pending) {
            has_request = 1;
            request_id = sampler->pending_request_id;
            snprintf(reason, sizeof(reason), "%s", sampler->pending_reason);
            sampler->request_pending = 0;
        } else {
            snprintf(reason, sizeof(reason), "%s", "periodic");
        }
        pthread_mutex_unlock(&sampler->lock);

        kcp_process_sampler_log_snapshot(sampler, reason);

        if (has_request) {
            pthread_mutex_lock(&sampler->lock);
            if (request_id > sampler->completed_request_id) {
                sampler->completed_request_id = request_id;
            }
            pthread_cond_broadcast(&sampler->cond);
            pthread_mutex_unlock(&sampler->lock);
        }
    }
}

/*
 * Returns a referenced sampler for (logger, role, id, interval), creating it
 * and starting its thread if none exists. Returns NULL when logger is NULL or
 * on allocation/thread failure.
 *
 * The interval is normalised (non-positive -> KCP_DEFAULT_STATS_INTERVAL_MS)
 * BEFORE matching, because that is the value stored on the sampler; matching
 * on the raw argument would never find an existing sampler for callers that
 * pass a non-positive interval.
 */
static kcp_process_sampler_t *kcp_process_sampler_acquire(kcp_session_stats_logger_t *logger,
                                                          const char *node_role,
                                                          const char *node_id,
                                                          int stats_interval_ms)
{
    kcp_process_sampler_t *sampler;
    int effective_interval_ms =
        stats_interval_ms > 0 ? stats_interval_ms : KCP_DEFAULT_STATS_INTERVAL_MS;

    if (logger == NULL) {
        return NULL;
    }
    pthread_mutex_lock(&g_kcp_process_sampler_mu);
    for (sampler = g_kcp_process_samplers; sampler != NULL; sampler = sampler->next) {
        if (kcp_process_sampler_matches(sampler, logger, node_role, node_id,
                                        effective_interval_ms)) {
            sampler->refcount++;
            pthread_mutex_unlock(&g_kcp_process_sampler_mu);
            return sampler;
        }
    }

    sampler = (kcp_process_sampler_t *) calloc(1, sizeof(*sampler));
    if (sampler == NULL) {
        pthread_mutex_unlock(&g_kcp_process_sampler_mu);
        return NULL;
    }
    sampler->logger = logger;
    sampler->stats_interval_ms = effective_interval_ms;
    sampler->refcount = 1;
    snprintf(sampler->node_role, sizeof(sampler->node_role), "%s",
             node_role == NULL ? "" : node_role);
    snprintf(sampler->node_id, sizeof(sampler->node_id), "%s", node_id == NULL ? "" : node_id);
    pthread_mutex_init(&sampler->lock, NULL);
    pthread_cond_init(&sampler->cond, NULL);
    if (pthread_create(&sampler->thread, NULL, kcp_process_sampler_thread_main, sampler) != 0) {
        pthread_cond_destroy(&sampler->cond);
        pthread_mutex_destroy(&sampler->lock);
        free(sampler);
        pthread_mutex_unlock(&g_kcp_process_sampler_mu);
        return NULL;
    }
    sampler->thread_started = 1;
    sampler->next = g_kcp_process_samplers;
    g_kcp_process_samplers = sampler;
    pthread_mutex_unlock(&g_kcp_process_sampler_mu);
    return sampler;
}

/*
 * Drops one reference. On the last reference the sampler is unlinked from the
 * global list, its thread is stopped and joined, and it is freed.
 */
static void kcp_process_sampler_release(kcp_process_sampler_t *sampler)
{
    kcp_process_sampler_t **cursor;

    if (sampler == NULL) {
        return;
    }
    pthread_mutex_lock(&g_kcp_process_sampler_mu);
    sampler->refcount--;
    if (sampler->refcount > 0) {
        pthread_mutex_unlock(&g_kcp_process_sampler_mu);
        return;
    }
    for (cursor = &g_kcp_process_samplers; *cursor != NULL; cursor = &(*cursor)->next) {
        if (*cursor == sampler) {
            *cursor = sampler->next;
            break;
        }
    }
    pthread_mutex_unlock(&g_kcp_process_sampler_mu);

    pthread_mutex_lock(&sampler->lock);
    sampler->stopped = 1;
    pthread_cond_broadcast(&sampler->cond);
    pthread_mutex_unlock(&sampler->lock);
    if (sampler->thread_started) {
        pthread_join(sampler->thread, NULL);
    }
    pthread_cond_destroy(&sampler->cond);
    pthread_mutex_destroy(&sampler->lock);
    free(sampler);
}

/* Asks the sampler thread for an extra snapshot; ignored if one is already pending or stopped. */
static void kcp_process_sampler_request_sample(kcp_process_sampler_t *sampler,
                                               const char *reason)
{
    if (sampler == NULL) {
        return;
    }
    pthread_mutex_lock(&sampler->lock);
    if (!sampler->stopped && !sampler->request_pending) {
        sampler->request_pending = 1;
        sampler->pending_request_id++;
        snprintf(sampler->pending_reason, sizeof(sampler->pending_reason), "%s",
                 reason == NULL ? "" : reason);
        pthread_cond_broadcast(&sampler->cond);
    }
    pthread_mutex_unlock(&sampler->lock);
}

/* Requests a snapshot and blocks until the sampler thread has completed it (or stopped). */
static void kcp_process_sampler_request_sample_and_wait(kcp_process_sampler_t *sampler,
                                                        const char *reason)
{
    uint64_t request_id;

    if (sampler == NULL) {
        return;
    }
    pthread_mutex_lock(&sampler->lock);
    if (sampler->stopped) {
        pthread_mutex_unlock(&sampler->lock);
        return;
    }
    sampler->request_pending = 1;
    request_id = ++sampler->pending_request_id;
    snprintf(sampler->pending_reason, sizeof(sampler->pending_reason), "%s",
             reason == NULL ? "" : reason);
    pthread_cond_broadcast(&sampler->cond);
    while (!sampler->stopped && sampler->completed_request_id < request_id) {
        pthread_cond_wait(&sampler->cond, &sampler->lock);
    }
    pthread_mutex_unlock(&sampler->lock);
}

/*
 * Builds and logs one packet-debug record for the socket. The segment array
 * is copied into the record and released again after logging. Returns 0 on
 * success (including when no logger is configured), -1 if the copy cannot be
 * allocated.
 */
static int kcp_socket_debug_log_record(kcp_socket_debug_state_t *state, const char *event_name,
                                       const struct sockaddr_storage *remote_addr,
                                       socklen_t remote_addr_len, int packet_bytes,
                                       int has_tx_id, uint32_t tx_id, int has_conv,
                                       uint32_t conv,
                                       const kcp_packet_debug_segment_t *segments,
                                       size_t segment_count, int64_t ts_unix_nano)
{
    char local_addr_text[OMNI_MAX_ADDR_TEXT];
    char remote_addr_text[OMNI_MAX_ADDR_TEXT];
    struct sockaddr_storage local_addr;
    socklen_t local_addr_len = sizeof(local_addr);
    kcp_packet_debug_record_t record;

    if (state->logger == NULL) {
        return 0;
    }
    memset(&record, 0, sizeof(record));
    /* Zeroed so a failed getsockname() never leaves indeterminate bytes to format. */
    memset(&local_addr, 0, sizeof(local_addr));
    getsockname(state->fd, (struct sockaddr *) &local_addr, &local_addr_len);
    omni_sockaddr_to_string((struct sockaddr *) &local_addr, local_addr_len, local_addr_text,
                            sizeof(local_addr_text));
    omni_sockaddr_to_string((const struct sockaddr *) remote_addr, remote_addr_len,
                            remote_addr_text, sizeof(remote_addr_text));
    snprintf(record.event, sizeof(record.event), "%s", event_name);
    snprintf(record.node_role, sizeof(record.node_role), "%s", state->node_role);
    snprintf(record.node_id, sizeof(record.node_id), "%s", state->node_id);
    snprintf(record.local_addr, sizeof(record.local_addr), "%s", local_addr_text);
    snprintf(record.remote_addr, sizeof(record.remote_addr), "%s", remote_addr_text);
    record.packet_bytes = packet_bytes;
    record.has_udp_tx_id = has_tx_id;
    record.udp_tx_id = tx_id;
    record.has_kcp_conv = has_conv;
    record.kcp_conv = conv;
    record.ts_unix_nano = ts_unix_nano;
    if (segment_count > 0 && segments != NULL) {
        record.segments =
            (kcp_packet_debug_segment_t *) calloc(segment_count, sizeof(*record.segments));
        if (record.segments == NULL) {
            return -1;
        }
        memcpy(record.segments, segments, segment_count * sizeof(*segments));
        record.segment_count = segment_count;
    }
    kcp_packet_debug_log(state->logger, &record);
    kcp_packet_debug_record_clear(&record);
    return 0;
}

/* Frees an entire pending list, including each node's segment array. */
static void kcp_socket_debug_pending_free(kcp_packet_debug_pending_t *pending)
{
    while (pending != NULL) {
        kcp_packet_debug_pending_t *next = pending->next;
        free(pending->segments);
        free(pending);
        pending = next;
    }
}

/*
 * Registers a pending entry describing `packet` before it is sent and returns
 * its id through *out_tx_id. With no logger, *out_tx_id is 0 and nothing is
 * recorded. Returns 0 on success, -1 on allocation failure.
 *
 * The id is allocated and copied out while pending_mu is held: next_tx_id is
 * shared by every sender on this socket, and once the node is published the
 * errqueue thread owns its lifetime.
 */
static int kcp_socket_debug_reserve_tx(kcp_socket_debug_state_t *state,
                                       const struct sockaddr_storage *remote_addr,
                                       socklen_t remote_addr_len, const uint8_t *packet,
                                       size_t packet_len, uint32_t *out_tx_id)
{
    kcp_packet_debug_pending_t *pending;
    uint32_t tx_id;

    if (state->logger == NULL) {
        *out_tx_id = 0;
        return 0;
    }
    pending = (kcp_packet_debug_pending_t *) calloc(1, sizeof(*pending));
    if (pending == NULL) {
        return -1;
    }
    pending->packet_bytes = (int) packet_len;
    memcpy(&pending->remote_addr, remote_addr, sizeof(*remote_addr));
    pending->remote_addr_len = remote_addr_len;
    kcp_parse_packet_segments(packet, packet_len, &pending->conv, &pending->segments,
                              &pending->segment_count);
    pending->has_conv = packet_len >= 4;

    pthread_mutex_lock(&state->pending_mu);
    tx_id = state->next_tx_id++;
    pending->tx_id = tx_id;
    pending->next = state->pending_head;
    state->pending_head = pending;
    pthread_mutex_unlock(&state->pending_mu);

    *out_tx_id = tx_id;
    return 0;
}

/* Removes and frees the pending entry with the given id, if present (used after a failed send). */
static void kcp_socket_debug_rollback_tx(kcp_socket_debug_state_t *state, uint32_t tx_id)
{
    kcp_packet_debug_pending_t *prev = NULL;
    kcp_packet_debug_pending_t *cur;

    pthread_mutex_lock(&state->pending_mu);
    for (cur = state->pending_head; cur != NULL; cur = cur->next) {
        if (cur->tx_id == tx_id) {
            if (prev == NULL) {
                state->pending_head = cur->next;
            } else {
                prev->next = cur->next;
            }
            free(cur->segments);
            free(cur);
            break;
        }
        prev = cur;
    }
    pthread_mutex_unlock(&state->pending_mu);
}

/*
 * Polls the socket error queue (non-blocking, 10 ms back-off) for TX
 * timestamp events, logs each one against the matching pending entry, and
 * retires the entry once both the SCHED and SOFTWARE events have been seen.
 * Exits when state->closed becomes non-zero.
 */
static void *kcp_socket_debug_errqueue_thread(void *arg)
{
    kcp_socket_debug_state_t *state = (kcp_socket_debug_state_t *) arg;
    uint8_t control[512];
    uint8_t dummy = 0;
    struct iovec iov;
    struct msghdr msg;

    while (!atomic_load(&state->closed)) {
        ssize_t rc;
        omni_tx_timestamp_event_t event;
        kcp_packet_debug_pending_t *prev = NULL;
        kcp_packet_debug_pending_t *cur = NULL;

        memset(&msg, 0, sizeof(msg));
        iov.iov_base = &dummy;
        iov.iov_len = sizeof(dummy);
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
        msg.msg_control = control;
        msg.msg_controllen = sizeof(control);

        rc = recvmsg(state->fd, &msg, MSG_ERRQUEUE | MSG_DONTWAIT);
        if (rc < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
                usleep(10000);
                continue;
            }
            if (atomic_load(&state->closed)) {
                return NULL;
            }
            usleep(10000);
            continue;
        }
        if (linux_timestamping_parse_tx_timestamp(&msg, &event) != 0) {
            continue;
        }

        pthread_mutex_lock(&state->pending_mu);
        for (cur = state->pending_head; cur != NULL; cur = cur->next) {
            if (cur->tx_id == event.ee_data) {
                break;
            }
            prev = cur;
        }
        if (cur != NULL) {
            if (strcmp(event.event_name, EVENT_A_TX_SCHED) == 0) {
                cur->saw_sched = 1;
            } else if (strcmp(event.event_name, EVENT_A_TX_SOFTWARE) == 0) {
                cur->saw_software = 1;
            }
            kcp_socket_debug_log_record(state, event.event_name, &cur->remote_addr,
                                        cur->remote_addr_len, cur->packet_bytes, 1, cur->tx_id,
                                        cur->has_conv, cur->conv, cur->segments,
                                        cur->segment_count, event.ts_unix_nano);
            if (cur->saw_sched && cur->saw_software) {
                if (prev == NULL) {
                    state->pending_head = cur->next;
                } else {
                    prev->next = cur->next;
                }
                free(cur->segments);
                free(cur);
            }
        }
        pthread_mutex_unlock(&state->pending_mu);
    }
    return NULL;
}

/*
 * Initialises *state for fd. When a logger is supplied, TX timestamping is
 * enabled on the socket and the errqueue thread is started. Returns 0 on
 * success; on failure the mutexes are destroyed and -1 is returned (errno is
 * set to the pthread_create error code when thread creation fails).
 */
static int kcp_socket_debug_init(kcp_socket_debug_state_t *state, int fd,
                                 kcp_packet_debug_logger_t *logger, const char *node_role,
                                 const char *node_id)
{
    int thread_rc;

    memset(state, 0, sizeof(*state));
    state->fd = fd;
    state->logger = logger;
    snprintf(state->node_role, sizeof(state->node_role), "%s",
             node_role == NULL ? "" : node_role);
    snprintf(state->node_id, sizeof(state->node_id), "%s", node_id == NULL ? "" : node_id);
    pthread_mutex_init(&state->write_mu, NULL);
    pthread_mutex_init(&state->pending_mu, NULL);
    if (logger != NULL) {
        if (linux_timestamping_enable_udp_socket(fd, 1) != 0) {
            pthread_mutex_destroy(&state->write_mu);
            pthread_mutex_destroy(&state->pending_mu);
            return -1;
        }
        thread_rc = pthread_create(&state->errqueue_thread, NULL,
                                   kcp_socket_debug_errqueue_thread, state);
        if (thread_rc != 0) {
            errno = thread_rc;
            pthread_mutex_destroy(&state->write_mu);
            pthread_mutex_destroy(&state->pending_mu);
            return -1;
        }
        state->errqueue_thread_started = 1;
    }
    return 0;
}

/* Signals the errqueue thread to stop, joins it, and frees pending entries and mutexes. */
static void kcp_socket_debug_destroy(kcp_socket_debug_state_t *state)
{
    atomic_store(&state->closed, 1);
    if (state->errqueue_thread_started) {
        pthread_join(state->errqueue_thread, NULL);
    }
    kcp_socket_debug_pending_free(state->pending_head);
    pthread_mutex_destroy(&state->write_mu);
    pthread_mutex_destroy(&state->pending_mu);
}

/*
 * Sends one datagram to remote_addr. When debug logging is on, a pending
 * entry is reserved first and rolled back if the send fails or is short.
 * Returns 0 on success, -1 on failure; state->last_send_errno records the
 * outcome (0 on success).
 */
static int kcp_socket_send_packet(kcp_socket_debug_state_t *state,
                                  const struct sockaddr_storage *remote_addr,
                                  socklen_t remote_addr_len, const uint8_t *packet,
                                  size_t packet_len)
{
    uint32_t tx_id = 0;
    ssize_t rc;

    if (state->logger != NULL &&
        kcp_socket_debug_reserve_tx(state, remote_addr, remote_addr_len, packet, packet_len,
                                    &tx_id) != 0) {
        atomic_store(&state->last_send_errno, errno != 0 ? errno : EIO);
        return -1;
    }

    pthread_mutex_lock(&state->write_mu);
    rc = sendto(state->fd, packet, packet_len, 0, (const struct sockaddr *) remote_addr,
                remote_addr_len);
    pthread_mutex_unlock(&state->write_mu);

    if (rc < 0 || (size_t) rc != packet_len) {
        if (rc >= 0 && (size_t) rc != packet_len && errno == 0) {
            errno = EIO;
        }
        atomic_store(&state->last_send_errno, errno != 0 ? errno : EIO);
        if (state->logger != NULL) {
            kcp_socket_debug_rollback_tx(state, tx_id);
        }
        return -1;
    }
    atomic_store(&state->last_send_errno, 0);
    return 0;
}

/*
 * KCP output hook: transmits `buf` on the connection's socket and records the
 * send. Returns len on success, -1 if the connection is unusable/closed or
 * the send fails.
 */
static int kcp_output_callback_impl(const char *buf, int len, struct IKCPCB *kcp, void *user)
{
    kcp_conn_t *conn = (kcp_conn_t *) user;
    size_t segment_count = 0;

    (void) kcp;
    if (conn == NULL || atomic_load(&conn->closed)) {
        return -1;
    }
    /* Reject arguments that would be dereferenced or converted to a huge size_t. */
    if (buf == NULL || len < 0 || conn->sock_state == NULL) {
        return -1;
    }
    kcp_parse_packet_segments((const uint8_t *) buf, (size_t) len, NULL, NULL, &segment_count);
    if (kcp_socket_send_packet(conn->sock_state, &conn->remote_addr, conn->remote_addr_len,
                               (const uint8_t *) buf, (size_t) len) != 0) {
        return -1;
    }
    kcp_conn_record_send(conn, len, segment_count);
    return len;
}

/*
 * (Re)binds the connection to the process sampler matching its current
 * stats_logger/role/id/interval. Counters accumulated in conn->pending_* are
 * moved into the new sampler, and any previous sampler is detached and
 * released. Returns 0 on success, -1 on error.
 */
static int kcp_conn_attach_process_sampler(kcp_conn_t *conn)
{
    kcp_process_sampler_t *next_sampler;
    kcp_process_sampler_t *previous_sampler;
    uint64_t pending_bytes_sent = 0;
    uint64_t pending_bytes_received = 0;
    uint64_t pending_in_pkts = 0;
    uint64_t pending_out_pkts = 0;
    uint64_t pending_in_segs = 0;
    uint64_t pending_out_segs = 0;
    uint64_t pending_in_errs = 0;
    uint64_t pending_kcp_in_errs = 0;

    if (conn == NULL) {
        errno = EINVAL;
        return -1;
    }
    next_sampler = kcp_process_sampler_acquire(conn->stats_logger, conn->node_role,
                                               conn->node_id, conn->stats_interval_ms);
    if (conn->stats_logger != NULL && next_sampler == NULL) {
        return -1;
    }
    previous_sampler = conn->process_sampler;
    if (previous_sampler == next_sampler) {
        /*
         * acquire() took a fresh reference on a sampler this connection
         * already holds; drop it so detach's single release balances.
         */
        if (next_sampler != NULL) {
            kcp_process_sampler_release(next_sampler);
        }
        return 0;
    }
    if (next_sampler != NULL) {
        kcp_process_sampler_add_conn(next_sampler, conn);
    }

    pthread_mutex_lock(&conn->kcp_mu);
    previous_sampler = conn->process_sampler;
    conn->process_sampler = next_sampler;
    pending_bytes_sent = conn->pending_bytes_sent;
    pending_bytes_received = conn->pending_bytes_received;
    pending_in_pkts = conn->pending_in_pkts;
    pending_out_pkts = conn->pending_out_pkts;
    pending_in_segs = conn->pending_in_segs;
    pending_out_segs = conn->pending_out_segs;
    pending_in_errs = conn->pending_in_errs;
    pending_kcp_in_errs = conn->pending_kcp_in_errs;
    conn->pending_bytes_sent = 0;
    conn->pending_bytes_received = 0;
    conn->pending_in_pkts = 0;
    conn->pending_out_pkts = 0;
    conn->pending_in_segs = 0;
    conn->pending_out_segs = 0;
    conn->pending_in_errs = 0;
    conn->pending_kcp_in_errs = 0;
    pthread_mutex_unlock(&conn->kcp_mu);

    if (next_sampler != NULL) {
        atomic_fetch_add_explicit(&next_sampler->bytes_sent, pending_bytes_sent,
                                  memory_order_relaxed);
        atomic_fetch_add_explicit(&next_sampler->bytes_received, pending_bytes_received,
                                  memory_order_relaxed);
        atomic_fetch_add_explicit(&next_sampler->in_pkts, pending_in_pkts,
                                  memory_order_relaxed);
        atomic_fetch_add_explicit(&next_sampler->out_pkts, pending_out_pkts,
                                  memory_order_relaxed);
        atomic_fetch_add_explicit(&next_sampler->in_segs, pending_in_segs,
                                  memory_order_relaxed);
        atomic_fetch_add_explicit(&next_sampler->out_segs, pending_out_segs,
                                  memory_order_relaxed);
        atomic_fetch_add_explicit(&next_sampler->in_errs, pending_in_errs,
                                  memory_order_relaxed);
        atomic_fetch_add_explicit(&next_sampler->kcp_in_errs, pending_kcp_in_errs,
                                  memory_order_relaxed);
    }
    if (previous_sampler != NULL) {
        kcp_process_sampler_remove_conn(previous_sampler, conn);
        kcp_process_sampler_release(previous_sampler);
    }
    return 0;
}

/* Unbinds the connection from its sampler (if any) and releases the reference. */
static void kcp_conn_detach_process_sampler(kcp_conn_t *conn)
{
    kcp_process_sampler_t *sampler;

    if (conn == NULL || conn->process_sampler == NULL) {
        return;
    }
    sampler = conn->process_sampler;
    conn->process_sampler = NULL;
    kcp_process_sampler_remove_conn(sampler, conn);
    kcp_process_sampler_release(sampler);
}

/*
 * Logs one per-session stats record (RTT, windows, queue depths,
 * retransmission totals, total output segments). KCP fields, including conv,
 * are read under conn->kcp_mu. A NULL reason is logged as "". No-op when the
 * connection has no stats logger, socket state or KCP control block.
 */
static void kcp_log_session_snapshot(kcp_conn_t *conn, const char *reason)
{
    kcp_session_stats_record_t record;
    struct sockaddr_storage local_addr;
    socklen_t local_len = sizeof(local_addr);
    char local_text[OMNI_MAX_ADDR_TEXT];
    char remote_text[OMNI_MAX_ADDR_TEXT];
    uint32_t inflight = 0;
    uint32_t window_limit = 0;
    uint64_t out_segs_total = 0;
    uint64_t fast_retrans_total = 0;
    uint64_t lost_total = 0;

    if (conn == NULL || conn->stats_logger == NULL || conn->sock_state == NULL ||
        conn->kcp == NULL) {
        return;
    }

    memset(&record, 0, sizeof(record));
    snprintf(record.record_type, sizeof(record.record_type), "%s",
             KCP_SESSION_STATS_RECORD_SESSION_SAMPLE);
    snprintf(record.node_role, sizeof(record.node_role), "%s", conn->node_role);
    snprintf(record.node_id, sizeof(record.node_id), "%s", conn->node_id);
    /* Zeroed so a failed getsockname() never leaves indeterminate bytes to format. */
    memset(&local_addr, 0, sizeof(local_addr));
    getsockname(conn->sock_state->fd, (struct sockaddr *) &local_addr, &local_len);
    omni_sockaddr_to_string((struct sockaddr *) &local_addr, local_len, local_text,
                            sizeof(local_text));
    omni_sockaddr_to_string((struct sockaddr *) &conn->remote_addr, conn->remote_addr_len,
                            remote_text, sizeof(remote_text));
    snprintf(record.local_addr, sizeof(record.local_addr), "%s", local_text);
    snprintf(record.remote_addr, sizeof(record.remote_addr), "%s", remote_text);
    record.ts_unix_nano = omni_now_unix_nano();
    snprintf(record.sample_reason, sizeof(record.sample_reason), "%s",
             reason == NULL ? "" : reason);

    pthread_mutex_lock(&conn->kcp_mu);
    record.has_conv = 1;
    record.conv = conn->kcp->conv;
    record.has_rto_ms = 1;
    record.rto_ms = conn->kcp->rx_rto;
    record.has_srtt_ms = 1;
    record.srtt_ms = conn->kcp->rx_srtt;
    record.has_srttvar_ms = 1;
    record.srttvar_ms = conn->kcp->rx_rttval;
    record.has_snd_wnd = 1;
    record.snd_wnd = conn->kcp->snd_wnd;
    record.has_rmt_wnd = 1;
    record.rmt_wnd = conn->kcp->rmt_wnd;
    inflight = conn->kcp->snd_nxt - conn->kcp->snd_una;
    window_limit =
        conn->kcp->snd_wnd < conn->kcp->rmt_wnd ? conn->kcp->snd_wnd : conn->kcp->rmt_wnd;
    record.has_inflight = 1;
    record.inflight = inflight;
    record.has_window_limit = 1;
    record.window_limit = window_limit;
    record.has_window_pressure_pct = 1;
    record.window_pressure_pct =
        window_limit == 0 ? 0.0 : ((double) inflight * 100.0) / (double) window_limit;
    record.has_ring_buffer_snd_queue = 1;
    record.ring_buffer_snd_queue = conn->kcp->nsnd_que;
    record.has_ring_buffer_rcv_queue = 1;
    record.ring_buffer_rcv_queue = conn->kcp->nrcv_que;
    record.has_ring_buffer_snd_buffer = 1;
    record.ring_buffer_snd_buffer = conn->kcp->nsnd_buf;
    lost_total = conn->kcp->timeout_retrans_total;
    fast_retrans_total = conn->kcp->fast_retrans_total;
    record.has_retrans_segs = 1;
    record.retrans_segs = lost_total + fast_retrans_total;
    record.has_fast_retrans_segs = 1;
    record.fast_retrans_segs = fast_retrans_total;
    record.has_lost_segs = 1;
    record.lost_segs = lost_total;
    record.has_repeat_segs = 1;
    record.repeat_segs = conn->kcp->duplicate_recv_total;
    pthread_mutex_unlock(&conn->kcp_mu);

    out_segs_total = atomic_load_explicit(&conn->total_out_segs, memory_order_relaxed);
    record.has_out_segs = 1;
    record.out_segs = out_segs_total;
    (void) kcp_session_stats_log(conn->stats_logger, &record);
}

/* Per-connection stats thread: sleeps stats_interval_ms, then logs a "periodic" snapshot, until closed. */
static void *kcp_stats_thread_main(void *arg)
{
    kcp_conn_t *conn = (kcp_conn_t *) arg;

    while (!atomic_load(&conn->closed)) {
        usleep((useconds_t) conn->stats_interval_ms * 1000U);
        if (!atomic_load(&conn->closed)) {
            kcp_log_session_snapshot(conn, "periodic");
        }
    }
    return NULL;
}

static int kcp_socket_open_bound(const char *listen_addr, const char *bind_device,
                                 struct sockaddr_storage *local_addr, socklen_t *local_len)
{
    int family;
    int fd;

    if (omni_parse_sockaddr(listen_addr, 1, local_addr, local_len, &family) != 0) {
        return -1;
    }
    fd = socket(family, SOCK_DGRAM, 0);
    if (fd < 0) {
        return -1;
    }
    if (bind_device != NULL && bind_device[0] != '\0' &&
        omni_bind_device(fd, bind_device) != 0) {
        close(fd);
        return -1;
    }
    if (bind(fd, (struct sockaddr *) local_addr, *local_len) != 0) {
close(fd);
        return -1;
    }
    return fd;
}

/*
 * Create the client-side UDP socket for `server_addr`.
 *
 * server_addr: peer address; parsed into remote_addr/remote_len.
 * bind_ip:     optional local IP to bind to (port "0"); skipped when NULL or empty.
 * bind_device: optional network device to bind to; skipped when NULL or empty.
 * family_out:  optional; receives the address family of the parsed peer address.
 *
 * Returns the descriptor, or -1 on failure. The socket is closed on every
 * failure path after it was created.
 */
static int kcp_socket_open_dial(const char *server_addr, const char *bind_ip, const char *bind_device, struct sockaddr_storage *remote_addr, socklen_t *remote_len, int *family_out)
{
    int family;
    struct sockaddr_storage local_addr;
    socklen_t local_len;
    int fd;

    if (omni_parse_sockaddr(server_addr, 0, remote_addr, remote_len, &family) != 0) {
        return -1;
    }
    fd = socket(family, SOCK_DGRAM, 0);
    if (fd < 0) {
        return -1;
    }
    if (bind_device != NULL && bind_device[0] != '\0' && omni_bind_device(fd, bind_device) != 0) {
        close(fd);
        return -1;
    }
    if (bind_ip != NULL && bind_ip[0] != '\0') {
        struct addrinfo hints;
        struct addrinfo *result = NULL;

        /* Resolve bind_ip within the peer's address family. */
        memset(&hints, 0, sizeof(hints));
        hints.ai_family = family;
        hints.ai_socktype = SOCK_DGRAM;
        if (getaddrinfo(bind_ip, "0", &hints, &result) != 0 || result == NULL) {
            close(fd);
            errno = EINVAL;
            return -1;
        }
        /* Only the first result is used. */
        memcpy(&local_addr, result->ai_addr, result->ai_addrlen);
        local_len = (socklen_t) result->ai_addrlen;
        freeaddrinfo(result);
        if (bind(fd, (struct sockaddr *) &local_addr, local_len) != 0) {
            close(fd);
            return -1;
        }
    }
    if (family_out != NULL) {
        *family_out = family;
    }
    return fd;
}

/*
 * Compare two socket addresses by their omni_sockaddr_to_string() text form.
 * Returns non-zero when they match. If either pointer is NULL the result is
 * non-zero only when both are NULL.
 */
static int kcp_sockaddr_equal(const struct sockaddr_storage *left, socklen_t left_len, const struct sockaddr_storage *right, socklen_t right_len)
{
    char left_text[OMNI_MAX_ADDR_TEXT];
    char right_text[OMNI_MAX_ADDR_TEXT];

    if (left == NULL || right == NULL) {
        return left == right;
    }
    /* NOTE(review): assumes omni_sockaddr_to_string() never returns NULL - confirm. */
    return strcmp(
        omni_sockaddr_to_string((const struct sockaddr *) left, left_len, left_text, sizeof(left_text)),
        omni_sockaddr_to_string((const struct sockaddr *) right, right_len, right_text, sizeof(right_text))
    ) == 0;
}

/*
 * Receive loop of a dialed (client) connection.
 *
 * Reads datagrams from conn->fd until the connection is closed or recvmsg()
 * fails with anything other than EINTR. Datagrams whose source differs from
 * conn->remote_addr are dropped; the rest are fed to ikcp_input() under
 * kcp_mu, after which waiters on rx_cond are woken.
 */
static void *kcp_client_recv_thread_main(void *arg)
{
    kcp_conn_t *conn = (kcp_conn_t *) arg;
    uint8_t buffer[64 * 1024];
    uint8_t control[512];
    struct sockaddr_storage source;
    struct iovec iov;
    struct msghdr msg;
    ssize_t n;
    uint32_t conv = 0;
    kcp_packet_debug_segment_t *segments = NULL;
    size_t segment_count = 0;
    int64_t rx_ts;

    while (!atomic_load(&conn->closed)) {
        memset(&msg, 0, sizeof(msg));
        memset(&source, 0, sizeof(source));
        iov.iov_base = buffer;
        iov.iov_len = sizeof(buffer);
        msg.msg_name = &source;
        msg.msg_namelen = sizeof(source);
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
        /* Ancillary data is only requested when packet debug logging is enabled. */
        if (conn->sock_state->logger != NULL) {
            msg.msg_control = control;
            msg.msg_controllen = sizeof(control);
        }
        n = recvmsg(conn->fd, &msg, 0);
        if (n < 0) {
            if (errno == EINTR) {
                continue;
            }
            /* Both the closed and the error case end the thread. */
            if (atomic_load(&conn->closed)) {
                return NULL;
            }
            return NULL;
        }
        kcp_parse_packet_segments(buffer, (size_t) n, &conv, &segments, &segment_count);
        rx_ts = conn->sock_state->logger != NULL ? linux_timestamping_parse_rx_timestamp(&msg) : 0;
        if (rx_ts > 0) {
            kcp_socket_debug_log_record(conn->sock_state, EVENT_B_RX_SOFTWARE, &source, msg.msg_namelen, (int) n, 0, 0, 1, conv, segments, segment_count, rx_ts);
        }
        /* Ignore traffic that does not come from the dialed peer. */
        if (!kcp_sockaddr_equal(&source, msg.msg_namelen, &conn->remote_addr, conn->remote_addr_len)) {
            free(segments);
            segments = NULL;
            segment_count = 0;
            continue;
        }
        pthread_mutex_lock(&conn->kcp_mu);
        conn->kcp->current = omni_now_millis32();
        if (ikcp_input(conn->kcp, (const char *) buffer, n) != 0) {
            kcp_conn_record_error(conn);
        } else {
            kcp_conn_record_input(conn, (int) n, segment_count);
        }
        pthread_mutex_unlock(&conn->kcp_mu);
        /* Wake readers blocked in kcp_conn_receive_timed(). */
        pthread_cond_broadcast(&conn->rx_cond);
        free(segments);
        segments = NULL;
        segment_count = 0;
    }
    return NULL;
}

/*
 * Per-connection timer thread: calls ikcp_update() under kcp_mu, then sleeps
 * for the configured update interval (falling back to
 * KCP_DEFAULT_INTERVAL_MS), until the connection is marked closed.
 */
static void *kcp_update_thread_main(void *arg)
{
    kcp_conn_t *conn = (kcp_conn_t *) arg;

    while (!atomic_load(&conn->closed)) {
        int interval_ms;

        pthread_mutex_lock(&conn->kcp_mu);
        ikcp_update(conn->kcp, omni_now_millis32());
        interval_ms = conn->update_interval_ms > 0 ?
conn->update_interval_ms : KCP_DEFAULT_INTERVAL_MS; pthread_mutex_unlock(&conn->kcp_mu); usleep((useconds_t) interval_ms * 1000U); } return NULL; } static int kcp_conn_start_stats_thread(kcp_conn_t *conn) { int thread_rc; if (conn == NULL || conn->stats_logger == NULL || conn->stats_thread_started) { return 0; } thread_rc = pthread_create(&conn->stats_thread, NULL, kcp_stats_thread_main, conn); if (thread_rc != 0) { errno = thread_rc; return -1; } conn->stats_thread_started = 1; return 0; } static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *remote_addr, socklen_t remote_addr_len, const kcp_conn_options_t *options, kcp_socket_debug_state_t *sock_state, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) { kcp_conn_t *conn = (kcp_conn_t *) calloc(1, sizeof(*conn)); uint32_t conv; int thread_rc; kcp_conn_options_t effective_options; if (conn == NULL) { errno = ENOMEM; return NULL; } conn->fd = fd; memcpy(&conn->remote_addr, remote_addr, sizeof(*remote_addr)); conn->remote_addr_len = remote_addr_len; pthread_mutex_init(&conn->kcp_mu, NULL); pthread_mutex_init(&conn->close_mu, NULL); pthread_cond_init(&conn->rx_cond, NULL); protocol_frame_decoder_init(&conn->decoder); conn->logger = logger; snprintf(conn->node_role, sizeof(conn->node_role), "%s", node_role == NULL ? "" : node_role); snprintf(conn->node_id, sizeof(conn->node_id), "%s", node_id == NULL ? "" : node_id); conn->stats_logger = stats_logger; conn->stats_interval_ms = stats_interval_ms > 0 ? 
stats_interval_ms : KCP_DEFAULT_STATS_INTERVAL_MS; kcp_conn_options_init(&effective_options); if (options != NULL) { effective_options = *options; } conn->options = effective_options; conn->update_interval_ms = effective_options.interval_ms; conn->sock_state = sock_state; if (omni_random_u32(&conv) != 0) { protocol_frame_decoder_destroy(&conn->decoder); pthread_cond_destroy(&conn->rx_cond); pthread_mutex_destroy(&conn->kcp_mu); pthread_mutex_destroy(&conn->close_mu); free(conn); return NULL; } conn->kcp = ikcp_create(conv, conn); if (conn->kcp == NULL) { errno = ENOMEM; protocol_frame_decoder_destroy(&conn->decoder); pthread_cond_destroy(&conn->rx_cond); pthread_mutex_destroy(&conn->kcp_mu); pthread_mutex_destroy(&conn->close_mu); free(conn); return NULL; } ikcp_setoutput(conn->kcp, kcp_output_callback_impl); if (kcp_conn_apply_options_locked(conn, &effective_options) != 0) { ikcp_release(conn->kcp); protocol_frame_decoder_destroy(&conn->decoder); pthread_cond_destroy(&conn->rx_cond); pthread_mutex_destroy(&conn->kcp_mu); pthread_mutex_destroy(&conn->close_mu); free(conn); return NULL; } if (kcp_conn_attach_process_sampler(conn) != 0) { ikcp_release(conn->kcp); protocol_frame_decoder_destroy(&conn->decoder); pthread_cond_destroy(&conn->rx_cond); pthread_mutex_destroy(&conn->kcp_mu); pthread_mutex_destroy(&conn->close_mu); free(conn); return NULL; } if (kcp_conn_start_stats_thread(conn) != 0) { kcp_conn_detach_process_sampler(conn); ikcp_release(conn->kcp); protocol_frame_decoder_destroy(&conn->decoder); pthread_cond_destroy(&conn->rx_cond); pthread_mutex_destroy(&conn->kcp_mu); pthread_mutex_destroy(&conn->close_mu); free(conn); return NULL; } thread_rc = pthread_create(&conn->update_thread, NULL, kcp_update_thread_main, conn); if (thread_rc != 0) { errno = thread_rc; if (conn->stats_thread_started) { atomic_store(&conn->closed, 1); pthread_join(conn->stats_thread, NULL); } kcp_conn_detach_process_sampler(conn); ikcp_release(conn->kcp); 
protocol_frame_decoder_destroy(&conn->decoder); pthread_cond_destroy(&conn->rx_cond); pthread_mutex_destroy(&conn->kcp_mu); pthread_mutex_destroy(&conn->close_mu); free(conn); return NULL; } conn->update_thread_started = 1; return conn; } kcp_conn_t *kcp_conn_dial_with_options(const char *server_addr, const char *bind_ip, const char *bind_device, const kcp_conn_options_t *options, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) { struct sockaddr_storage remote_addr; socklen_t remote_len; int family; int fd = kcp_socket_open_dial(server_addr, bind_ip, bind_device, &remote_addr, &remote_len, &family); kcp_conn_t *conn; kcp_socket_debug_state_t *sock_state; int thread_rc; (void) family; if (fd < 0) { return NULL; } sock_state = (kcp_socket_debug_state_t *) calloc(1, sizeof(*sock_state)); if (sock_state == NULL) { errno = ENOMEM; close(fd); return NULL; } if (kcp_socket_debug_init(sock_state, fd, packet_logger, node_role, node_id) != 0) { free(sock_state); close(fd); return NULL; } conn = kcp_conn_alloc_common(fd, &remote_addr, remote_len, options, sock_state, logger, node_role, node_id, stats_logger, stats_interval_ms); if (conn == NULL) { kcp_socket_debug_destroy(sock_state); free(sock_state); close(fd); return NULL; } conn->is_client = 1; conn->owns_socket = 1; thread_rc = pthread_create(&conn->recv_thread, NULL, kcp_client_recv_thread_main, conn); if (thread_rc != 0) { errno = thread_rc; kcp_conn_free(conn); return NULL; } conn->recv_thread_started = 1; return conn; } kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) { return kcp_conn_dial_with_options(server_addr, bind_ip, bind_device, NULL, packet_logger, logger, 
node_role, node_id, stats_logger, stats_interval_ms); } static void kcp_listener_enqueue_accept(kcp_listener_t *listener, kcp_conn_t *conn) { pthread_mutex_lock(&listener->accept_mu); if (listener->accept_tail == NULL) { listener->accept_head = conn; } else { listener->accept_tail->accept_next = conn; } listener->accept_tail = conn; conn->accept_next = NULL; pthread_cond_signal(&listener->accept_cond); pthread_mutex_unlock(&listener->accept_mu); } static kcp_conn_t *kcp_listener_find_session(kcp_listener_t *listener, uint32_t conv) { kcp_session_entry_t *entry; for (entry = listener->sessions; entry != NULL; entry = entry->next) { if (entry->conv == conv) { return entry->conn; } } return NULL; } static int kcp_listener_add_session(kcp_listener_t *listener, uint32_t conv, kcp_conn_t *conn) { kcp_session_entry_t *entry = (kcp_session_entry_t *) calloc(1, sizeof(*entry)); if (entry == NULL) { return -1; } entry->conv = conv; entry->conn = conn; entry->next = listener->sessions; listener->sessions = entry; return 0; } static void kcp_listener_remove_session(kcp_listener_t *listener, kcp_conn_t *conn) { kcp_session_entry_t *prev_entry = NULL; kcp_session_entry_t *entry; kcp_conn_t *prev_accept = NULL; kcp_conn_t *accept; if (listener == NULL || conn == NULL) { return; } pthread_mutex_lock(&listener->lock); for (entry = listener->sessions; entry != NULL; entry = entry->next) { if (entry->conn == conn) { if (prev_entry == NULL) { listener->sessions = entry->next; } else { prev_entry->next = entry->next; } free(entry); break; } prev_entry = entry; } pthread_mutex_unlock(&listener->lock); pthread_mutex_lock(&listener->accept_mu); for (accept = listener->accept_head; accept != NULL; accept = accept->accept_next) { if (accept == conn) { if (prev_accept == NULL) { listener->accept_head = accept->accept_next; } else { prev_accept->accept_next = accept->accept_next; } if (listener->accept_tail == conn) { listener->accept_tail = prev_accept; } conn->accept_next = NULL; break; } 
prev_accept = accept; } pthread_mutex_unlock(&listener->accept_mu); } static void *kcp_listener_recv_thread_main(void *arg) { kcp_listener_t *listener = (kcp_listener_t *) arg; uint8_t buffer[64 * 1024]; uint8_t control[512]; struct sockaddr_storage source; struct iovec iov; struct msghdr msg; ssize_t n; uint32_t conv; kcp_packet_debug_segment_t *segments = NULL; size_t segment_count = 0; int64_t rx_ts; while (!listener->closed) { memset(&msg, 0, sizeof(msg)); memset(&source, 0, sizeof(source)); iov.iov_base = buffer; iov.iov_len = sizeof(buffer); msg.msg_name = &source; msg.msg_namelen = sizeof(source); msg.msg_iov = &iov; msg.msg_iovlen = 1; if (listener->sock_state.logger != NULL) { msg.msg_control = control; msg.msg_controllen = sizeof(control); } n = recvmsg(listener->fd, &msg, 0); if (n < 0) { if (errno == EINTR) { continue; } if (listener->closed) { return NULL; } return NULL; } kcp_parse_packet_segments(buffer, (size_t) n, &conv, &segments, &segment_count); rx_ts = listener->sock_state.logger != NULL ? 
linux_timestamping_parse_rx_timestamp(&msg) : 0; if (rx_ts > 0) { kcp_socket_debug_log_record(&listener->sock_state, EVENT_B_RX_SOFTWARE, &source, msg.msg_namelen, (int) n, 0, 0, 1, conv, segments, segment_count, rx_ts); } pthread_mutex_lock(&listener->lock); { kcp_conn_t *conn = kcp_listener_find_session(listener, conv); if (conn == NULL) { conn = (kcp_conn_t *) calloc(1, sizeof(*conn)); if (conn != NULL) { kcp_conn_options_t accepted_options; conn->fd = listener->fd; memcpy(&conn->remote_addr, &source, sizeof(source)); conn->remote_addr_len = msg.msg_namelen; pthread_mutex_init(&conn->kcp_mu, NULL); pthread_mutex_init(&conn->close_mu, NULL); pthread_cond_init(&conn->rx_cond, NULL); protocol_frame_decoder_init(&conn->decoder); snprintf(conn->node_role, sizeof(conn->node_role), "%s", listener->sock_state.node_role); snprintf(conn->node_id, sizeof(conn->node_id), "%s", listener->sock_state.node_id); conn->stats_interval_ms = KCP_DEFAULT_STATS_INTERVAL_MS; conn->sock_state = &listener->sock_state; conn->listener = listener; kcp_conn_options_init(&accepted_options); conn->options = accepted_options; conn->update_interval_ms = accepted_options.interval_ms; conn->kcp = ikcp_create(conv, conn); if (conn->kcp != NULL) { int update_started = 0; ikcp_setoutput(conn->kcp, kcp_output_callback_impl); if (kcp_conn_apply_options_locked(conn, &accepted_options) == 0 && pthread_create(&conn->update_thread, NULL, kcp_update_thread_main, conn) == 0) { update_started = 1; } if (update_started && kcp_listener_add_session(listener, conv, conn) == 0) { conn->update_thread_started = 1; kcp_listener_enqueue_accept(listener, conn); } else { atomic_store(&conn->closed, 1); if (update_started) { pthread_join(conn->update_thread, NULL); } ikcp_release(conn->kcp); protocol_frame_decoder_destroy(&conn->decoder); pthread_cond_destroy(&conn->rx_cond); pthread_mutex_destroy(&conn->kcp_mu); pthread_mutex_destroy(&conn->close_mu); free(conn); conn = NULL; } } else { 
protocol_frame_decoder_destroy(&conn->decoder); pthread_cond_destroy(&conn->rx_cond); pthread_mutex_destroy(&conn->kcp_mu); pthread_mutex_destroy(&conn->close_mu); free(conn); conn = NULL; } } } if (conn != NULL && conn->kcp != NULL) { pthread_mutex_lock(&conn->kcp_mu); conn->kcp->current = omni_now_millis32(); if (ikcp_input(conn->kcp, (const char *) buffer, n) != 0) { kcp_conn_record_error(conn); } else { kcp_conn_record_input(conn, (int) n, segment_count); } pthread_mutex_unlock(&conn->kcp_mu); pthread_cond_broadcast(&conn->rx_cond); } } pthread_mutex_unlock(&listener->lock); free(segments); segments = NULL; segment_count = 0; } return NULL; } kcp_listener_t *kcp_listener_listen(const char *listen_addr, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, const char *node_role, const char *node_id) { struct sockaddr_storage local_addr; socklen_t local_len; int fd = kcp_socket_open_bound(listen_addr, bind_device, &local_addr, &local_len); kcp_listener_t *listener; if (fd < 0) { return NULL; } listener = (kcp_listener_t *) calloc(1, sizeof(*listener)); if (listener == NULL) { close(fd); return NULL; } listener->fd = fd; pthread_mutex_init(&listener->lock, NULL); pthread_mutex_init(&listener->accept_mu, NULL); pthread_cond_init(&listener->accept_cond, NULL); if (kcp_socket_debug_init(&listener->sock_state, fd, packet_logger, node_role, node_id) != 0) { kcp_listener_free(listener); return NULL; } if (pthread_create(&listener->recv_thread, NULL, kcp_listener_recv_thread_main, listener) != 0) { kcp_listener_free(listener); return NULL; } listener->recv_thread_started = 1; return listener; } kcp_conn_t *kcp_listener_accept(kcp_listener_t *listener) { kcp_conn_t *conn; if (listener == NULL) { errno = EINVAL; return NULL; } pthread_mutex_lock(&listener->accept_mu); while (!listener->closed && listener->accept_head == NULL) { pthread_cond_wait(&listener->accept_cond, &listener->accept_mu); } if (listener->closed) { pthread_mutex_unlock(&listener->accept_mu); 
errno = ECANCELED;
        return NULL;
    }
    /* Pop the head of the accept queue; clear the tail when the queue becomes empty. */
    conn = listener->accept_head;
    listener->accept_head = conn->accept_next;
    if (listener->accept_head == NULL) {
        listener->accept_tail = NULL;
    }
    conn->accept_next = NULL;
    pthread_mutex_unlock(&listener->accept_mu);
    return conn;
}

/*
 * Install runtime logging settings on an existing connection.
 *
 * logger:            latency logger; stored as given (NULL included).
 * node_role/node_id: copied when non-NULL, otherwise left unchanged.
 * stats_logger:      session stats logger; stored as given (NULL included).
 * stats_interval_ms: used when > 0; otherwise the current value is kept, or
 *                    KCP_DEFAULT_STATS_INTERVAL_MS if that is not positive.
 *
 * The fields and the process sampler are updated under close_mu; the stats
 * thread is started afterwards. Returns 0 on success, -1 on failure.
 */
int kcp_conn_configure_runtime(kcp_conn_t *conn, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms)
{
    if (conn == NULL) {
        errno = EINVAL;
        return -1;
    }
    pthread_mutex_lock(&conn->close_mu);
    conn->logger = logger;
    if (node_role != NULL) {
        snprintf(conn->node_role, sizeof(conn->node_role), "%s", node_role);
    }
    if (node_id != NULL) {
        snprintf(conn->node_id, sizeof(conn->node_id), "%s", node_id);
    }
    conn->stats_logger = stats_logger;
    if (stats_interval_ms > 0) {
        conn->stats_interval_ms = stats_interval_ms;
    } else if (conn->stats_interval_ms <= 0) {
        conn->stats_interval_ms = KCP_DEFAULT_STATS_INTERVAL_MS;
    }
    if (kcp_conn_attach_process_sampler(conn) != 0) {
        pthread_mutex_unlock(&conn->close_mu);
        return -1;
    }
    pthread_mutex_unlock(&conn->close_mu);
    if (kcp_conn_start_stats_thread(conn) != 0) {
        /* Undo the sampler attachment when the stats thread cannot be started. */
        pthread_mutex_lock(&conn->close_mu);
        kcp_conn_detach_process_sampler(conn);
        pthread_mutex_unlock(&conn->close_mu);
        return -1;
    }
    return 0;
}

/*
 * Apply `options` to the connection while holding kcp_mu.
 * Returns the result of kcp_conn_apply_options_locked(), or -1 with
 * errno = EINVAL when either argument is NULL.
 */
int kcp_conn_apply_options(kcp_conn_t *conn, const kcp_conn_options_t *options)
{
    int rc;

    if (conn == NULL || options == NULL) {
        errno = EINVAL;
        return -1;
    }
    pthread_mutex_lock(&conn->kcp_mu);
    rc = kcp_conn_apply_options_locked(conn, options);
    pthread_mutex_unlock(&conn->kcp_mu);
    return rc;
}

/*
 * Encode `msg` into a stream frame, queue it on the KCP control block and
 * flush it immediately.
 *
 * Returns 0 on success and -1 on failure with errno set: EINVAL for NULL
 * arguments, EMSGSIZE when ikcp_send() returns -2, EINVAL for any other
 * ikcp_send() error, or the errno recorded by the socket send path during
 * the flush. On the failure paths the "send_handoff_end" snapshot and event
 * are not emitted.
 */
int kcp_conn_send(kcp_conn_t *conn, const message_t *msg)
{
    uint8_t *frame = NULL;
    size_t frame_len = 0;
    int send_errno = 0;
    int kcp_send_rc = 0;

    if (conn == NULL || msg == NULL) {
        errno = EINVAL;
        return -1;
    }
    if (protocol_encode_message_stream(msg, &frame, &frame_len) != 0) {
        return -1;
    }
    latencylog_log_message_event(conn->logger, conn->node_role, conn->node_id, EVENT_SEND_HANDOFF_BEGIN, msg);
    kcp_log_session_snapshot(conn, "send_handoff_begin");
    kcp_process_sampler_request_sample(conn->process_sampler, "send_handoff_begin");
    pthread_mutex_lock(&conn->kcp_mu);
    /* Clear the slot the output callback's send path reports into. */
    atomic_store(&conn->sock_state->last_send_errno, 0);
    conn->kcp->current = omni_now_millis32();
    kcp_send_rc = ikcp_send(conn->kcp, (const char *) frame, (int) frame_len);
    if (kcp_send_rc < 0) {
        pthread_mutex_unlock(&conn->kcp_mu);
        errno = kcp_send_rc == -2 ? EMSGSIZE : EINVAL;
        free(frame);
        return -1;
    }
    ikcp_flush(conn->kcp);
    /* Pick up any socket error recorded while the flush ran the output callback. */
    send_errno = atomic_load(&conn->sock_state->last_send_errno);
    pthread_mutex_unlock(&conn->kcp_mu);
    if (send_errno != 0) {
        errno = send_errno;
        free(frame);
        return -1;
    }
    kcp_log_session_snapshot(conn, "send_handoff_end");
    kcp_process_sampler_request_sample(conn->process_sampler, "send_handoff_end");
    latencylog_log_message_event(conn->logger, conn->node_role, conn->node_id, EVENT_SEND_HANDOFF_END, msg);
    free(frame);
    return 0;
}

/*
 * Fill `deadline` with the CLOCK_REALTIME time `timeout_ms` milliseconds from
 * now, keeping tv_nsec below one second.
 */
static void kcp_timespec_deadline_after_ms(struct timespec *deadline, int timeout_ms)
{
    clock_gettime(CLOCK_REALTIME, deadline);
    deadline->tv_sec += timeout_ms / 1000;
    deadline->tv_nsec += (long) (timeout_ms % 1000) * 1000000L;
    /* Carry nanosecond overflow into the seconds field. */
    if (deadline->tv_nsec >= 1000000000L) {
        deadline->tv_sec += 1;
        deadline->tv_nsec -= 1000000000L;
    }
}

/*
 * Receive the next complete message from the connection.
 *
 * timeout_ms: < 0 waits indefinitely, 0 polls without blocking, > 0 waits up
 *             to that many milliseconds in total (one absolute deadline is
 *             computed up front).
 *
 * Returns 0 with *out_msg filled, 1 when no message became available in time,
 * or -1 on error (EINVAL for NULL arguments, EPROTO for an undecodable frame,
 * ECANCELED once the connection is closed, or the error of the decoder /
 * condition wait).
 */
int kcp_conn_receive_timed(kcp_conn_t *conn, message_t *out_msg, int timeout_ms)
{
    uint8_t *frame = NULL;
    size_t frame_len = 0;
    char err[128];
    int next_rc;
    struct timespec deadline;
    int use_deadline = timeout_ms > 0;

    if (conn == NULL || out_msg == NULL) {
        errno = EINVAL;
        return -1;
    }
    if (use_deadline) {
        kcp_timespec_deadline_after_ms(&deadline, timeout_ms);
    }
    for (;;) {
        /* First drain any frame the decoder already holds. */
        next_rc = protocol_frame_decoder_next(&conn->decoder, &frame, &frame_len);
        if (next_rc < 0) {
            return -1;
        }
        if (next_rc == 1) {
            if (protocol_decode_message_stream_payload(frame, frame_len, out_msg, err, sizeof(err)) != 0) {
                free(frame);
                errno = EPROTO;
                return -1;
            }
            free(frame);
            kcp_log_session_snapshot(conn, "receive");
            kcp_process_sampler_request_sample(conn->process_sampler, "receive");
            return 0;
        }
        pthread_mutex_lock(&conn->kcp_mu);
        {
            /* Move whatever KCP has reassembled into the frame decoder. */
            int n = ikcp_recv(conn->kcp, (char *) conn->scratch, (int) sizeof(conn->scratch));

            if (n > 0) {
                if (protocol_frame_decoder_feed(&conn->decoder, conn->scratch, (size_t) n) != 0) {
                    pthread_mutex_unlock(&conn->kcp_mu);
                    return -1;
                }
                pthread_mutex_unlock(&conn->kcp_mu);
                continue;
            }
            if (atomic_load(&conn->closed)) {
                pthread_mutex_unlock(&conn->kcp_mu);
                errno = ECANCELED;
                return -1;
            }
            if (timeout_ms == 0) {
                pthread_mutex_unlock(&conn->kcp_mu);
                return 1;
            }
            /* Sleep until a receive thread or close broadcasts rx_cond; the outer loop re-checks. */
            if (timeout_ms < 0) {
                pthread_cond_wait(&conn->rx_cond, &conn->kcp_mu);
            } else {
                int wait_rc = pthread_cond_timedwait(&conn->rx_cond, &conn->kcp_mu, &deadline);

                if (wait_rc == ETIMEDOUT) {
                    pthread_mutex_unlock(&conn->kcp_mu);
                    return 1;
                }
                if (wait_rc != 0) {
                    pthread_mutex_unlock(&conn->kcp_mu);
                    errno = wait_rc;
                    return -1;
                }
            }
        }
        pthread_mutex_unlock(&conn->kcp_mu);
    }
}

/* Blocking receive: kcp_conn_receive_timed() with no timeout. */
int kcp_conn_receive(kcp_conn_t *conn, message_t *out_msg)
{
    return kcp_conn_receive_timed(conn, out_msg, -1);
}

/* Return the connection's KCP conversation id, or 0 when conn or its control block is NULL. */
uint32_t kcp_conn_conv(const kcp_conn_t *conn)
{
    return conn == NULL || conn->kcp == NULL ?
0 : conn->kcp->conv; } int kcp_conn_local_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, socklen_t *addr_len) { socklen_t len = sizeof(*addr); if (conn == NULL || addr == NULL || addr_len == NULL || conn->sock_state == NULL) { errno = EINVAL; return -1; } if (getsockname(conn->sock_state->fd, (struct sockaddr *) addr, &len) != 0) { return -1; } *addr_len = len; return 0; } int kcp_conn_remote_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, socklen_t *addr_len) { if (conn == NULL || addr == NULL || addr_len == NULL) { errno = EINVAL; return -1; } if (conn->remote_addr_len == 0) { errno = ENOTCONN; return -1; } return omni_clone_sockaddr((const struct sockaddr *) &conn->remote_addr, conn->remote_addr_len, addr, addr_len); } void kcp_conn_runtime_stats_snapshot(kcp_conn_t *conn, kcp_runtime_stats_t *out_stats) { if (out_stats == NULL) { return; } memset(out_stats, 0, sizeof(*out_stats)); if (conn == NULL) { return; } out_stats->connected = atomic_load(&conn->closed) ? 0 : 1; pthread_mutex_lock(&conn->kcp_mu); if (conn->kcp != NULL) { out_stats->conv = conn->kcp->conv; out_stats->rto_ms = conn->kcp->rx_rto; out_stats->srtt_ms = conn->kcp->rx_srtt; out_stats->srttvar_ms = conn->kcp->rx_rttval; out_stats->snd_wnd = conn->kcp->snd_wnd; out_stats->rmt_wnd = conn->kcp->rmt_wnd; out_stats->inflight = conn->kcp->snd_nxt - conn->kcp->snd_una; out_stats->window_limit = conn->kcp->snd_wnd < conn->kcp->rmt_wnd ? conn->kcp->snd_wnd : conn->kcp->rmt_wnd; out_stats->window_pressure_pct = out_stats->window_limit == 0 ? 
0.0 : ((double) out_stats->inflight * 100.0) / (double) out_stats->window_limit; out_stats->snd_queue = conn->kcp->nsnd_que; out_stats->rcv_queue = conn->kcp->nrcv_que; out_stats->snd_buffer = conn->kcp->nsnd_buf; out_stats->out_segs_total = atomic_load_explicit(&conn->total_out_segs, memory_order_relaxed); out_stats->fast_retrans_total = conn->kcp->fast_retrans_total; out_stats->lost_total = conn->kcp->timeout_retrans_total; out_stats->retrans_total = out_stats->lost_total + out_stats->fast_retrans_total; out_stats->repeat_total = conn->kcp->duplicate_recv_total; out_stats->xmit_total = conn->kcp->xmit; } else { out_stats->connected = 0; } pthread_mutex_unlock(&conn->kcp_mu); } int kcp_conn_close(kcp_conn_t *conn) { if (conn == NULL) { return 0; } pthread_mutex_lock(&conn->close_mu); if (!atomic_load(&conn->closed)) { kcp_log_session_snapshot(conn, "close"); kcp_process_sampler_request_sample_and_wait(conn->process_sampler, "close"); pthread_mutex_lock(&conn->kcp_mu); atomic_store(&conn->closed, 1); if (conn->owns_socket && !conn->socket_closed) { /* Wake the blocking recv thread before closing the shared UDP socket. 
*/ (void) shutdown(conn->fd, SHUT_RDWR); close(conn->fd); conn->socket_closed = 1; } pthread_cond_broadcast(&conn->rx_cond); pthread_mutex_unlock(&conn->kcp_mu); } pthread_mutex_unlock(&conn->close_mu); return 0; } void kcp_conn_free(kcp_conn_t *conn) { if (conn == NULL) { return; } kcp_conn_close(conn); if (conn->recv_thread_started) { pthread_join(conn->recv_thread, NULL); } if (conn->update_thread_started) { pthread_join(conn->update_thread, NULL); } if (conn->stats_thread_started) { pthread_join(conn->stats_thread, NULL); } if (conn->listener != NULL && !conn->listener->closed) { kcp_listener_remove_session(conn->listener, conn); } kcp_conn_detach_process_sampler(conn); if (conn->owns_socket && conn->sock_state != NULL) { if (!conn->socket_closed) { close(conn->fd); conn->socket_closed = 1; } kcp_socket_debug_destroy(conn->sock_state); free(conn->sock_state); } if (conn->kcp != NULL) { ikcp_release(conn->kcp); } protocol_frame_decoder_destroy(&conn->decoder); pthread_cond_destroy(&conn->rx_cond); pthread_mutex_destroy(&conn->kcp_mu); pthread_mutex_destroy(&conn->close_mu); free(conn); } int kcp_listener_close(kcp_listener_t *listener) { if (listener == NULL) { return 0; } if (!listener->closed) { listener->closed = 1; close(listener->fd); pthread_cond_broadcast(&listener->accept_cond); } return 0; } void kcp_listener_free(kcp_listener_t *listener) { kcp_session_entry_t *entry; kcp_session_entry_t *next; if (listener == NULL) { return; } kcp_listener_close(listener); if (listener->recv_thread_started) { pthread_join(listener->recv_thread, NULL); } for (entry = listener->sessions; entry != NULL; entry = next) { next = entry->next; entry->conn->listener = NULL; kcp_conn_free(entry->conn); free(entry); } kcp_socket_debug_destroy(&listener->sock_state); pthread_mutex_destroy(&listener->lock); pthread_mutex_destroy(&listener->accept_mu); pthread_cond_destroy(&listener->accept_cond); free(listener); } int kcp_session_stats_parse_interval_ms(const char *raw, int 
*out_ms) { return omni_parse_duration_ms(raw, KCP_DEFAULT_STATS_INTERVAL_MS, out_ms); }