Files
OmniSocketGo/python/omnisocket/omnisocket_client.c
2026-04-18 12:52:39 +08:00

573 lines
16 KiB
C

#include "omnisocket_client.h"
/*
 * Refresh the connection-related fields of session->stats from `client`.
 *
 * Takes no lock itself; every call site visible in this file invokes it with
 * session->mutex held.  When `client` is NULL the fields are filled from an
 * all-zero kcp_client_state_t instead of a real snapshot.  A NULL `session`
 * makes the call a no-op.
 */
static void omnisocket_session_sync_client_state_locked(omnisocket_session_t *session, kcp_client_t *client) {
    kcp_client_state_t snapshot;

    if (session != NULL) {
        memset(&snapshot, 0, sizeof(snapshot));
        if (client != NULL) {
            kcp_client_state_snapshot(client, &snapshot);
        }

        session->stats.registered = snapshot.registered;
        session->stats.connected = snapshot.connected;
        /* snprintf bounds the copy by sizeof(destination). */
        snprintf(
            session->stats.last_server_error,
            sizeof(session->stats.last_server_error),
            "%s",
            snapshot.last_server_error
        );
    }
}
/*
 * Clear the `connected` and `registered` flags in session->stats.
 *
 * Takes no lock itself; the only call site visible in this file holds
 * session->mutex.  last_server_error is left untouched.  NULL is a no-op.
 */
static void omnisocket_session_mark_disconnected_locked(omnisocket_session_t *session) {
    if (session != NULL) {
        session->stats.registered = 0;
        session->stats.connected = 0;
    }
}
/*
 * Initialise a KCP session object in place.
 *
 * The whole structure is zeroed, then its mutex and condition variable are
 * created with default attributes.
 *
 * Returns 0 on success.  Returns -1 with errno = EINVAL when `session` is
 * NULL, or with errno set to the error number reported by the failing
 * pthread_*_init call.  If the condition variable cannot be created the
 * already-created mutex is destroyed before returning.
 */
int omnisocket_session_init(omnisocket_session_t *session) {
    int err;

    if (session == NULL) {
        errno = EINVAL;
        return -1;
    }

    memset(session, 0, sizeof(*session));

    err = pthread_mutex_init(&session->mutex, NULL);
    if (err == 0) {
        err = pthread_cond_init(&session->idle_cond, NULL);
        if (err == 0) {
            return 0;
        }
        /* Undo the half-finished initialisation. */
        pthread_mutex_destroy(&session->mutex);
    }

    errno = err;
    return -1;
}
/*
 * Tear down a session: close it (the close result is deliberately ignored),
 * then destroy its condition variable and mutex.  NULL is a no-op.
 */
void omnisocket_session_destroy(omnisocket_session_t *session) {
    if (session != NULL) {
        (void) omnisocket_session_close(session);
        pthread_cond_destroy(&session->idle_cond);
        pthread_mutex_destroy(&session->mutex);
    }
}
/*
 * Register the start of an operation on the session's client.
 *
 * On success stores the current client pointer in *out_client, increments
 * session->active_ops and returns 0.  The callers visible in this file
 * decrement active_ops again once their client call has returned.
 *
 * Returns -1 with errno set to:
 *   EINVAL    - `session` or `out_client` is NULL
 *   ECANCELED - a close is in progress (session->closing is set)
 *   ENOTCONN  - the session has no client
 */
static int omnisocket_session_begin_client_op(omnisocket_session_t *session, kcp_client_t **out_client) {
    int failure = 0;

    if (session == NULL || out_client == NULL) {
        errno = EINVAL;
        return -1;
    }

    pthread_mutex_lock(&session->mutex);
    if (session->closing) {
        failure = ECANCELED;
    } else if (session->client == NULL) {
        failure = ENOTCONN;
    } else {
        *out_client = session->client;
        session->active_ops += 1;
    }
    pthread_mutex_unlock(&session->mutex);

    if (failure != 0) {
        /* errno is assigned only after the mutex has been released. */
        errno = failure;
        return -1;
    }
    return 0;
}
/*
 * Dial a KCP client and attach it to the session.
 *
 * If a close is in progress the call first waits for it to finish.  All
 * connection parameters are forwarded unchanged to
 * kcp_client_dial_with_options(); only `server_addr` and `peer_id` are
 * NULL-checked here.  The three NULL arguments passed to the dial function
 * are not explained by this file (presumably optional callbacks / user
 * data - TODO confirm against kcp_client_dial_with_options).
 *
 * session->mutex stays held across the dial, so other session calls that
 * take the mutex wait until the dial returns.
 *
 * Returns 0 on success, after copying the new client's state into
 * session->stats.  Returns -1 with errno set to:
 *   EINVAL  - `session`, `server_addr` or `peer_id` is NULL
 *   EISCONN - the session already has a client
 *   (other) - whatever the failed dial left in errno; that value is saved
 *             and restored around the mutex unlock so the caller sees it.
 */
int omnisocket_session_connect(
    omnisocket_session_t *session,
    const char *server_addr,
    const char *relay_via,
    const char *peer_id,
    const char *bind_ip,
    const char *bind_device,
    const kcp_conn_options_t *options,
    int stats_interval_ms
) {
    kcp_client_t *client;
    int saved_errno;

    if (session == NULL || server_addr == NULL || peer_id == NULL) {
        errno = EINVAL;
        return -1;
    }

    pthread_mutex_lock(&session->mutex);
    /* Do not attach a new client while an earlier close is still running. */
    while (session->closing) {
        pthread_cond_wait(&session->idle_cond, &session->mutex);
    }
    if (session->client != NULL) {
        pthread_mutex_unlock(&session->mutex);
        errno = EISCONN;
        return -1;
    }

    client = kcp_client_dial_with_options(
        server_addr,
        relay_via,
        peer_id,
        bind_ip,
        bind_device,
        options,
        NULL,
        NULL,
        NULL,
        stats_interval_ms
    );
    if (client == NULL) {
        /* A successful library call may still modify errno; keep the dial's value. */
        saved_errno = errno;
        pthread_mutex_unlock(&session->mutex);
        errno = saved_errno;
        return -1;
    }

    session->client = client;
    omnisocket_session_sync_client_state_locked(session, client);
    pthread_mutex_unlock(&session->mutex);
    return 0;
}
/*
 * Close and free the session's KCP client, if it has one.
 *
 * Returns -1 with errno = EINVAL when `session` is NULL, otherwise 0 (also
 * when there was no client to close).  The stats flags `connected` and
 * `registered` are cleared in either case.
 *
 * The statement order below is deliberate: the client is detached under the
 * mutex, closed without the mutex, and freed only after active_ops has
 * dropped to zero.
 */
int omnisocket_session_close(omnisocket_session_t *session) {
kcp_client_t *client;
if (session == NULL) {
errno = EINVAL;
return -1;
}
pthread_mutex_lock(&session->mutex);
/* Wait for any close that is already in progress to finish. */
while (session->closing) {
pthread_cond_wait(&session->idle_cond, &session->mutex);
}
client = session->client;
if (client != NULL) {
/* Detach the client; while `closing` is set, begin_client_op fails with ECANCELED. */
session->closing = 1;
session->client = NULL;
}
omnisocket_session_mark_disconnected_locked(session);
pthread_mutex_unlock(&session->mutex);
if (client != NULL) {
/* Called without the mutex held.  NOTE(review): presumably this makes in-flight client calls return - confirm. */
kcp_client_close(client);
pthread_mutex_lock(&session->mutex);
/* Operations that began before the close still hold the client pointer; they broadcast idle_cond when active_ops reaches 0. */
while (session->active_ops > 0) {
pthread_cond_wait(&session->idle_cond, &session->mutex);
}
pthread_mutex_unlock(&session->mutex);
kcp_client_free(client);
pthread_mutex_lock(&session->mutex);
/* Close finished: wake threads waiting on `closing` in connect or close. */
session->closing = 0;
pthread_cond_broadcast(&session->idle_cond);
pthread_mutex_unlock(&session->mutex);
}
return 0;
}
/*
 * Send `data_len` bytes to peer `to` without asking for the message id.
 *
 * Thin wrapper: forwards to omnisocket_session_send_with_id() with a NULL
 * out_message_id and returns its result unchanged.
 */
int omnisocket_session_send(omnisocket_session_t *session, const char *to, const void *data, size_t data_len) {
    uint64_t *const no_message_id = NULL;

    return omnisocket_session_send_with_id(session, to, data, data_len, no_message_id);
}
/*
 * Send `data_len` bytes to peer `to` through the session's KCP client.
 *
 * `out_message_id` is forwarded unchanged to
 * kcp_client_send_binary_with_id(); omnisocket_session_send() passes NULL.
 *
 * Returns the client call's result; 0 is counted as a successful send and
 * any other value as a send error in session->stats.  Returns -1 with errno
 * set to:
 *   EINVAL    - `session` or `to` is NULL, or `data` is NULL with
 *               data_len > 0
 *   ECANCELED - a close is in progress
 *   ENOTCONN  - the session has no client
 *
 * errno as left by the client call is saved and restored around the
 * statistics bookkeeping, so the caller can still inspect it after a failed
 * send.
 */
int omnisocket_session_send_with_id(
    omnisocket_session_t *session,
    const char *to,
    const void *data,
    size_t data_len,
    uint64_t *out_message_id
) {
    kcp_client_t *client;
    int rc;
    int saved_errno;

    if (session == NULL || to == NULL || (data == NULL && data_len > 0)) {
        errno = EINVAL;
        return -1;
    }
    if (omnisocket_session_begin_client_op(session, &client) != 0) {
        return -1;
    }

    /* The client call runs without session->mutex held. */
    rc = kcp_client_send_binary_with_id(client, to, data, data_len, out_message_id);
    /* The calls below (mutex, state snapshot, snprintf, broadcast) may modify errno. */
    saved_errno = errno;

    pthread_mutex_lock(&session->mutex);
    if (rc == 0) {
        session->stats.send_calls += 1;
        session->stats.send_bytes += (uint64_t) data_len;
    } else {
        session->stats.send_errors += 1;
    }
    omnisocket_session_sync_client_state_locked(session, client);
    /* Guarded decrement: never lets the counter go below zero. */
    if (session->active_ops > 0) {
        session->active_ops -= 1;
    }
    /* A pending close waits on idle_cond for active_ops to reach zero. */
    if (session->closing && session->active_ops == 0) {
        pthread_cond_broadcast(&session->idle_cond);
    }
    pthread_mutex_unlock(&session->mutex);

    errno = saved_errno;
    return rc;
}
/*
 * Receive one message from the session's KCP client into `out_msg`.
 *
 * `timeout_ms` is forwarded unchanged to kcp_client_receive_timed().
 *
 * Returns the client call's result.  For the statistics, 0 is counted as a
 * received message (adding out_msg->body_len to recv_bytes), 1 as a timeout
 * and any other value as a receive error.  Returns -1 with errno set to:
 *   EINVAL    - `session` or `out_msg` is NULL
 *   ECANCELED - a close is in progress
 *   ENOTCONN  - the session has no client
 *
 * errno as left by the client call is saved and restored around the
 * statistics bookkeeping, so the caller can still inspect it after a failed
 * receive.
 */
int omnisocket_session_recv(omnisocket_session_t *session, message_t *out_msg, int timeout_ms) {
    kcp_client_t *client;
    int rc;
    int saved_errno;

    if (session == NULL || out_msg == NULL) {
        errno = EINVAL;
        return -1;
    }
    if (omnisocket_session_begin_client_op(session, &client) != 0) {
        return -1;
    }

    /* The client call runs without session->mutex held. */
    rc = kcp_client_receive_timed(client, out_msg, timeout_ms);
    /* The calls below (mutex, state snapshot, snprintf, broadcast) may modify errno. */
    saved_errno = errno;

    pthread_mutex_lock(&session->mutex);
    if (rc == 0) {
        session->stats.recv_calls += 1;
        session->stats.recv_bytes += (uint64_t) out_msg->body_len;
    } else if (rc == 1) {
        session->stats.recv_timeouts += 1;
    } else {
        session->stats.recv_errors += 1;
    }
    omnisocket_session_sync_client_state_locked(session, client);
    /* Guarded decrement: never lets the counter go below zero. */
    if (session->active_ops > 0) {
        session->active_ops -= 1;
    }
    /* A pending close waits on idle_cond for active_ops to reach zero. */
    if (session->closing && session->active_ops == 0) {
        pthread_cond_broadcast(&session->idle_cond);
    }
    pthread_mutex_unlock(&session->mutex);

    errno = saved_errno;
    return rc;
}
/*
 * Receive one message from the session's KCP client into a caller-supplied
 * buffer.
 *
 * `buffer`, `buffer_len`, `out_meta` and `timeout_ms` are forwarded
 * unchanged to kcp_client_receive_binary_into().
 *
 * Returns the client call's result.  For the statistics, 0 is counted as a
 * received message (adding out_meta->body_len to recv_bytes), 1 as a
 * timeout and any other value as a receive error.  Returns -1 with errno
 * set to:
 *   EINVAL    - `session` or `out_meta` is NULL, or `buffer` is NULL with
 *               buffer_len > 0
 *   ECANCELED - a close is in progress
 *   ENOTCONN  - the session has no client
 *
 * errno as left by the client call is saved and restored around the
 * statistics bookkeeping, so the caller can still inspect it after a failed
 * receive.
 */
int omnisocket_session_recv_into(
    omnisocket_session_t *session,
    void *buffer,
    size_t buffer_len,
    kcp_client_recv_meta_t *out_meta,
    int timeout_ms
) {
    kcp_client_t *client;
    int rc;
    int saved_errno;

    if (session == NULL || out_meta == NULL || (buffer == NULL && buffer_len > 0)) {
        errno = EINVAL;
        return -1;
    }
    if (omnisocket_session_begin_client_op(session, &client) != 0) {
        return -1;
    }

    /* The client call runs without session->mutex held. */
    rc = kcp_client_receive_binary_into(client, buffer, buffer_len, out_meta, timeout_ms);
    /* The calls below (mutex, state snapshot, snprintf, broadcast) may modify errno. */
    saved_errno = errno;

    pthread_mutex_lock(&session->mutex);
    if (rc == 0) {
        session->stats.recv_calls += 1;
        session->stats.recv_bytes += (uint64_t) out_meta->body_len;
    } else if (rc == 1) {
        session->stats.recv_timeouts += 1;
    } else {
        session->stats.recv_errors += 1;
    }
    omnisocket_session_sync_client_state_locked(session, client);
    /* Guarded decrement: never lets the counter go below zero. */
    if (session->active_ops > 0) {
        session->active_ops -= 1;
    }
    /* A pending close waits on idle_cond for active_ops to reach zero. */
    if (session->closing && session->active_ops == 0) {
        pthread_cond_broadcast(&session->idle_cond);
    }
    pthread_mutex_unlock(&session->mutex);

    errno = saved_errno;
    return rc;
}
/*
 * Copy session->stats into *out_stats while holding session->mutex.
 * Does nothing when either pointer is NULL.
 */
void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket_session_stats_t *out_stats) {
    if (session != NULL && out_stats != NULL) {
        pthread_mutex_lock(&session->mutex);
        *out_stats = session->stats;
        pthread_mutex_unlock(&session->mutex);
    }
}
/*
 * Fill *out_stats with the KCP runtime statistics of the session's client.
 *
 * The runtime snapshot is taken with session->mutex held; the copy into
 * *out_stats happens after the mutex is released.  If the session has no
 * client, every copied field is zero.  *out_stats is zeroed before the
 * fields are assigned.  Does nothing when either pointer is NULL.
 */
void omnisocket_session_kcp_stats_snapshot(omnisocket_session_t *session, omnisocket_session_kcp_stats_t *out_stats) {
    kcp_runtime_stats_t rt;

    if (session == NULL || out_stats == NULL) {
        return;
    }

    memset(&rt, 0, sizeof(rt));

    pthread_mutex_lock(&session->mutex);
    if (session->client != NULL) {
        kcp_client_runtime_stats_snapshot(session->client, &rt);
    }
    pthread_mutex_unlock(&session->mutex);

    memset(out_stats, 0, sizeof(*out_stats));

    /* Connection identity. */
    out_stats->connected = rt.connected;
    out_stats->conv = rt.conv;

    /* Timing fields. */
    out_stats->rto_ms = rt.rto_ms;
    out_stats->srtt_ms = rt.srtt_ms;
    out_stats->min_srtt_ms = rt.min_srtt_ms;
    out_stats->srttvar_ms = rt.srttvar_ms;
    out_stats->last_feedback_age_ms = rt.last_feedback_age_ms;

    /* Window fields. */
    out_stats->snd_wnd = rt.snd_wnd;
    out_stats->rmt_wnd = rt.rmt_wnd;
    out_stats->inflight = rt.inflight;
    out_stats->window_limit = rt.window_limit;
    out_stats->window_pressure_pct = rt.window_pressure_pct;

    /* Queue and buffer fields. */
    out_stats->snd_queue = rt.snd_queue;
    out_stats->rcv_queue = rt.rcv_queue;
    out_stats->snd_buffer = rt.snd_buffer;

    /* Running totals. */
    out_stats->out_segs_total = rt.out_segs_total;
    out_stats->retrans_total = rt.retrans_total;
    out_stats->fast_retrans_total = rt.fast_retrans_total;
    out_stats->lost_total = rt.lost_total;
    out_stats->repeat_total = rt.repeat_total;
    out_stats->xmit_total = rt.xmit_total;
}
/*
 * Initialise a UDP session object in place.
 *
 * The whole structure is zeroed, then its mutex and condition variable are
 * created with default attributes.
 *
 * Returns 0 on success.  Returns -1 with errno = EINVAL when `session` is
 * NULL, or with errno set to the error number reported by the failing
 * pthread_*_init call.  If the condition variable cannot be created the
 * already-created mutex is destroyed before returning.
 */
int omnisocket_udp_session_init(omnisocket_udp_session_t *session) {
    int status;

    if (session == NULL) {
        errno = EINVAL;
        return -1;
    }

    memset(session, 0, sizeof(*session));

    status = pthread_mutex_init(&session->mutex, NULL);
    if (status != 0) {
        goto fail;
    }

    status = pthread_cond_init(&session->idle_cond, NULL);
    if (status != 0) {
        /* Undo the half-finished initialisation. */
        pthread_mutex_destroy(&session->mutex);
        goto fail;
    }

    return 0;

fail:
    errno = status;
    return -1;
}
/*
 * Tear down a UDP session: close it (the close result is deliberately
 * ignored), then destroy its condition variable and mutex.  NULL is a no-op.
 */
void omnisocket_udp_session_destroy(omnisocket_udp_session_t *session) {
    if (session != NULL) {
        (void) omnisocket_udp_session_close(session);
        pthread_cond_destroy(&session->idle_cond);
        pthread_mutex_destroy(&session->mutex);
    }
}
/*
 * Register the start of an operation on the UDP session's client.
 *
 * On success stores the current client pointer in *out_client, increments
 * session->active_ops and returns 0.  The callers visible in this file
 * decrement active_ops again once their client call has returned.
 *
 * Returns -1 with errno set to:
 *   EINVAL    - `session` or `out_client` is NULL
 *   ECANCELED - a close is in progress (session->closing is set)
 *   ENOTCONN  - the session has no client
 */
static int omnisocket_udp_session_begin_client_op(omnisocket_udp_session_t *session, udp_client_t **out_client) {
    int refusal = 0;

    if (session == NULL || out_client == NULL) {
        errno = EINVAL;
        return -1;
    }

    pthread_mutex_lock(&session->mutex);
    if (session->closing) {
        refusal = ECANCELED;
    } else if (session->client == NULL) {
        refusal = ENOTCONN;
    } else {
        *out_client = session->client;
        session->active_ops += 1;
    }
    pthread_mutex_unlock(&session->mutex);

    if (refusal == 0) {
        return 0;
    }
    /* errno is assigned only after the mutex has been released. */
    errno = refusal;
    return -1;
}
/*
 * Dial a UDP client and attach it to the session.
 *
 * If a close is in progress the call first waits for it to finish.  All
 * connection parameters are forwarded unchanged to
 * udp_client_dial_with_options(); only `server_addr` and `peer_id` are
 * NULL-checked here.  The two NULL arguments passed to the dial function
 * are not explained by this file (presumably optional callback / user
 * data - TODO confirm against udp_client_dial_with_options).
 *
 * session->mutex stays held across the dial, so other session calls that
 * take the mutex wait until the dial returns.
 *
 * Returns 0 on success, after setting stats.connected and stats.registered
 * to 1 and emptying stats.last_server_error.  Returns -1 with errno set to:
 *   EINVAL  - `session`, `server_addr` or `peer_id` is NULL
 *   EISCONN - the session already has a client
 *   (other) - whatever the failed dial left in errno; that value is saved
 *             and restored around the mutex unlock so the caller sees it.
 */
int omnisocket_udp_session_connect(
    omnisocket_udp_session_t *session,
    const char *server_addr,
    const char *peer_id,
    const char *bind_ip,
    const char *bind_device,
    int enable_timestamping
) {
    udp_client_t *client;
    int saved_errno;

    if (session == NULL || server_addr == NULL || peer_id == NULL) {
        errno = EINVAL;
        return -1;
    }

    pthread_mutex_lock(&session->mutex);
    /* Do not attach a new client while an earlier close is still running. */
    while (session->closing) {
        pthread_cond_wait(&session->idle_cond, &session->mutex);
    }
    if (session->client != NULL) {
        pthread_mutex_unlock(&session->mutex);
        errno = EISCONN;
        return -1;
    }

    client = udp_client_dial_with_options(
        server_addr,
        peer_id,
        bind_ip,
        bind_device,
        NULL,
        NULL,
        enable_timestamping
    );
    if (client == NULL) {
        /* A successful library call may still modify errno; keep the dial's value. */
        saved_errno = errno;
        pthread_mutex_unlock(&session->mutex);
        errno = saved_errno;
        return -1;
    }

    session->client = client;
    session->stats.connected = 1;
    session->stats.registered = 1;
    session->stats.last_server_error[0] = '\0';
    pthread_mutex_unlock(&session->mutex);
    return 0;
}
/*
 * Close and free the UDP session's client, if it has one.
 *
 * Returns -1 with errno = EINVAL when `session` is NULL, otherwise 0 (also
 * when there was no client to close).  The stats flags `connected` and
 * `registered` are cleared in either case.
 *
 * The statement order below is deliberate: the client is detached under the
 * mutex, closed without the mutex, and freed only after active_ops has
 * dropped to zero.
 */
int omnisocket_udp_session_close(omnisocket_udp_session_t *session) {
udp_client_t *client;
if (session == NULL) {
errno = EINVAL;
return -1;
}
pthread_mutex_lock(&session->mutex);
/* Wait for any close that is already in progress to finish. */
while (session->closing) {
pthread_cond_wait(&session->idle_cond, &session->mutex);
}
client = session->client;
if (client != NULL) {
/* Detach the client; while `closing` is set, begin_client_op fails with ECANCELED. */
session->closing = 1;
session->client = NULL;
}
session->stats.connected = 0;
session->stats.registered = 0;
pthread_mutex_unlock(&session->mutex);
if (client != NULL) {
/* Called without the mutex held.  NOTE(review): presumably this makes in-flight client calls return - confirm. */
udp_client_close(client);
pthread_mutex_lock(&session->mutex);
/* Operations that began before the close still hold the client pointer; they broadcast idle_cond when active_ops reaches 0. */
while (session->active_ops > 0) {
pthread_cond_wait(&session->idle_cond, &session->mutex);
}
pthread_mutex_unlock(&session->mutex);
udp_client_free(client);
pthread_mutex_lock(&session->mutex);
/* Close finished: wake threads waiting on `closing` in connect or close. */
session->closing = 0;
pthread_cond_broadcast(&session->idle_cond);
pthread_mutex_unlock(&session->mutex);
}
return 0;
}
/*
 * Send `data_len` bytes to peer `to` through the session's UDP client.
 *
 * Returns the result of udp_client_send_binary(); 0 is counted as a
 * successful send and any other value as a send error in session->stats.
 * Returns -1 with errno set to:
 *   EINVAL    - `session` or `to` is NULL, or `data` is NULL with
 *               data_len > 0
 *   ECANCELED - a close is in progress
 *   ENOTCONN  - the session has no client
 *
 * errno as left by the client call is saved and restored around the
 * statistics bookkeeping, so the caller can still inspect it after a failed
 * send.
 */
int omnisocket_udp_session_send(omnisocket_udp_session_t *session, const char *to, const void *data, size_t data_len) {
    udp_client_t *client;
    int rc;
    int saved_errno;

    if (session == NULL || to == NULL || (data == NULL && data_len > 0)) {
        errno = EINVAL;
        return -1;
    }
    if (omnisocket_udp_session_begin_client_op(session, &client) != 0) {
        return -1;
    }

    /* The client call runs without session->mutex held. */
    rc = udp_client_send_binary(client, to, data, data_len);
    /* The pthread calls below may modify errno even when they succeed. */
    saved_errno = errno;

    pthread_mutex_lock(&session->mutex);
    if (rc == 0) {
        session->stats.send_calls += 1;
        session->stats.send_bytes += (uint64_t) data_len;
    } else {
        session->stats.send_errors += 1;
    }
    /* Guarded decrement: never lets the counter go below zero. */
    if (session->active_ops > 0) {
        session->active_ops -= 1;
    }
    /* A pending close waits on idle_cond for active_ops to reach zero. */
    if (session->closing && session->active_ops == 0) {
        pthread_cond_broadcast(&session->idle_cond);
    }
    pthread_mutex_unlock(&session->mutex);

    errno = saved_errno;
    return rc;
}
/*
 * Receive one message from the session's UDP client into `out_msg`.
 *
 * `timeout_ms` is forwarded unchanged to udp_client_receive_timed().
 *
 * Returns the client call's result.  For the statistics, 0 is counted as a
 * received message (adding out_msg->body_len to recv_bytes), 1 as a timeout
 * and any other value as a receive error.  Returns -1 with errno set to:
 *   EINVAL    - `session` or `out_msg` is NULL
 *   ECANCELED - a close is in progress
 *   ENOTCONN  - the session has no client
 *
 * errno as left by the client call is saved and restored around the
 * statistics bookkeeping, so the caller can still inspect it after a failed
 * receive.
 */
int omnisocket_udp_session_recv(omnisocket_udp_session_t *session, message_t *out_msg, int timeout_ms) {
    udp_client_t *client;
    int rc;
    int saved_errno;

    if (session == NULL || out_msg == NULL) {
        errno = EINVAL;
        return -1;
    }
    if (omnisocket_udp_session_begin_client_op(session, &client) != 0) {
        return -1;
    }

    /* The client call runs without session->mutex held. */
    rc = udp_client_receive_timed(client, out_msg, timeout_ms);
    /* The pthread calls below may modify errno even when they succeed. */
    saved_errno = errno;

    pthread_mutex_lock(&session->mutex);
    if (rc == 0) {
        session->stats.recv_calls += 1;
        session->stats.recv_bytes += (uint64_t) out_msg->body_len;
    } else if (rc == 1) {
        session->stats.recv_timeouts += 1;
    } else {
        session->stats.recv_errors += 1;
    }
    /* Guarded decrement: never lets the counter go below zero. */
    if (session->active_ops > 0) {
        session->active_ops -= 1;
    }
    /* A pending close waits on idle_cond for active_ops to reach zero. */
    if (session->closing && session->active_ops == 0) {
        pthread_cond_broadcast(&session->idle_cond);
    }
    pthread_mutex_unlock(&session->mutex);

    errno = saved_errno;
    return rc;
}
/*
 * Receive one message from the session's UDP client into a caller-supplied
 * buffer.
 *
 * `buffer`, `buffer_len`, `out_meta` and `timeout_ms` are forwarded
 * unchanged to udp_client_receive_into().
 *
 * Returns the client call's result.  For the statistics, 0 is counted as a
 * received message (adding out_meta->body_len to recv_bytes), 1 as a
 * timeout and any other value as a receive error.  Returns -1 with errno
 * set to:
 *   EINVAL    - `session` or `out_meta` is NULL, or `buffer` is NULL with
 *               buffer_len > 0
 *   ECANCELED - a close is in progress
 *   ENOTCONN  - the session has no client
 *
 * errno as left by the client call is saved and restored around the
 * statistics bookkeeping, so the caller can still inspect it after a failed
 * receive.
 */
int omnisocket_udp_session_recv_into(
    omnisocket_udp_session_t *session,
    void *buffer,
    size_t buffer_len,
    udp_client_recv_meta_t *out_meta,
    int timeout_ms
) {
    udp_client_t *client;
    int rc;
    int saved_errno;

    if (session == NULL || out_meta == NULL || (buffer == NULL && buffer_len > 0)) {
        errno = EINVAL;
        return -1;
    }
    if (omnisocket_udp_session_begin_client_op(session, &client) != 0) {
        return -1;
    }

    /* The client call runs without session->mutex held. */
    rc = udp_client_receive_into(client, buffer, buffer_len, out_meta, timeout_ms);
    /* The pthread calls below may modify errno even when they succeed. */
    saved_errno = errno;

    pthread_mutex_lock(&session->mutex);
    if (rc == 0) {
        session->stats.recv_calls += 1;
        session->stats.recv_bytes += (uint64_t) out_meta->body_len;
    } else if (rc == 1) {
        session->stats.recv_timeouts += 1;
    } else {
        session->stats.recv_errors += 1;
    }
    /* Guarded decrement: never lets the counter go below zero. */
    if (session->active_ops > 0) {
        session->active_ops -= 1;
    }
    /* A pending close waits on idle_cond for active_ops to reach zero. */
    if (session->closing && session->active_ops == 0) {
        pthread_cond_broadcast(&session->idle_cond);
    }
    pthread_mutex_unlock(&session->mutex);

    errno = saved_errno;
    return rc;
}
/*
 * Copy the UDP session's stats into *out_stats while holding
 * session->mutex.  Does nothing when either pointer is NULL.
 */
void omnisocket_udp_session_stats_snapshot(omnisocket_udp_session_t *session, omnisocket_session_stats_t *out_stats) {
    if (session != NULL && out_stats != NULL) {
        pthread_mutex_lock(&session->mutex);
        *out_stats = session->stats;
        pthread_mutex_unlock(&session->mutex);
    }
}