#include "server_udp_relay.h" #include #include #define UDP_RELAY_BUF_SIZE (64U * 1024U) typedef struct udp_relay_client_entry { struct udp_relay_client_entry *next; uint32_t conv; struct sockaddr_storage addr; socklen_t addr_len; } udp_relay_client_entry_t; struct udp_relay { int downstream_fd; int upstream_fd; struct sockaddr_storage upstream_addr; socklen_t upstream_addr_len; char downstream_local_addr[OMNI_MAX_ADDR_TEXT]; char upstream_local_addr[OMNI_MAX_ADDR_TEXT]; struct sockaddr_storage last_client_addr; socklen_t last_client_addr_len; int has_last_client; udp_relay_client_entry_t *clients; pthread_mutex_t lock; pthread_mutex_t log_mu; pthread_mutex_t state_mu; pthread_cond_t state_cond; pthread_t downstream_thread; int downstream_thread_started; pthread_t upstream_thread; int upstream_thread_started; int worker_done; int worker_rc; int worker_errno; int closed; }; static void udp_relay_parse_kcp_summary(const uint8_t *packet, size_t len, int *has_conv, uint32_t *conv, size_t *segment_count) { size_t offset = 0; size_t count = 0; if (has_conv != NULL) { *has_conv = 0; } if (conv != NULL) { *conv = 0; } if (segment_count != NULL) { *segment_count = 0; } if (packet == NULL || len < 4U) { return; } if (has_conv != NULL) { *has_conv = 1; } if (conv != NULL) { *conv = (uint32_t) ((unsigned char) packet[0] | ((unsigned char) packet[1] << 8) | ((unsigned char) packet[2] << 16) | ((unsigned char) packet[3] << 24)); } while (offset + 24U <= len) { uint32_t seg_len = (uint32_t) ((unsigned char) packet[offset + 20] | ((unsigned char) packet[offset + 21] << 8) | ((unsigned char) packet[offset + 22] << 16) | ((unsigned char) packet[offset + 23] << 24)); if (offset + 24U + seg_len > len) { return; } count++; offset += 24U + seg_len; } if (segment_count != NULL) { *segment_count = count; } } static void udp_relay_print_packet(udp_relay_t *relay, const char *event_name, const char *local_addr, const struct sockaddr_storage *remote_addr, socklen_t remote_addr_len, const uint8_t *packet, size_t packet_len) { char remote_addr_text[OMNI_MAX_ADDR_TEXT]; int64_t ts_unix_nano; int has_conv = 0; uint32_t conv = 0; size_t segment_count = 0; if (relay == NULL) { return; } if (remote_addr != NULL && remote_addr_len > 0) { omni_sockaddr_to_string((const struct sockaddr *) remote_addr, remote_addr_len, remote_addr_text, sizeof(remote_addr_text)); } else { remote_addr_text[0] = '\0'; } ts_unix_nano = omni_now_unix_nano(); udp_relay_parse_kcp_summary(packet, packet_len, &has_conv, &conv, &segment_count); pthread_mutex_lock(&relay->log_mu); if (has_conv) { fprintf(stderr, "[relay] ts=%" PRId64 " event=%s local=%s remote=%s bytes=%zu conv=%" PRIu32 " segs=%zu\n", ts_unix_nano, event_name == NULL ? "" : event_name, local_addr == NULL ? "" : local_addr, remote_addr_text, packet_len, conv, segment_count); } else { fprintf(stderr, "[relay] ts=%" PRId64 " event=%s local=%s remote=%s bytes=%zu\n", ts_unix_nano, event_name == NULL ? "" : event_name, local_addr == NULL ? "" : local_addr, remote_addr_text, packet_len); } fflush(stderr); pthread_mutex_unlock(&relay->log_mu); } static int udp_relay_is_closed(udp_relay_t *relay) { int closed; pthread_mutex_lock(&relay->state_mu); closed = relay->closed; pthread_mutex_unlock(&relay->state_mu); return closed; } static void udp_relay_note_result(udp_relay_t *relay, int rc, int errnum) { pthread_mutex_lock(&relay->state_mu); if (!relay->worker_done) { relay->worker_done = 1; relay->worker_rc = rc; relay->worker_errno = errnum; pthread_cond_signal(&relay->state_cond); } pthread_mutex_unlock(&relay->state_mu); } static udp_relay_client_entry_t *udp_relay_find_client_locked(udp_relay_t *relay, uint32_t conv) { udp_relay_client_entry_t *entry; for (entry = relay->clients; entry != NULL; entry = entry->next) { if (entry->conv == conv) { return entry; } } return NULL; } static void udp_relay_record_client(udp_relay_t *relay, int has_conv, uint32_t conv, const struct sockaddr_storage *addr, socklen_t addr_len) { pthread_mutex_lock(&relay->lock); memcpy(&relay->last_client_addr, addr, sizeof(*addr)); relay->last_client_addr_len = addr_len; relay->has_last_client = 1; if (has_conv) { udp_relay_client_entry_t *entry = udp_relay_find_client_locked(relay, conv); if (entry == NULL) { entry = (udp_relay_client_entry_t *) calloc(1, sizeof(*entry)); if (entry != NULL) { entry->conv = conv; entry->next = relay->clients; relay->clients = entry; } } if (entry != NULL) { memcpy(&entry->addr, addr, sizeof(*addr)); entry->addr_len = addr_len; } } pthread_mutex_unlock(&relay->lock); } static int udp_relay_copy_client(udp_relay_t *relay, int has_conv, uint32_t conv, struct sockaddr_storage *addr, socklen_t *addr_len) { int has_client = 0; pthread_mutex_lock(&relay->lock); if (has_conv) { udp_relay_client_entry_t *entry = udp_relay_find_client_locked(relay, conv); if (entry != NULL) { memcpy(addr, &entry->addr, sizeof(*addr)); *addr_len = entry->addr_len; has_client = 1; } } else if (relay->has_last_client) { memcpy(addr, &relay->last_client_addr, sizeof(*addr)); *addr_len = relay->last_client_addr_len; has_client = 1; } pthread_mutex_unlock(&relay->lock); return has_client; } static void *udp_relay_forward_downstream_to_upstream(void *arg) { udp_relay_t *relay = (udp_relay_t *) arg; uint8_t buffer[UDP_RELAY_BUF_SIZE]; for (;;) { struct sockaddr_storage source; socklen_t source_len = sizeof(source); ssize_t n = recvfrom(relay->downstream_fd, buffer, sizeof(buffer), 0, (struct sockaddr *) &source, &source_len); int has_conv = 0; uint32_t conv = 0; if (n < 0) { int errnum = errno; if (errnum == EINTR) { continue; } if (udp_relay_is_closed(relay)) { udp_relay_note_result(relay, 0, 0); } else { udp_relay_note_result(relay, -1, errnum); } return NULL; } udp_relay_parse_kcp_summary(buffer, (size_t) n, &has_conv, &conv, NULL); udp_relay_record_client(relay, has_conv, conv, &source, source_len); udp_relay_print_packet(relay, "relay_downstream_rx", relay->downstream_local_addr, &source, source_len, buffer, (size_t) n); for (;;) { if (send(relay->upstream_fd, buffer, (size_t) n, 0) >= 0) { udp_relay_print_packet(relay, "relay_upstream_tx", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n); break; } { int errnum = errno; if (errnum == EINTR) { continue; } if (udp_relay_is_closed(relay)) { udp_relay_note_result(relay, 0, 0); } else { udp_relay_note_result(relay, -1, errnum); } return NULL; } } } } static void *udp_relay_forward_upstream_to_downstream(void *arg) { udp_relay_t *relay = (udp_relay_t *) arg; uint8_t buffer[UDP_RELAY_BUF_SIZE]; for (;;) { struct sockaddr_storage client_addr; socklen_t client_addr_len = 0; ssize_t n = recv(relay->upstream_fd, buffer, sizeof(buffer), 0); int has_conv = 0; uint32_t conv = 0; if (n < 0) { int errnum = errno; if (errnum == EINTR) { continue; } if (udp_relay_is_closed(relay)) { udp_relay_note_result(relay, 0, 0); } else { udp_relay_note_result(relay, -1, errnum); } return NULL; } udp_relay_print_packet(relay, "relay_upstream_rx", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n); udp_relay_parse_kcp_summary(buffer, (size_t) n, &has_conv, &conv, NULL); if (!udp_relay_copy_client(relay, has_conv, conv, &client_addr, &client_addr_len)) { udp_relay_print_packet(relay, "relay_upstream_drop_no_client", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n); continue; } for (;;) { if (sendto(relay->downstream_fd, buffer, (size_t) n, 0, (struct sockaddr *) &client_addr, client_addr_len) >= 0) { udp_relay_print_packet(relay, "relay_downstream_tx", relay->downstream_local_addr, &client_addr, client_addr_len, buffer, (size_t) n); break; } { int errnum = errno; if (errnum == EINTR) { continue; } if (udp_relay_is_closed(relay)) { udp_relay_note_result(relay, 0, 0); } else { udp_relay_note_result(relay, -1, errnum); } return NULL; } } } } static void udp_relay_join_threads(udp_relay_t *relay) { if (relay->downstream_thread_started) { pthread_join(relay->downstream_thread, NULL); relay->downstream_thread_started = 0; } if (relay->upstream_thread_started) { pthread_join(relay->upstream_thread, NULL); relay->upstream_thread_started = 0; } } udp_relay_t *udp_relay_open(const char *listen_addr, const char *upstream_addr) { struct sockaddr_storage listen_ss; struct sockaddr_storage upstream_ss; struct sockaddr_storage downstream_local_ss; struct sockaddr_storage upstream_local_ss; socklen_t listen_len; socklen_t upstream_len; socklen_t downstream_local_len = sizeof(downstream_local_ss); socklen_t upstream_local_len = sizeof(upstream_local_ss); int family; int fd_listen = -1; int fd_upstream = -1; udp_relay_t *relay = NULL; if (omni_parse_sockaddr(listen_addr, 1, &listen_ss, &listen_len, &family) != 0 || omni_parse_sockaddr(upstream_addr, 0, &upstream_ss, &upstream_len, &family) != 0) { return NULL; } fd_listen = socket(listen_ss.ss_family, SOCK_DGRAM, 0); if (fd_listen < 0) { return NULL; } if (bind(fd_listen, (struct sockaddr *) &listen_ss, listen_len) != 0) { close(fd_listen); return NULL; } fd_upstream = socket(upstream_ss.ss_family, SOCK_DGRAM, 0); if (fd_upstream < 0) { close(fd_listen); return NULL; } if (connect(fd_upstream, (struct sockaddr *) &upstream_ss, upstream_len) != 0) { close(fd_upstream); close(fd_listen); return NULL; } relay = (udp_relay_t *) calloc(1, sizeof(*relay)); if (relay == NULL) { close(fd_upstream); close(fd_listen); return NULL; } relay->downstream_fd = fd_listen; relay->upstream_fd = fd_upstream; memcpy(&relay->upstream_addr, &upstream_ss, sizeof(upstream_ss)); relay->upstream_addr_len = upstream_len; if (getsockname(fd_listen, (struct sockaddr *) &downstream_local_ss, &downstream_local_len) == 0) { omni_sockaddr_to_string((const struct sockaddr *) &downstream_local_ss, downstream_local_len, relay->downstream_local_addr, sizeof(relay->downstream_local_addr)); } else { snprintf(relay->downstream_local_addr, sizeof(relay->downstream_local_addr), "%s", listen_addr == NULL ? "" : listen_addr); } if (getsockname(fd_upstream, (struct sockaddr *) &upstream_local_ss, &upstream_local_len) == 0) { omni_sockaddr_to_string((const struct sockaddr *) &upstream_local_ss, upstream_local_len, relay->upstream_local_addr, sizeof(relay->upstream_local_addr)); } else { snprintf(relay->upstream_local_addr, sizeof(relay->upstream_local_addr), "%s", listen_addr == NULL ? "" : listen_addr); } pthread_mutex_init(&relay->lock, NULL); pthread_mutex_init(&relay->log_mu, NULL); pthread_mutex_init(&relay->state_mu, NULL); pthread_cond_init(&relay->state_cond, NULL); return relay; } int udp_relay_serve(udp_relay_t *relay) { int thread_rc; int rc; int errnum; if (relay == NULL) { errno = EINVAL; return -1; } if (udp_relay_is_closed(relay)) { errno = ECANCELED; return -1; } pthread_mutex_lock(&relay->state_mu); relay->worker_done = 0; relay->worker_rc = 0; relay->worker_errno = 0; pthread_mutex_unlock(&relay->state_mu); thread_rc = pthread_create(&relay->downstream_thread, NULL, udp_relay_forward_downstream_to_upstream, relay); if (thread_rc != 0) { errno = thread_rc; return -1; } relay->downstream_thread_started = 1; thread_rc = pthread_create(&relay->upstream_thread, NULL, udp_relay_forward_upstream_to_downstream, relay); if (thread_rc != 0) { errno = thread_rc; udp_relay_close(relay); udp_relay_join_threads(relay); return -1; } relay->upstream_thread_started = 1; pthread_mutex_lock(&relay->state_mu); while (!relay->worker_done) { pthread_cond_wait(&relay->state_cond, &relay->state_mu); } rc = relay->worker_rc; errnum = relay->worker_errno; pthread_mutex_unlock(&relay->state_mu); udp_relay_close(relay); udp_relay_join_threads(relay); if (rc != 0 && errnum != 0) { errno = errnum; } return rc; } int udp_relay_close(udp_relay_t *relay) { int downstream_fd; int upstream_fd; if (relay == NULL) { return 0; } pthread_mutex_lock(&relay->state_mu); if (relay->closed) { pthread_mutex_unlock(&relay->state_mu); return 0; } relay->closed = 1; downstream_fd = relay->downstream_fd; upstream_fd = relay->upstream_fd; relay->downstream_fd = -1; relay->upstream_fd = -1; pthread_cond_broadcast(&relay->state_cond); pthread_mutex_unlock(&relay->state_mu); if (downstream_fd >= 0) { close(downstream_fd); } if (upstream_fd >= 0) { close(upstream_fd); } return 0; } void udp_relay_free(udp_relay_t *relay) { udp_relay_client_entry_t *entry; udp_relay_client_entry_t *next; if (relay == NULL) { return; } udp_relay_close(relay); udp_relay_join_threads(relay); for (entry = relay->clients; entry != NULL; entry = next) { next = entry->next; free(entry); } pthread_mutex_destroy(&relay->lock); pthread_mutex_destroy(&relay->log_mu); pthread_cond_destroy(&relay->state_cond); pthread_mutex_destroy(&relay->state_mu); free(relay); }