#include "peer_kcp_client.h" #include #include #include #include #define KCP_CLIENT_REGISTER_TIMEOUT_MS 3000 #define KCP_CLIENT_CTRL_REGISTER_OK "{\"type\":\"server_register_ok\"}" #define KCP_CLIENT_CTRL_PEER_REPLACED "{\"type\":\"server_peer_replaced\",\"reason\":\"new_instance_wins\"}" #define KCP_CLIENT_CTRL_HEARTBEAT "{\"type\":\"server_heartbeat\"}" #define KCP_CLIENT_CTRL_HEARTBEAT_ACK "{\"type\":\"server_heartbeat_ack\"}" struct kcp_client { char id[OMNI_MAX_PEER_ID]; char server_addr[OMNI_MAX_ADDR_TEXT]; kcp_conn_t *conn; latency_logger_t *logger; pthread_mutex_t state_mu; uint64_t next_message_id; int registered; uint32_t last_server_activity_ms; char last_server_error[256]; }; static int kcp_client_next_message_id(kcp_client_t *client, uint64_t *out_id) { if (client == NULL || out_id == NULL) { errno = EINVAL; return -1; } pthread_mutex_lock(&client->state_mu); *out_id = ++client->next_message_id; pthread_mutex_unlock(&client->state_mu); return 0; } static void kcp_client_set_registered(kcp_client_t *client, int registered) { if (client == NULL) { return; } pthread_mutex_lock(&client->state_mu); client->registered = registered != 0; pthread_mutex_unlock(&client->state_mu); } static void kcp_client_touch_server_activity(kcp_client_t *client) { if (client == NULL) { return; } pthread_mutex_lock(&client->state_mu); client->last_server_activity_ms = omni_now_millis32(); pthread_mutex_unlock(&client->state_mu); } static void kcp_client_set_last_server_error(kcp_client_t *client, const char *message) { if (client == NULL) { return; } pthread_mutex_lock(&client->state_mu); snprintf(client->last_server_error, sizeof(client->last_server_error), "%s", message == NULL ? "" : message); pthread_mutex_unlock(&client->state_mu); } static void kcp_client_clear_last_server_error(kcp_client_t *client) { kcp_client_set_last_server_error(client, ""); } static int kcp_client_server_error_invalidates_registration(const char *message) { if (message == NULL || message[0] == '\0') { return 0; } return strstr(message, "not registered") != NULL || strstr(message, "first message must be register") != NULL || strstr(message, "peer replaced") != NULL || strstr(message, "timed out waiting for server_register_ok") != NULL; } static int kcp_client_is_registered(kcp_client_t *client) { int registered; if (client == NULL) { return 0; } pthread_mutex_lock(&client->state_mu); registered = client->registered; pthread_mutex_unlock(&client->state_mu); return registered; } static int kcp_client_text_body_equals(const message_t *msg, const char *payload) { size_t expected_len; if (msg == NULL || payload == NULL || msg->body == NULL) { return 0; } expected_len = strlen(payload); return msg->body_len == expected_len && memcmp(msg->body, payload, expected_len) == 0; } static void kcp_client_copy_server_error_body(const message_t *msg, char *buffer, size_t buffer_len) { size_t copy_len; if (buffer == NULL || buffer_len == 0) { return; } buffer[0] = '\0'; if (msg == NULL || msg->body == NULL || msg->body_len == 0) { return; } copy_len = msg->body_len < (buffer_len - 1U) ? msg->body_len : (buffer_len - 1U); memcpy(buffer, msg->body, copy_len); buffer[copy_len] = '\0'; } static int kcp_client_registration_errno_from_message(const char *message) { if (message == NULL || message[0] == '\0') { return ECONNREFUSED; } if (strstr(message, "duplicate peer id") != NULL) { return EEXIST; } if (strstr(message, "first message must be register") != NULL) { return EPROTO; } return ECONNREFUSED; } static int kcp_client_send_text_internal(kcp_client_t *client, const char *to, const char *text, int log_business_event) { message_t msg; uint64_t id; if (client == NULL || to == NULL || text == NULL || client->conn == NULL) { errno = EINVAL; return -1; } protocol_message_init(&msg); if (kcp_client_next_message_id(client, &id) != 0) { return -1; } msg.type = MSG_TYPE_TEXT; msg.id = id; snprintf(msg.from, sizeof(msg.from), "%s", client->id); snprintf(msg.to, sizeof(msg.to), "%s", to); msg.body = (uint8_t *) omni_strdup(text); if (msg.body == NULL) { return -1; } msg.body_len = strlen((const char *) msg.body); if (log_business_event) { latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_A_APP_PREP_BEGIN, &msg); } if (kcp_conn_send(client->conn, &msg) != 0) { protocol_message_clear(&msg); return -1; } protocol_message_clear(&msg); return 0; } static int kcp_client_send_business_preflight(kcp_client_t *client) { if (client == NULL || client->conn == NULL) { errno = ENOTCONN; return -1; } if (!kcp_client_is_registered(client)) { errno = ENOTCONN; return -1; } return 0; } static int kcp_client_handle_reserved_server_message(kcp_client_t *client, const message_t *msg) { if (client == NULL || msg == NULL) { errno = EINVAL; return -1; } if (msg->type != MSG_TYPE_TEXT || strcmp(msg->from, SERVER_PEER_ID) != 0) { return 0; } kcp_client_touch_server_activity(client); if (kcp_client_text_body_equals(msg, KCP_CLIENT_CTRL_REGISTER_OK)) { kcp_client_set_registered(client, 1); kcp_client_clear_last_server_error(client); return 1; } if (kcp_client_text_body_equals(msg, KCP_CLIENT_CTRL_HEARTBEAT)) { if (kcp_client_send_text_internal(client, SERVER_PEER_ID, KCP_CLIENT_CTRL_HEARTBEAT_ACK, 0) != 0) { kcp_client_set_registered(client, 0); kcp_client_set_last_server_error(client, "failed to acknowledge server heartbeat"); (void) kcp_conn_close(client->conn); return -1; } return 1; } if (kcp_client_text_body_equals(msg, KCP_CLIENT_CTRL_HEARTBEAT_ACK)) { return 1; } if (kcp_client_text_body_equals(msg, KCP_CLIENT_CTRL_PEER_REPLACED)) { kcp_client_set_registered(client, 0); kcp_client_set_last_server_error(client, "server peer replaced this session"); (void) kcp_conn_close(client->conn); errno = ECONNRESET; return -1; } return 0; } static int kcp_client_remaining_timeout_ms(int original_timeout_ms, uint32_t start_ms) { uint32_t elapsed_ms; if (original_timeout_ms < 0) { return -1; } elapsed_ms = omni_now_millis32() - start_ms; if (elapsed_ms >= (uint32_t) original_timeout_ms) { return 0; } return original_timeout_ms - (int) elapsed_ms; } static int kcp_client_wait_for_register_ok(kcp_client_t *client) { uint32_t start_ms; if (client == NULL || client->conn == NULL) { errno = EINVAL; return -1; } start_ms = omni_now_millis32(); for (;;) { message_t msg; int rc; int remaining_timeout_ms = kcp_client_remaining_timeout_ms(KCP_CLIENT_REGISTER_TIMEOUT_MS, start_ms); if (remaining_timeout_ms <= 0) { kcp_client_set_registered(client, 0); kcp_client_set_last_server_error(client, "timed out waiting for server_register_ok"); (void) kcp_conn_close(client->conn); errno = ETIMEDOUT; return -1; } protocol_message_init(&msg); rc = kcp_conn_receive_timed(client->conn, &msg, remaining_timeout_ms); if (rc == 1) { protocol_message_clear(&msg); kcp_client_set_registered(client, 0); kcp_client_set_last_server_error(client, "timed out waiting for server_register_ok"); (void) kcp_conn_close(client->conn); errno = ETIMEDOUT; return -1; } if (rc != 0) { protocol_message_clear(&msg); kcp_client_set_registered(client, 0); return -1; } if (msg.type == MSG_TYPE_ERROR && strcmp(msg.from, SERVER_PEER_ID) == 0) { char error_text[256]; kcp_client_copy_server_error_body(&msg, error_text, sizeof(error_text)); kcp_client_touch_server_activity(client); kcp_client_set_registered(client, 0); kcp_client_set_last_server_error(client, error_text); protocol_message_clear(&msg); (void) kcp_conn_close(client->conn); errno = kcp_client_registration_errno_from_message(error_text); return -1; } rc = kcp_client_handle_reserved_server_message(client, &msg); protocol_message_clear(&msg); if (rc < 0) { return -1; } if (rc > 0 && kcp_client_is_registered(client)) { return 0; } kcp_client_set_registered(client, 0); kcp_client_set_last_server_error(client, "unexpected message while waiting for server_register_ok"); (void) kcp_conn_close(client->conn); errno = EPROTO; return -1; } } static int kcp_client_receive_business_timed(kcp_client_t *client, message_t *out_msg, int timeout_ms) { uint32_t start_ms; if (client == NULL || out_msg == NULL || client->conn == NULL) { errno = EINVAL; return -1; } start_ms = omni_now_millis32(); protocol_message_init(out_msg); for (;;) { int rc; int reserved_rc; int effective_timeout_ms = timeout_ms < 0 ? -1 : kcp_client_remaining_timeout_ms(timeout_ms, start_ms); if (timeout_ms >= 0 && effective_timeout_ms <= 0) { return 1; } protocol_message_clear(out_msg); rc = kcp_conn_receive_timed(client->conn, out_msg, effective_timeout_ms); if (rc != 0) { if (rc != 1) { kcp_client_set_registered(client, 0); } return rc; } if (strcmp(out_msg->from, SERVER_PEER_ID) == 0) { kcp_client_touch_server_activity(client); } reserved_rc = kcp_client_handle_reserved_server_message(client, out_msg); if (reserved_rc < 0) { protocol_message_clear(out_msg); return -1; } if (reserved_rc > 0) { protocol_message_clear(out_msg); continue; } if (out_msg->type == MSG_TYPE_ERROR && strcmp(out_msg->from, SERVER_PEER_ID) == 0) { char error_text[256]; kcp_client_copy_server_error_body(out_msg, error_text, sizeof(error_text)); kcp_client_set_last_server_error(client, error_text); if (kcp_client_server_error_invalidates_registration(error_text)) { kcp_client_set_registered(client, 0); } } latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_B_APP_RECV, out_msg); return 0; } } static int kcp_client_persist_message_to_disk(const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len) { char path[512]; if (omni_ensure_dir(inbox_dir) != 0) { return -1; } if (msg->type == MSG_TYPE_TEXT) { char *body = omni_json_escape_bytes(msg->body, msg->body_len); char *from = omni_json_escape(msg->from); char *to = omni_json_escape(msg->to); char *line; if (body == NULL || from == NULL || to == NULL) { free(body); free(from); free(to); return -1; } snprintf(path, sizeof(path), "%s/messages.log", inbox_dir); line = omni_strdup_printf( "{\"message_type\":\"%s\",\"message_id\":%" PRIu64 ",\"from\":\"%s\",\"to\":\"%s\",\"body\":\"%s\"}\n", protocol_message_type_name(msg->type), msg->id, from, to, body ); free(body); free(from); free(to); if (line == NULL) { return -1; } if (omni_append_file(path, (const uint8_t *) line, strlen(line)) != 0) { free(line); return -1; } free(line); } else if (msg->type == MSG_TYPE_FILE) { const char *file_name = omni_path_base_name(msg->file_name); if (file_name[0] == '\0') { file_name = "unnamed"; } snprintf(path, sizeof(path), "%s/%s-%" PRIu64 "-%s", inbox_dir, msg->from, msg->id, file_name); if (omni_write_file(path, msg->body, msg->body_len) != 0) { return -1; } } else if (msg->type == MSG_TYPE_BINARY) { snprintf(path, sizeof(path), "%s/%s-%" PRIu64 ".bin", inbox_dir, msg->from, msg->id); if (omni_write_file(path, msg->body, msg->body_len) != 0) { return -1; } } else { errno = EINVAL; return -1; } if (out_path != NULL && out_path_len > 0) { snprintf(out_path, out_path_len, "%s", path); } return 0; } static void kcp_client_fill_recv_meta(kcp_client_recv_meta_t *meta, const message_t *msg) { if (meta == NULL || msg == NULL) { return; } memset(meta, 0, sizeof(*meta)); meta->type = msg->type; meta->id = msg->id; meta->body_len = msg->body_len; snprintf(meta->from, sizeof(meta->from), "%s", msg->from); snprintf(meta->to, sizeof(meta->to), "%s", msg->to); snprintf(meta->file_name, sizeof(meta->file_name), "%s", msg->file_name); } kcp_client_t *kcp_client_dial_with_options(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, const kcp_conn_options_t *options, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) { kcp_client_t *client; const char *actual_dial_addr = (dial_addr != NULL && dial_addr[0] != '\0') ? dial_addr : server_addr; message_t register_msg; int saved_errno = 0; client = (kcp_client_t *) calloc(1, sizeof(*client)); if (client == NULL) { return NULL; } snprintf(client->id, sizeof(client->id), "%s", peer_id); snprintf(client->server_addr, sizeof(client->server_addr), "%s", server_addr == NULL ? "" : server_addr); pthread_mutex_init(&client->state_mu, NULL); client->last_server_activity_ms = omni_now_millis32(); client->logger = logger; client->conn = kcp_conn_dial_with_options(actual_dial_addr, bind_ip, bind_device, options, packet_logger, logger, OMNI_NODE_ROLE_PEER, peer_id, stats_logger, stats_interval_ms); if (client->conn == NULL) { saved_errno = errno; kcp_client_free(client); errno = saved_errno; return NULL; } protocol_message_init(®ister_msg); register_msg.type = MSG_TYPE_REGISTER; register_msg.id = 0; snprintf(register_msg.from, sizeof(register_msg.from), "%s", peer_id); snprintf(register_msg.to, sizeof(register_msg.to), "%s", SERVER_PEER_ID); if (kcp_conn_send(client->conn, ®ister_msg) != 0) { saved_errno = errno; kcp_client_free(client); errno = saved_errno; return NULL; } if (kcp_client_wait_for_register_ok(client) != 0) { saved_errno = errno; kcp_client_free(client); errno = saved_errno; return NULL; } return client; } kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) { return kcp_client_dial_with_options(server_addr, dial_addr, peer_id, bind_ip, bind_device, NULL, logger, packet_logger, stats_logger, stats_interval_ms); } const char *kcp_client_id(const kcp_client_t *client) { return client == NULL ? "" : client->id; } int kcp_client_send_text(kcp_client_t *client, const char *to, const char *text) { if (client == NULL || to == NULL || text == NULL) { errno = EINVAL; return -1; } if (kcp_client_send_business_preflight(client) != 0) { return -1; } return kcp_client_send_text_internal(client, to, text, 1); } int kcp_client_send_binary(kcp_client_t *client, const char *to, const void *data, size_t data_len) { message_t msg; uint64_t id; if (client == NULL || to == NULL || (data == NULL && data_len > 0)) { errno = EINVAL; return -1; } if (kcp_client_send_business_preflight(client) != 0) { return -1; } protocol_message_init(&msg); if (kcp_client_next_message_id(client, &id) != 0) { return -1; } msg.type = MSG_TYPE_BINARY; msg.id = id; snprintf(msg.from, sizeof(msg.from), "%s", client->id); snprintf(msg.to, sizeof(msg.to), "%s", to); if (data_len > 0) { msg.body = (uint8_t *) malloc(data_len); if (msg.body == NULL) { return -1; } memcpy(msg.body, data, data_len); } msg.body_len = data_len; latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_A_APP_PREP_BEGIN, &msg); if (kcp_conn_send(client->conn, &msg) != 0) { protocol_message_clear(&msg); return -1; } protocol_message_clear(&msg); return 0; } int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char *path) { message_t msg; uint64_t id; uint8_t *body = NULL; size_t body_len = 0; const char *base_name = strrchr(path, '/'); if (client == NULL || to == NULL || path == NULL) { errno = EINVAL; return -1; } if (kcp_client_send_business_preflight(client) != 0) { return -1; } if (omni_read_file(path, &body, &body_len) != 0) { return -1; } protocol_message_init(&msg); if (kcp_client_next_message_id(client, &id) != 0) { free(body); return -1; } msg.type = MSG_TYPE_FILE; msg.id = id; snprintf(msg.from, sizeof(msg.from), "%s", client->id); snprintf(msg.to, sizeof(msg.to), "%s", to); snprintf(msg.file_name, sizeof(msg.file_name), "%s", base_name == NULL ? path : base_name + 1); msg.body = body; msg.body_len = body_len; latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_A_APP_PREP_BEGIN, &msg); if (kcp_conn_send(client->conn, &msg) != 0) { protocol_message_clear(&msg); return -1; } protocol_message_clear(&msg); return 0; } int kcp_client_receive_timed(kcp_client_t *client, message_t *out_msg, int timeout_ms) { if (client == NULL || out_msg == NULL) { errno = EINVAL; return -1; } return kcp_client_receive_business_timed(client, out_msg, timeout_ms); } int kcp_client_receive(kcp_client_t *client, message_t *out_msg) { if (kcp_client_receive_timed(client, out_msg, -1) != 0) { return -1; } return 0; } int kcp_client_receive_binary_into(kcp_client_t *client, void *buffer, size_t buffer_len, kcp_client_recv_meta_t *out_meta, int timeout_ms) { message_t msg; int rc; if (client == NULL || (buffer == NULL && buffer_len > 0) || out_meta == NULL) { errno = EINVAL; return -1; } protocol_message_init(&msg); rc = kcp_client_receive_timed(client, &msg, timeout_ms); if (rc != 0) { protocol_message_clear(&msg); return rc; } kcp_client_fill_recv_meta(out_meta, &msg); if (msg.body_len > buffer_len) { protocol_message_clear(&msg); errno = EMSGSIZE; return 2; } if (msg.body_len > 0) { memcpy(buffer, msg.body, msg.body_len); } protocol_message_clear(&msg); return 0; } int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len) { if (!latencylog_is_business_message(msg)) { errno = EINVAL; return -1; } latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_B_PERSIST_BEGIN, msg); if (kcp_client_persist_message_to_disk(msg, inbox_dir, out_path, out_path_len) != 0) { return -1; } latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_B_PERSIST_END, msg); return 0; } void kcp_client_state_snapshot(kcp_client_t *client, kcp_client_state_t *out_state) { kcp_runtime_stats_t runtime_stats; if (out_state == NULL) { return; } memset(out_state, 0, sizeof(*out_state)); if (client == NULL) { return; } memset(&runtime_stats, 0, sizeof(runtime_stats)); if (client->conn != NULL) { kcp_conn_runtime_stats_snapshot(client->conn, &runtime_stats); out_state->connected = runtime_stats.connected; } pthread_mutex_lock(&client->state_mu); out_state->registered = client->registered; out_state->server_idle_ms = client->last_server_activity_ms == 0 ? 0 : (omni_now_millis32() - client->last_server_activity_ms); snprintf(out_state->last_server_error, sizeof(out_state->last_server_error), "%s", client->last_server_error); pthread_mutex_unlock(&client->state_mu); } void kcp_client_runtime_stats_snapshot(kcp_client_t *client, kcp_runtime_stats_t *out_stats) { if (out_stats == NULL) { return; } memset(out_stats, 0, sizeof(*out_stats)); if (client == NULL || client->conn == NULL) { return; } kcp_conn_runtime_stats_snapshot(client->conn, out_stats); } int kcp_client_close(kcp_client_t *client) { if (client == NULL) { return 0; } kcp_client_set_registered(client, 0); return kcp_conn_close(client->conn); } void kcp_client_free(kcp_client_t *client) { if (client == NULL) { return; } kcp_conn_free(client->conn); pthread_mutex_destroy(&client->state_mu); free(client); }