Compare commits
5 Commits
b3fb6d47ab
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 88ed9e2707 | |||
| 9cfd2815b8 | |||
| 7a8bc4ea33 | |||
| 981b15d16a | |||
| 1e55fa99a0 |
@@ -3,6 +3,7 @@
|
|||||||
#include "peer_kcp_client.h"
|
#include "peer_kcp_client.h"
|
||||||
|
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
typedef struct kcppeer_receive_ctx {
|
typedef struct kcppeer_receive_ctx {
|
||||||
kcp_client_t *client;
|
kcp_client_t *client;
|
||||||
@@ -69,6 +70,7 @@ int main(int argc, char **argv) {
|
|||||||
const char *peer_id = "peer-a";
|
const char *peer_id = "peer-a";
|
||||||
const char *server_addr = "127.0.0.1:9002";
|
const char *server_addr = "127.0.0.1:9002";
|
||||||
const char *relay_via = "";
|
const char *relay_via = "";
|
||||||
|
const char *actual_dial_target;
|
||||||
const char *target_peer = "";
|
const char *target_peer = "";
|
||||||
const char *text = "";
|
const char *text = "";
|
||||||
const char *file_path = "";
|
const char *file_path = "";
|
||||||
@@ -238,15 +240,22 @@ int main(int argc, char **argv) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
actual_dial_target = relay_via[0] != '\0' ? relay_via : server_addr;
|
||||||
client = kcp_client_dial(server_addr, relay_via, peer_id, bind_ip, bind_device, latency_logger, packet_logger, stats_logger, stats_interval_ms);
|
client = kcp_client_dial(server_addr, relay_via, peer_id, bind_ip, bind_device, latency_logger, packet_logger, stats_logger, stats_interval_ms);
|
||||||
if (client == NULL) {
|
if (client == NULL) {
|
||||||
fprintf(stderr, "kcppeer: dial kcp server %s failed\n", server_addr);
|
int saved_errno = errno;
|
||||||
|
const char *reason = saved_errno != 0 ? strerror(saved_errno) : "unknown error";
|
||||||
|
if (relay_via[0] != '\0') {
|
||||||
|
fprintf(stderr, "kcppeer: dial target %s failed (logical server %s): %s (errno=%d)\n", actual_dial_target, server_addr, reason, saved_errno);
|
||||||
|
} else {
|
||||||
|
fprintf(stderr, "kcppeer: dial kcp server %s failed: %s (errno=%d)\n", server_addr, reason, saved_errno);
|
||||||
|
}
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
if (relay_via[0] != '\0') {
|
if (relay_via[0] != '\0') {
|
||||||
fprintf(stderr, "opened KCP session as %s; logical server=%s, actual dial target=%s via relay; register not yet confirmed\n", kcp_client_id(client), server_addr, relay_via);
|
fprintf(stderr, "opened KCP session as %s; logical server=%s, actual dial target=%s via relay; register not yet confirmed\n", kcp_client_id(client), server_addr, actual_dial_target);
|
||||||
} else {
|
} else {
|
||||||
fprintf(stderr, "opened KCP session as %s; logical server=%s, actual dial target=%s; register not yet confirmed\n", kcp_client_id(client), server_addr, server_addr);
|
fprintf(stderr, "opened KCP session as %s; logical server=%s, actual dial target=%s; register not yet confirmed\n", kcp_client_id(client), server_addr, actual_dial_target);
|
||||||
}
|
}
|
||||||
|
|
||||||
receive_ctx.client = client;
|
receive_ctx.client = client;
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
#include "peer_kcp_client.h"
|
#include "peer_kcp_client.h"
|
||||||
|
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
struct kcp_client {
|
struct kcp_client {
|
||||||
char id[OMNI_MAX_PEER_ID];
|
char id[OMNI_MAX_PEER_ID];
|
||||||
@@ -80,6 +81,7 @@ kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, co
|
|||||||
kcp_client_t *client;
|
kcp_client_t *client;
|
||||||
const char *actual_dial_addr = (dial_addr != NULL && dial_addr[0] != '\0') ? dial_addr : server_addr;
|
const char *actual_dial_addr = (dial_addr != NULL && dial_addr[0] != '\0') ? dial_addr : server_addr;
|
||||||
message_t register_msg;
|
message_t register_msg;
|
||||||
|
int saved_errno = 0;
|
||||||
|
|
||||||
client = (kcp_client_t *) calloc(1, sizeof(*client));
|
client = (kcp_client_t *) calloc(1, sizeof(*client));
|
||||||
if (client == NULL) {
|
if (client == NULL) {
|
||||||
@@ -91,7 +93,9 @@ kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, co
|
|||||||
client->logger = logger;
|
client->logger = logger;
|
||||||
client->conn = kcp_conn_dial(actual_dial_addr, bind_ip, bind_device, packet_logger, logger, OMNI_NODE_ROLE_PEER, peer_id, stats_logger, stats_interval_ms);
|
client->conn = kcp_conn_dial(actual_dial_addr, bind_ip, bind_device, packet_logger, logger, OMNI_NODE_ROLE_PEER, peer_id, stats_logger, stats_interval_ms);
|
||||||
if (client->conn == NULL) {
|
if (client->conn == NULL) {
|
||||||
|
saved_errno = errno;
|
||||||
kcp_client_free(client);
|
kcp_client_free(client);
|
||||||
|
errno = saved_errno;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -101,7 +105,9 @@ kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, co
|
|||||||
snprintf(register_msg.from, sizeof(register_msg.from), "%s", peer_id);
|
snprintf(register_msg.from, sizeof(register_msg.from), "%s", peer_id);
|
||||||
snprintf(register_msg.to, sizeof(register_msg.to), "%s", SERVER_PEER_ID);
|
snprintf(register_msg.to, sizeof(register_msg.to), "%s", SERVER_PEER_ID);
|
||||||
if (kcp_conn_send(client->conn, ®ister_msg) != 0) {
|
if (kcp_conn_send(client->conn, ®ister_msg) != 0) {
|
||||||
|
saved_errno = errno;
|
||||||
kcp_client_free(client);
|
kcp_client_free(client);
|
||||||
|
errno = saved_errno;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
return client;
|
return client;
|
||||||
|
|||||||
@@ -10,10 +10,13 @@ struct udp_relay {
|
|||||||
int upstream_fd;
|
int upstream_fd;
|
||||||
struct sockaddr_storage upstream_addr;
|
struct sockaddr_storage upstream_addr;
|
||||||
socklen_t upstream_addr_len;
|
socklen_t upstream_addr_len;
|
||||||
|
char downstream_local_addr[OMNI_MAX_ADDR_TEXT];
|
||||||
|
char upstream_local_addr[OMNI_MAX_ADDR_TEXT];
|
||||||
struct sockaddr_storage client_addr;
|
struct sockaddr_storage client_addr;
|
||||||
socklen_t client_addr_len;
|
socklen_t client_addr_len;
|
||||||
int has_client;
|
int has_client;
|
||||||
pthread_mutex_t lock;
|
pthread_mutex_t lock;
|
||||||
|
pthread_mutex_t log_mu;
|
||||||
pthread_mutex_t state_mu;
|
pthread_mutex_t state_mu;
|
||||||
pthread_cond_t state_cond;
|
pthread_cond_t state_cond;
|
||||||
pthread_t downstream_thread;
|
pthread_t downstream_thread;
|
||||||
@@ -26,6 +29,88 @@ struct udp_relay {
|
|||||||
int closed;
|
int closed;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static void udp_relay_parse_kcp_summary(const uint8_t *packet, size_t len, int *has_conv, uint32_t *conv, size_t *segment_count) {
|
||||||
|
size_t offset = 0;
|
||||||
|
size_t count = 0;
|
||||||
|
|
||||||
|
if (has_conv != NULL) {
|
||||||
|
*has_conv = 0;
|
||||||
|
}
|
||||||
|
if (conv != NULL) {
|
||||||
|
*conv = 0;
|
||||||
|
}
|
||||||
|
if (segment_count != NULL) {
|
||||||
|
*segment_count = 0;
|
||||||
|
}
|
||||||
|
if (packet == NULL || len < 4U) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (has_conv != NULL) {
|
||||||
|
*has_conv = 1;
|
||||||
|
}
|
||||||
|
if (conv != NULL) {
|
||||||
|
*conv = (uint32_t) ((unsigned char) packet[0] |
|
||||||
|
((unsigned char) packet[1] << 8) |
|
||||||
|
((unsigned char) packet[2] << 16) |
|
||||||
|
((unsigned char) packet[3] << 24));
|
||||||
|
}
|
||||||
|
while (offset + 24U <= len) {
|
||||||
|
uint32_t seg_len = (uint32_t) ((unsigned char) packet[offset + 20] |
|
||||||
|
((unsigned char) packet[offset + 21] << 8) |
|
||||||
|
((unsigned char) packet[offset + 22] << 16) |
|
||||||
|
((unsigned char) packet[offset + 23] << 24));
|
||||||
|
if (offset + 24U + seg_len > len) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
count++;
|
||||||
|
offset += 24U + seg_len;
|
||||||
|
}
|
||||||
|
if (segment_count != NULL) {
|
||||||
|
*segment_count = count;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void udp_relay_print_packet(udp_relay_t *relay, const char *event_name, const char *local_addr, const struct sockaddr_storage *remote_addr, socklen_t remote_addr_len, const uint8_t *packet, size_t packet_len) {
|
||||||
|
char remote_addr_text[OMNI_MAX_ADDR_TEXT];
|
||||||
|
int64_t ts_unix_nano;
|
||||||
|
int has_conv = 0;
|
||||||
|
uint32_t conv = 0;
|
||||||
|
size_t segment_count = 0;
|
||||||
|
|
||||||
|
if (relay == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (remote_addr != NULL && remote_addr_len > 0) {
|
||||||
|
omni_sockaddr_to_string((const struct sockaddr *) remote_addr, remote_addr_len, remote_addr_text, sizeof(remote_addr_text));
|
||||||
|
} else {
|
||||||
|
remote_addr_text[0] = '\0';
|
||||||
|
}
|
||||||
|
ts_unix_nano = omni_now_unix_nano();
|
||||||
|
udp_relay_parse_kcp_summary(packet, packet_len, &has_conv, &conv, &segment_count);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&relay->log_mu);
|
||||||
|
if (has_conv) {
|
||||||
|
fprintf(stderr, "[relay] ts=%" PRId64 " event=%s local=%s remote=%s bytes=%zu conv=%" PRIu32 " segs=%zu\n",
|
||||||
|
ts_unix_nano,
|
||||||
|
event_name == NULL ? "" : event_name,
|
||||||
|
local_addr == NULL ? "" : local_addr,
|
||||||
|
remote_addr_text,
|
||||||
|
packet_len,
|
||||||
|
conv,
|
||||||
|
segment_count);
|
||||||
|
} else {
|
||||||
|
fprintf(stderr, "[relay] ts=%" PRId64 " event=%s local=%s remote=%s bytes=%zu\n",
|
||||||
|
ts_unix_nano,
|
||||||
|
event_name == NULL ? "" : event_name,
|
||||||
|
local_addr == NULL ? "" : local_addr,
|
||||||
|
remote_addr_text,
|
||||||
|
packet_len);
|
||||||
|
}
|
||||||
|
fflush(stderr);
|
||||||
|
pthread_mutex_unlock(&relay->log_mu);
|
||||||
|
}
|
||||||
|
|
||||||
static int udp_relay_is_closed(udp_relay_t *relay) {
|
static int udp_relay_is_closed(udp_relay_t *relay) {
|
||||||
int closed;
|
int closed;
|
||||||
|
|
||||||
@@ -90,8 +175,10 @@ static void *udp_relay_forward_downstream_to_upstream(void *arg) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
udp_relay_record_client(relay, &source, source_len);
|
udp_relay_record_client(relay, &source, source_len);
|
||||||
|
udp_relay_print_packet(relay, "relay_downstream_rx", relay->downstream_local_addr, &source, source_len, buffer, (size_t) n);
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (send(relay->upstream_fd, buffer, (size_t) n, 0) >= 0) {
|
if (send(relay->upstream_fd, buffer, (size_t) n, 0) >= 0) {
|
||||||
|
udp_relay_print_packet(relay, "relay_upstream_tx", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
@@ -132,12 +219,15 @@ static void *udp_relay_forward_upstream_to_downstream(void *arg) {
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
udp_relay_print_packet(relay, "relay_upstream_rx", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n);
|
||||||
if (!udp_relay_copy_client(relay, &client_addr, &client_addr_len)) {
|
if (!udp_relay_copy_client(relay, &client_addr, &client_addr_len)) {
|
||||||
|
udp_relay_print_packet(relay, "relay_upstream_drop_no_client", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (sendto(relay->downstream_fd, buffer, (size_t) n, 0, (struct sockaddr *) &client_addr, client_addr_len) >= 0) {
|
if (sendto(relay->downstream_fd, buffer, (size_t) n, 0, (struct sockaddr *) &client_addr, client_addr_len) >= 0) {
|
||||||
|
udp_relay_print_packet(relay, "relay_downstream_tx", relay->downstream_local_addr, &client_addr, client_addr_len, buffer, (size_t) n);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
@@ -170,8 +260,12 @@ static void udp_relay_join_threads(udp_relay_t *relay) {
|
|||||||
udp_relay_t *udp_relay_open(const char *listen_addr, const char *upstream_addr) {
|
udp_relay_t *udp_relay_open(const char *listen_addr, const char *upstream_addr) {
|
||||||
struct sockaddr_storage listen_ss;
|
struct sockaddr_storage listen_ss;
|
||||||
struct sockaddr_storage upstream_ss;
|
struct sockaddr_storage upstream_ss;
|
||||||
|
struct sockaddr_storage downstream_local_ss;
|
||||||
|
struct sockaddr_storage upstream_local_ss;
|
||||||
socklen_t listen_len;
|
socklen_t listen_len;
|
||||||
socklen_t upstream_len;
|
socklen_t upstream_len;
|
||||||
|
socklen_t downstream_local_len = sizeof(downstream_local_ss);
|
||||||
|
socklen_t upstream_local_len = sizeof(upstream_local_ss);
|
||||||
int family;
|
int family;
|
||||||
int fd_listen = -1;
|
int fd_listen = -1;
|
||||||
int fd_upstream = -1;
|
int fd_upstream = -1;
|
||||||
@@ -209,7 +303,18 @@ udp_relay_t *udp_relay_open(const char *listen_addr, const char *upstream_addr)
|
|||||||
relay->upstream_fd = fd_upstream;
|
relay->upstream_fd = fd_upstream;
|
||||||
memcpy(&relay->upstream_addr, &upstream_ss, sizeof(upstream_ss));
|
memcpy(&relay->upstream_addr, &upstream_ss, sizeof(upstream_ss));
|
||||||
relay->upstream_addr_len = upstream_len;
|
relay->upstream_addr_len = upstream_len;
|
||||||
|
if (getsockname(fd_listen, (struct sockaddr *) &downstream_local_ss, &downstream_local_len) == 0) {
|
||||||
|
omni_sockaddr_to_string((const struct sockaddr *) &downstream_local_ss, downstream_local_len, relay->downstream_local_addr, sizeof(relay->downstream_local_addr));
|
||||||
|
} else {
|
||||||
|
snprintf(relay->downstream_local_addr, sizeof(relay->downstream_local_addr), "%s", listen_addr == NULL ? "" : listen_addr);
|
||||||
|
}
|
||||||
|
if (getsockname(fd_upstream, (struct sockaddr *) &upstream_local_ss, &upstream_local_len) == 0) {
|
||||||
|
omni_sockaddr_to_string((const struct sockaddr *) &upstream_local_ss, upstream_local_len, relay->upstream_local_addr, sizeof(relay->upstream_local_addr));
|
||||||
|
} else {
|
||||||
|
snprintf(relay->upstream_local_addr, sizeof(relay->upstream_local_addr), "%s", listen_addr == NULL ? "" : listen_addr);
|
||||||
|
}
|
||||||
pthread_mutex_init(&relay->lock, NULL);
|
pthread_mutex_init(&relay->lock, NULL);
|
||||||
|
pthread_mutex_init(&relay->log_mu, NULL);
|
||||||
pthread_mutex_init(&relay->state_mu, NULL);
|
pthread_mutex_init(&relay->state_mu, NULL);
|
||||||
pthread_cond_init(&relay->state_cond, NULL);
|
pthread_cond_init(&relay->state_cond, NULL);
|
||||||
return relay;
|
return relay;
|
||||||
@@ -305,6 +410,7 @@ void udp_relay_free(udp_relay_t *relay) {
|
|||||||
udp_relay_close(relay);
|
udp_relay_close(relay);
|
||||||
udp_relay_join_threads(relay);
|
udp_relay_join_threads(relay);
|
||||||
pthread_mutex_destroy(&relay->lock);
|
pthread_mutex_destroy(&relay->lock);
|
||||||
|
pthread_mutex_destroy(&relay->log_mu);
|
||||||
pthread_cond_destroy(&relay->state_cond);
|
pthread_cond_destroy(&relay->state_cond);
|
||||||
pthread_mutex_destroy(&relay->state_mu);
|
pthread_mutex_destroy(&relay->state_mu);
|
||||||
free(relay);
|
free(relay);
|
||||||
|
|||||||
@@ -36,6 +36,7 @@ typedef struct kcp_socket_debug_state {
|
|||||||
uint32_t next_tx_id;
|
uint32_t next_tx_id;
|
||||||
kcp_packet_debug_pending_t *pending_head;
|
kcp_packet_debug_pending_t *pending_head;
|
||||||
atomic_int closed;
|
atomic_int closed;
|
||||||
|
atomic_int last_send_errno;
|
||||||
} kcp_socket_debug_state_t;
|
} kcp_socket_debug_state_t;
|
||||||
|
|
||||||
typedef struct kcp_session_entry kcp_session_entry_t;
|
typedef struct kcp_session_entry kcp_session_entry_t;
|
||||||
@@ -766,6 +767,7 @@ static void *kcp_socket_debug_errqueue_thread(void *arg) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int kcp_socket_debug_init(kcp_socket_debug_state_t *state, int fd, kcp_packet_debug_logger_t *logger, const char *node_role, const char *node_id) {
|
static int kcp_socket_debug_init(kcp_socket_debug_state_t *state, int fd, kcp_packet_debug_logger_t *logger, const char *node_role, const char *node_id) {
|
||||||
|
int thread_rc;
|
||||||
memset(state, 0, sizeof(*state));
|
memset(state, 0, sizeof(*state));
|
||||||
state->fd = fd;
|
state->fd = fd;
|
||||||
state->logger = logger;
|
state->logger = logger;
|
||||||
@@ -779,7 +781,9 @@ static int kcp_socket_debug_init(kcp_socket_debug_state_t *state, int fd, kcp_pa
|
|||||||
pthread_mutex_destroy(&state->pending_mu);
|
pthread_mutex_destroy(&state->pending_mu);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (pthread_create(&state->errqueue_thread, NULL, kcp_socket_debug_errqueue_thread, state) != 0) {
|
thread_rc = pthread_create(&state->errqueue_thread, NULL, kcp_socket_debug_errqueue_thread, state);
|
||||||
|
if (thread_rc != 0) {
|
||||||
|
errno = thread_rc;
|
||||||
pthread_mutex_destroy(&state->write_mu);
|
pthread_mutex_destroy(&state->write_mu);
|
||||||
pthread_mutex_destroy(&state->pending_mu);
|
pthread_mutex_destroy(&state->pending_mu);
|
||||||
return -1;
|
return -1;
|
||||||
@@ -803,17 +807,23 @@ static int kcp_socket_send_packet(kcp_socket_debug_state_t *state, const struct
|
|||||||
uint32_t tx_id = 0;
|
uint32_t tx_id = 0;
|
||||||
ssize_t rc;
|
ssize_t rc;
|
||||||
if (state->logger != NULL && kcp_socket_debug_reserve_tx(state, remote_addr, remote_addr_len, packet, packet_len, &tx_id) != 0) {
|
if (state->logger != NULL && kcp_socket_debug_reserve_tx(state, remote_addr, remote_addr_len, packet, packet_len, &tx_id) != 0) {
|
||||||
|
atomic_store(&state->last_send_errno, errno != 0 ? errno : EIO);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pthread_mutex_lock(&state->write_mu);
|
pthread_mutex_lock(&state->write_mu);
|
||||||
rc = sendto(state->fd, packet, packet_len, 0, (const struct sockaddr *) remote_addr, remote_addr_len);
|
rc = sendto(state->fd, packet, packet_len, 0, (const struct sockaddr *) remote_addr, remote_addr_len);
|
||||||
pthread_mutex_unlock(&state->write_mu);
|
pthread_mutex_unlock(&state->write_mu);
|
||||||
if (rc < 0 || (size_t) rc != packet_len) {
|
if (rc < 0 || (size_t) rc != packet_len) {
|
||||||
|
if (rc >= 0 && (size_t) rc != packet_len && errno == 0) {
|
||||||
|
errno = EIO;
|
||||||
|
}
|
||||||
|
atomic_store(&state->last_send_errno, errno != 0 ? errno : EIO);
|
||||||
if (state->logger != NULL) {
|
if (state->logger != NULL) {
|
||||||
kcp_socket_debug_rollback_tx(state, tx_id);
|
kcp_socket_debug_rollback_tx(state, tx_id);
|
||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
atomic_store(&state->last_send_errno, 0);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1106,10 +1116,13 @@ static void *kcp_update_thread_main(void *arg) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int kcp_conn_start_stats_thread(kcp_conn_t *conn) {
|
static int kcp_conn_start_stats_thread(kcp_conn_t *conn) {
|
||||||
|
int thread_rc;
|
||||||
if (conn == NULL || conn->stats_logger == NULL || conn->stats_thread_started) {
|
if (conn == NULL || conn->stats_logger == NULL || conn->stats_thread_started) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
if (pthread_create(&conn->stats_thread, NULL, kcp_stats_thread_main, conn) != 0) {
|
thread_rc = pthread_create(&conn->stats_thread, NULL, kcp_stats_thread_main, conn);
|
||||||
|
if (thread_rc != 0) {
|
||||||
|
errno = thread_rc;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
conn->stats_thread_started = 1;
|
conn->stats_thread_started = 1;
|
||||||
@@ -1119,8 +1132,10 @@ static int kcp_conn_start_stats_thread(kcp_conn_t *conn) {
|
|||||||
static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *remote_addr, socklen_t remote_addr_len, kcp_socket_debug_state_t *sock_state, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
|
static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *remote_addr, socklen_t remote_addr_len, kcp_socket_debug_state_t *sock_state, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
|
||||||
kcp_conn_t *conn = (kcp_conn_t *) calloc(1, sizeof(*conn));
|
kcp_conn_t *conn = (kcp_conn_t *) calloc(1, sizeof(*conn));
|
||||||
uint32_t conv;
|
uint32_t conv;
|
||||||
|
int thread_rc;
|
||||||
|
|
||||||
if (conn == NULL) {
|
if (conn == NULL) {
|
||||||
|
errno = ENOMEM;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
conn->fd = fd;
|
conn->fd = fd;
|
||||||
@@ -1146,6 +1161,7 @@ static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *
|
|||||||
}
|
}
|
||||||
conn->kcp = ikcp_create(conv, conn);
|
conn->kcp = ikcp_create(conv, conn);
|
||||||
if (conn->kcp == NULL) {
|
if (conn->kcp == NULL) {
|
||||||
|
errno = ENOMEM;
|
||||||
protocol_frame_decoder_destroy(&conn->decoder);
|
protocol_frame_decoder_destroy(&conn->decoder);
|
||||||
pthread_cond_destroy(&conn->rx_cond);
|
pthread_cond_destroy(&conn->rx_cond);
|
||||||
pthread_mutex_destroy(&conn->kcp_mu);
|
pthread_mutex_destroy(&conn->kcp_mu);
|
||||||
@@ -1177,7 +1193,9 @@ static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *
|
|||||||
free(conn);
|
free(conn);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (pthread_create(&conn->update_thread, NULL, kcp_update_thread_main, conn) != 0) {
|
thread_rc = pthread_create(&conn->update_thread, NULL, kcp_update_thread_main, conn);
|
||||||
|
if (thread_rc != 0) {
|
||||||
|
errno = thread_rc;
|
||||||
if (conn->stats_thread_started) {
|
if (conn->stats_thread_started) {
|
||||||
atomic_store(&conn->closed, 1);
|
atomic_store(&conn->closed, 1);
|
||||||
pthread_join(conn->stats_thread, NULL);
|
pthread_join(conn->stats_thread, NULL);
|
||||||
@@ -1202,12 +1220,14 @@ kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const ch
|
|||||||
int fd = kcp_socket_open_dial(server_addr, bind_ip, bind_device, &remote_addr, &remote_len, &family);
|
int fd = kcp_socket_open_dial(server_addr, bind_ip, bind_device, &remote_addr, &remote_len, &family);
|
||||||
kcp_conn_t *conn;
|
kcp_conn_t *conn;
|
||||||
kcp_socket_debug_state_t *sock_state;
|
kcp_socket_debug_state_t *sock_state;
|
||||||
|
int thread_rc;
|
||||||
(void) family;
|
(void) family;
|
||||||
if (fd < 0) {
|
if (fd < 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
sock_state = (kcp_socket_debug_state_t *) calloc(1, sizeof(*sock_state));
|
sock_state = (kcp_socket_debug_state_t *) calloc(1, sizeof(*sock_state));
|
||||||
if (sock_state == NULL) {
|
if (sock_state == NULL) {
|
||||||
|
errno = ENOMEM;
|
||||||
close(fd);
|
close(fd);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -1225,7 +1245,9 @@ kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const ch
|
|||||||
}
|
}
|
||||||
conn->is_client = 1;
|
conn->is_client = 1;
|
||||||
conn->owns_socket = 1;
|
conn->owns_socket = 1;
|
||||||
if (pthread_create(&conn->recv_thread, NULL, kcp_client_recv_thread_main, conn) != 0) {
|
thread_rc = pthread_create(&conn->recv_thread, NULL, kcp_client_recv_thread_main, conn);
|
||||||
|
if (thread_rc != 0) {
|
||||||
|
errno = thread_rc;
|
||||||
kcp_conn_free(conn);
|
kcp_conn_free(conn);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -1518,6 +1540,8 @@ int kcp_conn_configure_runtime(kcp_conn_t *conn, latency_logger_t *logger, const
|
|||||||
int kcp_conn_send(kcp_conn_t *conn, const message_t *msg) {
|
int kcp_conn_send(kcp_conn_t *conn, const message_t *msg) {
|
||||||
uint8_t *frame = NULL;
|
uint8_t *frame = NULL;
|
||||||
size_t frame_len = 0;
|
size_t frame_len = 0;
|
||||||
|
int send_errno = 0;
|
||||||
|
int kcp_send_rc = 0;
|
||||||
if (conn == NULL || msg == NULL) {
|
if (conn == NULL || msg == NULL) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
@@ -1529,14 +1553,23 @@ int kcp_conn_send(kcp_conn_t *conn, const message_t *msg) {
|
|||||||
kcp_log_session_snapshot(conn, "send_handoff_begin");
|
kcp_log_session_snapshot(conn, "send_handoff_begin");
|
||||||
kcp_process_sampler_request_sample(conn->process_sampler, "send_handoff_begin");
|
kcp_process_sampler_request_sample(conn->process_sampler, "send_handoff_begin");
|
||||||
pthread_mutex_lock(&conn->kcp_mu);
|
pthread_mutex_lock(&conn->kcp_mu);
|
||||||
|
atomic_store(&conn->sock_state->last_send_errno, 0);
|
||||||
conn->kcp->current = omni_now_millis32();
|
conn->kcp->current = omni_now_millis32();
|
||||||
if (ikcp_send(conn->kcp, (const char *) frame, (int) frame_len) != 0) {
|
kcp_send_rc = ikcp_send(conn->kcp, (const char *) frame, (int) frame_len);
|
||||||
|
if (kcp_send_rc < 0) {
|
||||||
pthread_mutex_unlock(&conn->kcp_mu);
|
pthread_mutex_unlock(&conn->kcp_mu);
|
||||||
|
errno = kcp_send_rc == -2 ? EMSGSIZE : EINVAL;
|
||||||
free(frame);
|
free(frame);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
ikcp_flush(conn->kcp);
|
ikcp_flush(conn->kcp);
|
||||||
|
send_errno = atomic_load(&conn->sock_state->last_send_errno);
|
||||||
pthread_mutex_unlock(&conn->kcp_mu);
|
pthread_mutex_unlock(&conn->kcp_mu);
|
||||||
|
if (send_errno != 0) {
|
||||||
|
errno = send_errno;
|
||||||
|
free(frame);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
kcp_log_session_snapshot(conn, "send_handoff_end");
|
kcp_log_session_snapshot(conn, "send_handoff_end");
|
||||||
kcp_process_sampler_request_sample(conn->process_sampler, "send_handoff_end");
|
kcp_process_sampler_request_sample(conn->process_sampler, "send_handoff_end");
|
||||||
latencylog_log_message_event(conn->logger, conn->node_role, conn->node_id, EVENT_SEND_HANDOFF_END, msg);
|
latencylog_log_message_event(conn->logger, conn->node_role, conn->node_id, EVENT_SEND_HANDOFF_END, msg);
|
||||||
|
|||||||
Reference in New Issue
Block a user