del: 将go版本的内容删除,只保留处理日志功能

This commit is contained in:
2026-03-30 15:57:36 +08:00
parent 88ed9e2707
commit 24467c04c0
117 changed files with 142 additions and 13890 deletions

77
src/interactive.c Normal file
View File

@@ -0,0 +1,77 @@
#include "interactive.h"
#include <ctype.h>
static void interactive_skip_spaces(const char **cursor) {
while (**cursor != '\0' && isspace((unsigned char) **cursor)) {
(*cursor)++;
}
}
int interactive_parse_command(const char *line, interactive_command_t *command, char *err, size_t err_len) {
const char *cursor = line;
char action[16];
size_t action_len = 0;
size_t to_len = 0;
size_t value_len;
if (line == NULL || command == NULL) {
snprintf(err, err_len, "interactive: invalid command");
return -1;
}
memset(command, 0, sizeof(*command));
interactive_skip_spaces(&cursor);
while (*cursor != '\0' && !isspace((unsigned char) *cursor) && action_len + 1 < sizeof(action)) {
action[action_len++] = *cursor++;
}
action[action_len] = '\0';
if (action_len == 0) {
snprintf(err, err_len, "interactive: empty command");
return -1;
}
if (strcmp(action, "help") == 0) {
command->type = INTERACTIVE_CMD_HELP;
return 0;
}
if (strcmp(action, "quit") == 0) {
command->type = INTERACTIVE_CMD_QUIT;
return 0;
}
interactive_skip_spaces(&cursor);
while (*cursor != '\0' && !isspace((unsigned char) *cursor) && to_len + 1 < sizeof(command->to)) {
command->to[to_len++] = *cursor++;
}
command->to[to_len] = '\0';
interactive_skip_spaces(&cursor);
if (command->to[0] == '\0' || *cursor == '\0') {
snprintf(err, err_len, "interactive: missing target or value");
return -1;
}
value_len = strlen(cursor);
if (value_len >= sizeof(command->value)) {
snprintf(err, err_len, "interactive: value too long");
return -1;
}
snprintf(command->value, sizeof(command->value), "%s", cursor);
if (strcmp(action, "text") == 0) {
command->type = INTERACTIVE_CMD_TEXT;
return 0;
}
if (strcmp(action, "file") == 0) {
command->type = INTERACTIVE_CMD_FILE;
return 0;
}
snprintf(err, err_len, "interactive: unknown command %s", action);
return -1;
}
void interactive_print_help(FILE *out, const char *transport_name) {
fprintf(out, "interactive mode commands (%s):\n", transport_name);
fprintf(out, " help show this help\n");
fprintf(out, " text <peer> <message> send one text message\n");
fprintf(out, " file <peer> <path> send one file\n");
fprintf(out, " quit exit this process\n");
}

166
src/kcp_packet_debug.c Normal file
View File

@@ -0,0 +1,166 @@
#include "kcp_packet_debug.h"
kcp_packet_debug_logger_t *kcp_packet_debug_open_jsonl(const char *path) {
kcp_packet_debug_logger_t *logger;
FILE *file;
if (path == NULL || path[0] == '\0') {
return NULL;
}
if (omni_ensure_parent_dir(path) != 0) {
return NULL;
}
file = fopen(path, "ab");
if (file == NULL) {
return NULL;
}
logger = (kcp_packet_debug_logger_t *) calloc(1, sizeof(*logger));
if (logger == NULL) {
fclose(file);
return NULL;
}
omni_file_logger_init(&logger->file_logger, file);
logger->enabled = 1;
return logger;
}
void kcp_packet_debug_close(kcp_packet_debug_logger_t *logger) {
if (logger == NULL) {
return;
}
if (logger->file_logger.file != NULL) {
fclose(logger->file_logger.file);
}
omni_file_logger_destroy(&logger->file_logger);
free(logger);
}
void kcp_packet_debug_record_clear(kcp_packet_debug_record_t *record) {
if (record == NULL) {
return;
}
free(record->segments);
memset(record, 0, sizeof(*record));
}
int kcp_packet_debug_log(kcp_packet_debug_logger_t *logger, const kcp_packet_debug_record_t *record) {
char *event = NULL;
char *node_role = NULL;
char *node_id = NULL;
char *local_addr = NULL;
char *remote_addr = NULL;
char *segments_json = NULL;
char *tx_id_text = NULL;
char *conv_text = NULL;
char *line = NULL;
size_t i;
size_t cap = 128U;
size_t len = 0U;
if (logger == NULL || record == NULL || !logger->enabled) {
return 0;
}
event = omni_json_escape(record->event);
node_role = omni_json_escape(record->node_role);
node_id = omni_json_escape(record->node_id);
local_addr = omni_json_escape(record->local_addr);
remote_addr = omni_json_escape(record->remote_addr);
if (event == NULL || node_role == NULL || node_id == NULL || local_addr == NULL || remote_addr == NULL) {
free(event);
free(node_role);
free(node_id);
free(local_addr);
free(remote_addr);
return -1;
}
segments_json = (char *) malloc(cap);
if (segments_json == NULL) {
free(event);
free(node_role);
free(node_id);
free(local_addr);
free(remote_addr);
return -1;
}
segments_json[len++] = '[';
for (i = 0; i < record->segment_count; ++i) {
int written;
while (len + 96U > cap) {
char *next = (char *) realloc(segments_json, cap * 2U);
if (next == NULL) {
free(event);
free(node_role);
free(node_id);
free(local_addr);
free(remote_addr);
free(segments_json);
return -1;
}
segments_json = next;
cap *= 2U;
}
written = snprintf(
segments_json + len,
cap - len,
"%s{\"cmd\":%u,\"sn\":%u,\"una\":%u,\"frg\":%u,\"wnd\":%u,\"len\":%u}",
i == 0 ? "" : ",",
record->segments[i].cmd,
record->segments[i].sn,
record->segments[i].una,
record->segments[i].frg,
record->segments[i].wnd,
record->segments[i].len
);
len += (size_t) written;
}
segments_json[len++] = ']';
segments_json[len] = '\0';
tx_id_text = record->has_udp_tx_id ? omni_strdup_printf("%u", record->udp_tx_id) : omni_strdup("null");
conv_text = record->has_kcp_conv ? omni_strdup_printf("%u", record->kcp_conv) : omni_strdup("null");
if (tx_id_text == NULL || conv_text == NULL) {
free(event);
free(node_role);
free(node_id);
free(local_addr);
free(remote_addr);
free(segments_json);
free(tx_id_text);
free(conv_text);
return -1;
}
line = omni_strdup_printf(
"{\"event\":\"%s\",\"node_role\":\"%s\",\"node_id\":\"%s\",\"local_addr\":\"%s\",\"remote_addr\":\"%s\",\"packet_bytes\":%d,\"udp_tx_id\":%s,\"kcp_conv\":%s,\"segments\":%s,\"ts_unix_nano\":%" PRId64 "}",
event,
node_role,
node_id,
local_addr,
remote_addr,
record->packet_bytes,
tx_id_text,
conv_text,
segments_json,
record->ts_unix_nano
);
free(event);
free(node_role);
free(node_id);
free(local_addr);
free(remote_addr);
free(segments_json);
free(tx_id_text);
free(conv_text);
if (line == NULL) {
return -1;
}
if (omni_file_logger_write_line(&logger->file_logger, line) != 0) {
free(line);
return -1;
}
free(line);
return 0;
}

258
src/kcp_session_stats.c Normal file
View File

@@ -0,0 +1,258 @@
#include "kcp_session_stats.h"
static int kcp_session_stats_append(char **line, size_t *len, const char *suffix) {
size_t suffix_len;
char *next;
if (line == NULL || len == NULL || suffix == NULL) {
errno = EINVAL;
return -1;
}
suffix_len = strlen(suffix);
next = (char *) realloc(*line, *len + suffix_len + 1U);
if (next == NULL) {
return -1;
}
memcpy(next + *len, suffix, suffix_len + 1U);
*line = next;
*len += suffix_len;
return 0;
}
static int kcp_session_stats_appendf(char **line, size_t *len, const char *fmt, ...) {
va_list args;
va_list copy;
int needed;
char *buffer;
if (line == NULL || len == NULL || fmt == NULL) {
errno = EINVAL;
return -1;
}
va_start(args, fmt);
va_copy(copy, args);
needed = vsnprintf(NULL, 0, fmt, copy);
va_end(copy);
if (needed < 0) {
va_end(args);
return -1;
}
buffer = (char *) malloc((size_t) needed + 1U);
if (buffer == NULL) {
va_end(args);
return -1;
}
vsnprintf(buffer, (size_t) needed + 1U, fmt, args);
va_end(args);
if (kcp_session_stats_append(line, len, buffer) != 0) {
free(buffer);
return -1;
}
free(buffer);
return 0;
}
kcp_session_stats_logger_t *kcp_session_stats_open_jsonl(const char *path) {
kcp_session_stats_logger_t *logger;
FILE *file;
if (path == NULL || path[0] == '\0') {
return NULL;
}
if (omni_ensure_parent_dir(path) != 0) {
return NULL;
}
file = fopen(path, "ab");
if (file == NULL) {
return NULL;
}
logger = (kcp_session_stats_logger_t *) calloc(1, sizeof(*logger));
if (logger == NULL) {
fclose(file);
return NULL;
}
omni_file_logger_init(&logger->file_logger, file);
logger->enabled = 1;
return logger;
}
void kcp_session_stats_close(kcp_session_stats_logger_t *logger) {
if (logger == NULL) {
return;
}
if (logger->file_logger.file != NULL) {
fclose(logger->file_logger.file);
}
omni_file_logger_destroy(&logger->file_logger);
free(logger);
}
int kcp_session_stats_log(kcp_session_stats_logger_t *logger, const kcp_session_stats_record_t *record) {
char *record_type = NULL;
char *node_role = NULL;
char *node_id = NULL;
char *local_addr = NULL;
char *remote_addr = NULL;
char *sample_reason = NULL;
char *line = NULL;
size_t line_len = 0;
if (logger == NULL || record == NULL || !logger->enabled) {
return 0;
}
record_type = omni_json_escape(record->record_type);
node_role = omni_json_escape(record->node_role);
node_id = omni_json_escape(record->node_id);
local_addr = omni_json_escape(record->local_addr);
remote_addr = omni_json_escape(record->remote_addr);
sample_reason = omni_json_escape(record->sample_reason);
if (record_type == NULL || node_role == NULL || node_id == NULL || local_addr == NULL || remote_addr == NULL || sample_reason == NULL) {
free(record_type);
free(node_role);
free(node_id);
free(local_addr);
free(remote_addr);
free(sample_reason);
return -1;
}
line = omni_strdup("");
if (line == NULL) {
free(record_type);
free(node_role);
free(node_id);
free(local_addr);
free(remote_addr);
free(sample_reason);
return -1;
}
if (kcp_session_stats_appendf(&line, &line_len, "{\"record_type\":\"%s\",\"node_role\":\"%s\",\"node_id\":\"%s\",\"ts_unix_nano\":%" PRId64 ",\"sample_reason\":\"%s\"",
record_type,
node_role,
node_id,
record->ts_unix_nano,
sample_reason) != 0) {
goto cleanup;
}
if (record->local_addr[0] != '\0' &&
kcp_session_stats_appendf(&line, &line_len, ",\"local_addr\":\"%s\"", local_addr) != 0) {
goto cleanup;
}
if (record->remote_addr[0] != '\0' &&
kcp_session_stats_appendf(&line, &line_len, ",\"remote_addr\":\"%s\"", remote_addr) != 0) {
goto cleanup;
}
if (record->has_conv &&
kcp_session_stats_appendf(&line, &line_len, ",\"conv\":%u", record->conv) != 0) {
goto cleanup;
}
if (record->has_rto_ms &&
kcp_session_stats_appendf(&line, &line_len, ",\"rto_ms\":%u", record->rto_ms) != 0) {
goto cleanup;
}
if (record->has_srtt_ms &&
kcp_session_stats_appendf(&line, &line_len, ",\"srtt_ms\":%d", record->srtt_ms) != 0) {
goto cleanup;
}
if (record->has_srttvar_ms &&
kcp_session_stats_appendf(&line, &line_len, ",\"srttvar_ms\":%d", record->srttvar_ms) != 0) {
goto cleanup;
}
if (record->has_bytes_sent &&
kcp_session_stats_appendf(&line, &line_len, ",\"bytes_sent\":%" PRIu64, record->bytes_sent) != 0) {
goto cleanup;
}
if (record->has_bytes_received &&
kcp_session_stats_appendf(&line, &line_len, ",\"bytes_received\":%" PRIu64, record->bytes_received) != 0) {
goto cleanup;
}
if (record->has_in_pkts &&
kcp_session_stats_appendf(&line, &line_len, ",\"in_pkts\":%" PRIu64, record->in_pkts) != 0) {
goto cleanup;
}
if (record->has_out_pkts &&
kcp_session_stats_appendf(&line, &line_len, ",\"out_pkts\":%" PRIu64, record->out_pkts) != 0) {
goto cleanup;
}
if (record->has_in_segs &&
kcp_session_stats_appendf(&line, &line_len, ",\"in_segs\":%" PRIu64, record->in_segs) != 0) {
goto cleanup;
}
if (record->has_out_segs &&
kcp_session_stats_appendf(&line, &line_len, ",\"out_segs\":%" PRIu64, record->out_segs) != 0) {
goto cleanup;
}
if (record->has_retrans_segs &&
kcp_session_stats_appendf(&line, &line_len, ",\"retrans_segs\":%" PRIu64, record->retrans_segs) != 0) {
goto cleanup;
}
if (record->has_fast_retrans_segs &&
kcp_session_stats_appendf(&line, &line_len, ",\"fast_retrans_segs\":%" PRIu64, record->fast_retrans_segs) != 0) {
goto cleanup;
}
if (record->has_early_retrans_segs &&
kcp_session_stats_appendf(&line, &line_len, ",\"early_retrans_segs\":%" PRIu64, record->early_retrans_segs) != 0) {
goto cleanup;
}
if (record->has_lost_segs &&
kcp_session_stats_appendf(&line, &line_len, ",\"lost_segs\":%" PRIu64, record->lost_segs) != 0) {
goto cleanup;
}
if (record->has_repeat_segs &&
kcp_session_stats_appendf(&line, &line_len, ",\"repeat_segs\":%" PRIu64, record->repeat_segs) != 0) {
goto cleanup;
}
if (record->has_in_errs &&
kcp_session_stats_appendf(&line, &line_len, ",\"in_errs\":%" PRIu64, record->in_errs) != 0) {
goto cleanup;
}
if (record->has_kcp_in_errs &&
kcp_session_stats_appendf(&line, &line_len, ",\"kcp_in_errs\":%" PRIu64, record->kcp_in_errs) != 0) {
goto cleanup;
}
if (record->has_ring_buffer_snd_queue &&
kcp_session_stats_appendf(&line, &line_len, ",\"ring_buffer_snd_queue\":%" PRIu64, record->ring_buffer_snd_queue) != 0) {
goto cleanup;
}
if (record->has_ring_buffer_rcv_queue &&
kcp_session_stats_appendf(&line, &line_len, ",\"ring_buffer_rcv_queue\":%" PRIu64, record->ring_buffer_rcv_queue) != 0) {
goto cleanup;
}
if (record->has_ring_buffer_snd_buffer &&
kcp_session_stats_appendf(&line, &line_len, ",\"ring_buffer_snd_buffer\":%" PRIu64, record->ring_buffer_snd_buffer) != 0) {
goto cleanup;
}
if (record->has_curr_estab &&
kcp_session_stats_appendf(&line, &line_len, ",\"curr_estab\":%" PRIu64, record->curr_estab) != 0) {
goto cleanup;
}
if (kcp_session_stats_append(&line, &line_len, "}") != 0) {
goto cleanup;
}
free(record_type);
free(node_role);
free(node_id);
free(local_addr);
free(remote_addr);
free(sample_reason);
if (omni_file_logger_write_line(&logger->file_logger, line) != 0) {
free(line);
return -1;
}
free(line);
return 0;
cleanup:
free(record_type);
free(node_role);
free(node_id);
free(local_addr);
free(remote_addr);
free(sample_reason);
free(line);
return -1;
}

130
src/latencylog.c Normal file
View File

@@ -0,0 +1,130 @@
#include "latencylog.h"
static void latencylog_fill_event(latency_event_t *event, const char *node_role, const char *node_id, const char *event_name, int64_t ts_unix_nano, const message_t *msg) {
memset(event, 0, sizeof(*event));
event->ts_unix_nano = ts_unix_nano;
snprintf(event->node_role, sizeof(event->node_role), "%s", node_role == NULL ? "" : node_role);
snprintf(event->node_id, sizeof(event->node_id), "%s", node_id == NULL ? "" : node_id);
snprintf(event->event, sizeof(event->event), "%s", event_name == NULL ? "" : event_name);
event->message_type = msg->type;
event->message_id = msg->id;
snprintf(event->from, sizeof(event->from), "%s", msg->from);
snprintf(event->to, sizeof(event->to), "%s", msg->to);
snprintf(event->file_name, sizeof(event->file_name), "%s", msg->file_name);
event->body_size = (int) msg->body_len;
}
latency_logger_t *latencylog_open_jsonl(const char *path) {
latency_logger_t *logger;
FILE *file;
if (path == NULL || path[0] == '\0') {
return NULL;
}
if (omni_ensure_parent_dir(path) != 0) {
return NULL;
}
file = fopen(path, "ab");
if (file == NULL) {
return NULL;
}
logger = (latency_logger_t *) calloc(1, sizeof(*logger));
if (logger == NULL) {
fclose(file);
return NULL;
}
omni_file_logger_init(&logger->file_logger, file);
logger->enabled = 1;
return logger;
}
void latencylog_close(latency_logger_t *logger) {
if (logger == NULL) {
return;
}
if (logger->file_logger.file != NULL) {
fclose(logger->file_logger.file);
}
omni_file_logger_destroy(&logger->file_logger);
free(logger);
}
int latencylog_log_event(latency_logger_t *logger, const latency_event_t *event) {
char *node_role = NULL;
char *node_id = NULL;
char *event_name = NULL;
char *from = NULL;
char *to = NULL;
char *file_name = NULL;
char *line = NULL;
if (logger == NULL || event == NULL || !logger->enabled) {
return 0;
}
node_role = omni_json_escape(event->node_role);
node_id = omni_json_escape(event->node_id);
event_name = omni_json_escape(event->event);
from = omni_json_escape(event->from);
to = omni_json_escape(event->to);
file_name = omni_json_escape(event->file_name);
if (node_role == NULL || node_id == NULL || event_name == NULL || from == NULL || to == NULL || file_name == NULL) {
free(node_role);
free(node_id);
free(event_name);
free(from);
free(to);
free(file_name);
return -1;
}
line = omni_strdup_printf(
"{\"ts_unix_nano\":%" PRId64 ",\"node_role\":\"%s\",\"node_id\":\"%s\",\"event\":\"%s\",\"message_type\":\"%s\",\"message_id\":%" PRIu64 ",\"from\":\"%s\",\"to\":\"%s\",\"file_name\":\"%s\",\"body_size\":%d}",
event->ts_unix_nano,
node_role,
node_id,
event_name,
protocol_message_type_name(event->message_type),
event->message_id,
from,
to,
file_name,
event->body_size
);
free(node_role);
free(node_id);
free(event_name);
free(from);
free(to);
free(file_name);
if (line == NULL) {
return -1;
}
if (omni_file_logger_write_line(&logger->file_logger, line) != 0) {
free(line);
return -1;
}
free(line);
return 0;
}
int latencylog_is_business_message(const message_t *msg) {
if (msg == NULL) {
return 0;
}
return msg->type == MSG_TYPE_TEXT || msg->type == MSG_TYPE_FILE;
}
void latencylog_log_message_event(latency_logger_t *logger, const char *node_role, const char *node_id, const char *event_name, const message_t *msg) {
latencylog_log_message_event_at(logger, node_role, node_id, event_name, omni_now_unix_nano(), msg);
}
void latencylog_log_message_event_at(latency_logger_t *logger, const char *node_role, const char *node_id, const char *event_name, int64_t ts_unix_nano, const message_t *msg) {
latency_event_t event;
if (!latencylog_is_business_message(msg)) {
return;
}
latencylog_fill_event(&event, node_role, node_id, event_name, ts_unix_nano, msg);
(void) latencylog_log_event(logger, &event);
}

103
src/linux_timestamping.c Normal file
View File

@@ -0,0 +1,103 @@
#include "linux_timestamping.h"
#include "latencylog.h"
#ifdef __linux__
#include <linux/errqueue.h>
#include <linux/net_tstamp.h>
#include <netinet/in.h>
#include <string.h>
static int64_t linux_timespec_to_ns(const struct timespec *ts) {
if (ts == NULL) {
return 0;
}
return (int64_t) ts->tv_sec * 1000000000LL + ts->tv_nsec;
}
int linux_timestamping_enable_udp_socket(int fd, int enable_rx) {
int flags = SOF_TIMESTAMPING_SOFTWARE | SOF_TIMESTAMPING_TX_SCHED | SOF_TIMESTAMPING_TX_SOFTWARE | SOF_TIMESTAMPING_OPT_ID | SOF_TIMESTAMPING_OPT_TSONLY;
if (enable_rx) {
flags |= SOF_TIMESTAMPING_RX_SOFTWARE;
}
return setsockopt(fd, SOL_SOCKET, SO_TIMESTAMPING, &flags, sizeof(flags));
}
int64_t linux_timestamping_parse_rx_timestamp(const struct msghdr *msg) {
struct cmsghdr *cmsg;
const struct scm_timestamping *timestamps;
if (msg == NULL) {
return 0;
}
for (cmsg = CMSG_FIRSTHDR((struct msghdr *) msg); cmsg != NULL; cmsg = CMSG_NXTHDR((struct msghdr *) msg, cmsg)) {
if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_TIMESTAMPING) {
timestamps = (const struct scm_timestamping *) CMSG_DATA(cmsg);
if (timestamps->ts[0].tv_sec != 0 || timestamps->ts[0].tv_nsec != 0) {
return linux_timespec_to_ns(&timestamps->ts[0]);
}
}
}
return 0;
}
int linux_timestamping_parse_tx_timestamp(const struct msghdr *msg, omni_tx_timestamp_event_t *out_event) {
struct cmsghdr *cmsg;
const struct scm_timestamping *timestamps = NULL;
const struct sock_extended_err *sock_err = NULL;
int64_t timestamp_ns = 0;
if (msg == NULL || out_event == NULL) {
errno = EINVAL;
return -1;
}
memset(out_event, 0, sizeof(*out_event));
for (cmsg = CMSG_FIRSTHDR((struct msghdr *) msg); cmsg != NULL; cmsg = CMSG_NXTHDR((struct msghdr *) msg, cmsg)) {
if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_TIMESTAMPING) {
timestamps = (const struct scm_timestamping *) CMSG_DATA(cmsg);
} else if ((cmsg->cmsg_level == SOL_IP && cmsg->cmsg_type == IP_RECVERR) ||
(cmsg->cmsg_level == SOL_IPV6 && cmsg->cmsg_type == IPV6_RECVERR)) {
sock_err = (const struct sock_extended_err *) CMSG_DATA(cmsg);
}
}
if (timestamps == NULL || sock_err == NULL) {
errno = EAGAIN;
return -1;
}
if (timestamps->ts[0].tv_sec != 0 || timestamps->ts[0].tv_nsec != 0) {
timestamp_ns = linux_timespec_to_ns(&timestamps->ts[0]);
snprintf(out_event->event_name, sizeof(out_event->event_name), "%s", EVENT_A_TX_SOFTWARE);
} else if (timestamps->ts[1].tv_sec != 0 || timestamps->ts[1].tv_nsec != 0) {
timestamp_ns = linux_timespec_to_ns(&timestamps->ts[1]);
snprintf(out_event->event_name, sizeof(out_event->event_name), "%s", EVENT_A_TX_SCHED);
} else {
errno = EAGAIN;
return -1;
}
out_event->ts_unix_nano = timestamp_ns;
out_event->ee_info = sock_err->ee_info;
out_event->ee_data = sock_err->ee_data;
return 0;
}
#else
int linux_timestamping_enable_udp_socket(int fd, int enable_rx) {
(void) fd;
(void) enable_rx;
errno = ENOTSUP;
return -1;
}
int64_t linux_timestamping_parse_rx_timestamp(const struct msghdr *msg) {
(void) msg;
return 0;
}
int linux_timestamping_parse_tx_timestamp(const struct msghdr *msg, omni_tx_timestamp_event_t *out_event) {
(void) msg;
(void) out_event;
errno = ENOTSUP;
return -1;
}
#endif

568
src/omni_common.c Normal file
View File

@@ -0,0 +1,568 @@
#include "omni_common.h"
#include <ctype.h>
#include <fcntl.h>
#include <limits.h>
#include <netdb.h>
#include <unistd.h>
int64_t omni_now_unix_nano(void) {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return (int64_t) ts.tv_sec * 1000000000LL + ts.tv_nsec;
}
uint32_t omni_now_millis32(void) {
struct timespec ts;
uint64_t ms;
clock_gettime(CLOCK_MONOTONIC, &ts);
ms = (uint64_t) ts.tv_sec * 1000ULL + (uint64_t) (ts.tv_nsec / 1000000L);
return (uint32_t) (ms & 0xffffffffu);
}
int omni_set_nonblocking(int fd, int enabled) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) {
return -1;
}
if (enabled) {
flags |= O_NONBLOCK;
} else {
flags &= ~O_NONBLOCK;
}
return fcntl(fd, F_SETFL, flags);
}
int omni_parse_sockaddr(const char *raw, int passive, struct sockaddr_storage *addr, socklen_t *addr_len, int *family_out) {
struct addrinfo hints;
struct addrinfo *result = NULL;
char host_copy[OMNI_MAX_ADDR_TEXT];
char port_copy[32];
const char *host = NULL;
const char *service = NULL;
const char *last_colon;
size_t host_len;
if (raw == NULL || addr == NULL || addr_len == NULL) {
errno = EINVAL;
return -1;
}
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_flags = passive ? AI_PASSIVE : 0;
last_colon = strrchr(raw, ':');
if (last_colon == NULL) {
host = passive ? NULL : raw;
service = passive ? raw : "0";
} else {
host_len = (size_t) (last_colon - raw);
if (host_len >= sizeof(host_copy)) {
errno = ENAMETOOLONG;
return -1;
}
memcpy(host_copy, raw, host_len);
host_copy[host_len] = '\0';
snprintf(port_copy, sizeof(port_copy), "%s", last_colon + 1);
host = host_len == 0 ? NULL : host_copy;
service = port_copy;
}
if (getaddrinfo(host, service, &hints, &result) != 0 || result == NULL) {
errno = EINVAL;
return -1;
}
memcpy(addr, result->ai_addr, result->ai_addrlen);
*addr_len = (socklen_t) result->ai_addrlen;
if (family_out != NULL) {
*family_out = result->ai_family;
}
freeaddrinfo(result);
return 0;
}
int omni_clone_sockaddr(const struct sockaddr *src, socklen_t src_len, struct sockaddr_storage *dst, socklen_t *dst_len) {
if (src == NULL || dst == NULL || dst_len == NULL || src_len > sizeof(*dst)) {
errno = EINVAL;
return -1;
}
memset(dst, 0, sizeof(*dst));
memcpy(dst, src, src_len);
*dst_len = src_len;
return 0;
}
const char *omni_sockaddr_to_string(const struct sockaddr *addr, socklen_t addr_len, char *buffer, size_t buffer_len) {
char host[NI_MAXHOST];
char service[NI_MAXSERV];
if (buffer == NULL || buffer_len == 0) {
return "";
}
if (addr == NULL) {
snprintf(buffer, buffer_len, "<nil>");
return buffer;
}
if (getnameinfo(addr, addr_len, host, sizeof(host), service, sizeof(service), NI_NUMERICHOST | NI_NUMERICSERV) != 0) {
snprintf(buffer, buffer_len, "<addr>");
return buffer;
}
if (addr->sa_family == AF_INET6) {
snprintf(buffer, buffer_len, "[%s]:%s", host, service);
} else {
snprintf(buffer, buffer_len, "%s:%s", host, service);
}
return buffer;
}
int omni_bind_device(int fd, const char *device) {
#ifdef __linux__
if (device == NULL || device[0] == '\0') {
return 0;
}
return setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, device, (socklen_t) strlen(device));
#else
(void) fd;
(void) device;
errno = ENOTSUP;
return -1;
#endif
}
static int omni_mkdir_single(const char *path) {
if (mkdir(path, 0755) == 0 || errno == EEXIST) {
return 0;
}
return -1;
}
int omni_ensure_dir(const char *path) {
char tmp[PATH_MAX];
size_t i;
if (path == NULL || path[0] == '\0') {
return 0;
}
if (strlen(path) >= sizeof(tmp)) {
errno = ENAMETOOLONG;
return -1;
}
snprintf(tmp, sizeof(tmp), "%s", path);
for (i = 1; tmp[i] != '\0'; ++i) {
if (tmp[i] == '/') {
tmp[i] = '\0';
if (tmp[0] != '\0' && omni_mkdir_single(tmp) != 0) {
return -1;
}
tmp[i] = '/';
}
}
return omni_mkdir_single(tmp);
}
int omni_ensure_parent_dir(const char *path) {
char tmp[PATH_MAX];
char *slash;
if (path == NULL || path[0] == '\0') {
return 0;
}
if (strlen(path) >= sizeof(tmp)) {
errno = ENAMETOOLONG;
return -1;
}
snprintf(tmp, sizeof(tmp), "%s", path);
slash = strrchr(tmp, '/');
if (slash == NULL) {
return 0;
}
if (slash == tmp) {
return omni_mkdir_single("/");
}
*slash = '\0';
return omni_ensure_dir(tmp);
}
int omni_read_file(const char *path, uint8_t **out, size_t *out_len) {
FILE *file;
long size;
uint8_t *buffer;
if (out == NULL || out_len == NULL) {
errno = EINVAL;
return -1;
}
*out = NULL;
*out_len = 0;
file = fopen(path, "rb");
if (file == NULL) {
return -1;
}
if (fseek(file, 0, SEEK_END) != 0) {
fclose(file);
return -1;
}
size = ftell(file);
if (size < 0) {
fclose(file);
return -1;
}
if (fseek(file, 0, SEEK_SET) != 0) {
fclose(file);
return -1;
}
buffer = (uint8_t *) malloc((size_t) size);
if (size > 0 && buffer == NULL) {
fclose(file);
errno = ENOMEM;
return -1;
}
if ((size_t) size > 0 && fread(buffer, 1, (size_t) size, file) != (size_t) size) {
free(buffer);
fclose(file);
errno = EIO;
return -1;
}
fclose(file);
*out = buffer;
*out_len = (size_t) size;
return 0;
}
int omni_write_full_fd(int fd, const uint8_t *data, size_t len) {
ssize_t written;
while (len > 0) {
written = write(fd, data, len);
if (written < 0) {
if (errno == EINTR) {
continue;
}
return -1;
}
if (written == 0) {
errno = EIO;
return -1;
}
data += written;
len -= (size_t) written;
}
return 0;
}
static int omni_write_file_internal(const char *path, const uint8_t *data, size_t len, const char *mode) {
FILE *file;
if (omni_ensure_parent_dir(path) != 0) {
return -1;
}
file = fopen(path, mode);
if (file == NULL) {
return -1;
}
if (len > 0 && fwrite(data, 1, len, file) != len) {
fclose(file);
errno = EIO;
return -1;
}
if (fclose(file) != 0) {
return -1;
}
return 0;
}
int omni_append_file(const char *path, const uint8_t *data, size_t len) {
return omni_write_file_internal(path, data, len, "ab");
}
int omni_write_file(const char *path, const uint8_t *data, size_t len) {
return omni_write_file_internal(path, data, len, "wb");
}
int omni_random_u32(uint32_t *out) {
uint8_t *cursor;
size_t remaining;
int fd;
if (out == NULL) {
errno = EINVAL;
return -1;
}
fd = open("/dev/urandom", O_RDONLY);
if (fd < 0) {
return -1;
}
cursor = (uint8_t *) out;
remaining = sizeof(*out);
while (remaining > 0) {
ssize_t n = read(fd, cursor, remaining);
if (n < 0) {
if (errno == EINTR) {
continue;
}
close(fd);
return -1;
}
if (n == 0) {
close(fd);
errno = EIO;
return -1;
}
cursor += n;
remaining -= (size_t) n;
}
close(fd);
if (*out == 0) {
*out = 1;
}
return 0;
}
char *omni_strdup(const char *src) {
size_t len;
char *dst;
if (src == NULL) {
return NULL;
}
len = strlen(src);
dst = (char *) malloc(len + 1U);
if (dst == NULL) {
return NULL;
}
memcpy(dst, src, len + 1U);
return dst;
}
char *omni_strdup_printf(const char *fmt, ...) {
va_list args;
va_list copy;
int needed;
char *buffer;
va_start(args, fmt);
va_copy(copy, args);
needed = vsnprintf(NULL, 0, fmt, copy);
va_end(copy);
if (needed < 0) {
va_end(args);
return NULL;
}
buffer = (char *) malloc((size_t) needed + 1U);
if (buffer == NULL) {
va_end(args);
return NULL;
}
vsnprintf(buffer, (size_t) needed + 1U, fmt, args);
va_end(args);
return buffer;
}
char *omni_json_escape_bytes(const uint8_t *src, size_t len) {
size_t i;
size_t out_len = 0;
char *out;
char *cursor;
if (src == NULL) {
if (len == 0) {
return omni_strdup("");
}
errno = EINVAL;
return NULL;
}
for (i = 0; i < len; ++i) {
switch (src[i]) {
case '\\':
case '"':
case '\b':
case '\f':
case '\n':
case '\r':
case '\t':
out_len += 2;
break;
default:
out_len += src[i] < 0x20 ? 6U : 1U;
break;
}
}
out = (char *) malloc(out_len + 1U);
if (out == NULL) {
return NULL;
}
cursor = out;
for (i = 0; i < len; ++i) {
switch (src[i]) {
case '\\':
*cursor++ = '\\';
*cursor++ = '\\';
break;
case '"':
*cursor++ = '\\';
*cursor++ = '"';
break;
case '\b':
*cursor++ = '\\';
*cursor++ = 'b';
break;
case '\f':
*cursor++ = '\\';
*cursor++ = 'f';
break;
case '\n':
*cursor++ = '\\';
*cursor++ = 'n';
break;
case '\r':
*cursor++ = '\\';
*cursor++ = 'r';
break;
case '\t':
*cursor++ = '\\';
*cursor++ = 't';
break;
default:
if (src[i] < 0x20) {
snprintf(cursor, 7, "\\u%04x", src[i]);
cursor += 6;
} else {
*cursor++ = (char) src[i];
}
break;
}
}
*cursor = '\0';
return out;
}
char *omni_json_escape(const char *src) {
if (src == NULL) {
return omni_strdup("");
}
return omni_json_escape_bytes((const uint8_t *) src, strlen(src));
}
int omni_utf8_valid(const uint8_t *data, size_t len) {
size_t i = 0;
uint8_t c;
while (i < len) {
c = data[i];
if (c <= 0x7f) {
i++;
continue;
}
if ((c & 0xe0) == 0xc0) {
if (i + 1 >= len || (data[i + 1] & 0xc0) != 0x80 || c < 0xc2) {
return 0;
}
i += 2;
continue;
}
if ((c & 0xf0) == 0xe0) {
if (i + 2 >= len || (data[i + 1] & 0xc0) != 0x80 || (data[i + 2] & 0xc0) != 0x80) {
return 0;
}
if (c == 0xe0 && data[i + 1] < 0xa0) {
return 0;
}
if (c == 0xed && data[i + 1] >= 0xa0) {
return 0;
}
i += 3;
continue;
}
if ((c & 0xf8) == 0xf0) {
if (i + 3 >= len || (data[i + 1] & 0xc0) != 0x80 || (data[i + 2] & 0xc0) != 0x80 || (data[i + 3] & 0xc0) != 0x80) {
return 0;
}
if (c == 0xf0 && data[i + 1] < 0x90) {
return 0;
}
if (c > 0xf4 || (c == 0xf4 && data[i + 1] >= 0x90)) {
return 0;
}
i += 4;
continue;
}
return 0;
}
return 1;
}
void omni_trim_newline(char *line) {
size_t len;
if (line == NULL) {
return;
}
len = strlen(line);
while (len > 0 && (line[len - 1] == '\n' || line[len - 1] == '\r')) {
line[--len] = '\0';
}
}
int omni_parse_duration_ms(const char *raw, int default_ms, int *out_ms) {
char *endptr;
long value;
if (out_ms == NULL) {
errno = EINVAL;
return -1;
}
if (raw == NULL || raw[0] == '\0') {
*out_ms = default_ms;
return 0;
}
value = strtol(raw, &endptr, 10);
if (endptr == raw || value <= 0) {
errno = EINVAL;
return -1;
}
if (*endptr == '\0' || strcmp(endptr, "ms") == 0) {
*out_ms = (int) value;
return 0;
}
if (strcmp(endptr, "s") == 0) {
*out_ms = (int) (value * 1000L);
return 0;
}
errno = EINVAL;
return -1;
}
double omni_duration_ms_to_ns(double ms) {
return ms * 1000000.0;
}
const char *omni_path_base_name(const char *path) {
const char *slash;
if (path == NULL) {
return "";
}
slash = strrchr(path, '/');
return slash == NULL ? path : slash + 1;
}
void omni_file_logger_init(omni_file_logger_t *logger, FILE *file) {
logger->file = file;
pthread_mutex_init(&logger->mutex, NULL);
}
void omni_file_logger_destroy(omni_file_logger_t *logger) {
pthread_mutex_destroy(&logger->mutex);
}
int omni_file_logger_write_line(omni_file_logger_t *logger, const char *line) {
int rc = 0;
if (logger == NULL || logger->file == NULL || line == NULL) {
errno = EINVAL;
return -1;
}
pthread_mutex_lock(&logger->mutex);
if (fputs(line, logger->file) == EOF || fputc('\n', logger->file) == EOF || fflush(logger->file) != 0) {
rc = -1;
}
pthread_mutex_unlock(&logger->mutex);
return rc;
}

204
src/peer_kcp_client.c Normal file
View File

@@ -0,0 +1,204 @@
#include "peer_kcp_client.h"
#include <pthread.h>
#include <string.h>
struct kcp_client {
char id[OMNI_MAX_PEER_ID];
char server_addr[OMNI_MAX_ADDR_TEXT];
kcp_conn_t *conn;
latency_logger_t *logger;
pthread_mutex_t id_mu;
uint64_t next_message_id;
};
static int kcp_client_next_message_id(kcp_client_t *client, uint64_t *out_id) {
pthread_mutex_lock(&client->id_mu);
*out_id = ++client->next_message_id;
pthread_mutex_unlock(&client->id_mu);
return 0;
}
static int kcp_client_persist_message_to_disk(const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len) {
char path[512];
if (omni_ensure_dir(inbox_dir) != 0) {
return -1;
}
if (msg->type == MSG_TYPE_TEXT) {
char *body = omni_json_escape_bytes(msg->body, msg->body_len);
char *from = omni_json_escape(msg->from);
char *to = omni_json_escape(msg->to);
char *line;
if (body == NULL || from == NULL || to == NULL) {
free(body);
free(from);
free(to);
return -1;
}
snprintf(path, sizeof(path), "%s/messages.log", inbox_dir);
line = omni_strdup_printf(
"{\"message_type\":\"%s\",\"message_id\":%" PRIu64 ",\"from\":\"%s\",\"to\":\"%s\",\"body\":\"%s\"}\n",
protocol_message_type_name(msg->type),
msg->id,
from,
to,
body
);
free(body);
free(from);
free(to);
if (line == NULL) {
return -1;
}
if (omni_append_file(path, (const uint8_t *) line, strlen(line)) != 0) {
free(line);
return -1;
}
free(line);
} else if (msg->type == MSG_TYPE_FILE) {
const char *file_name = omni_path_base_name(msg->file_name);
if (file_name[0] == '\0') {
file_name = "unnamed";
}
snprintf(path, sizeof(path), "%s/%s-%" PRIu64 "-%s", inbox_dir, msg->from, msg->id, file_name);
if (omni_write_file(path, msg->body, msg->body_len) != 0) {
return -1;
}
} else {
errno = EINVAL;
return -1;
}
if (out_path != NULL && out_path_len > 0) {
snprintf(out_path, out_path_len, "%s", path);
}
return 0;
}
kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
kcp_client_t *client;
const char *actual_dial_addr = (dial_addr != NULL && dial_addr[0] != '\0') ? dial_addr : server_addr;
message_t register_msg;
int saved_errno = 0;
client = (kcp_client_t *) calloc(1, sizeof(*client));
if (client == NULL) {
return NULL;
}
snprintf(client->id, sizeof(client->id), "%s", peer_id);
snprintf(client->server_addr, sizeof(client->server_addr), "%s", server_addr == NULL ? "" : server_addr);
pthread_mutex_init(&client->id_mu, NULL);
client->logger = logger;
client->conn = kcp_conn_dial(actual_dial_addr, bind_ip, bind_device, packet_logger, logger, OMNI_NODE_ROLE_PEER, peer_id, stats_logger, stats_interval_ms);
if (client->conn == NULL) {
saved_errno = errno;
kcp_client_free(client);
errno = saved_errno;
return NULL;
}
protocol_message_init(&register_msg);
register_msg.type = MSG_TYPE_REGISTER;
register_msg.id = 0;
snprintf(register_msg.from, sizeof(register_msg.from), "%s", peer_id);
snprintf(register_msg.to, sizeof(register_msg.to), "%s", SERVER_PEER_ID);
if (kcp_conn_send(client->conn, &register_msg) != 0) {
saved_errno = errno;
kcp_client_free(client);
errno = saved_errno;
return NULL;
}
return client;
}
const char *kcp_client_id(const kcp_client_t *client) {
return client == NULL ? "" : client->id;
}
int kcp_client_send_text(kcp_client_t *client, const char *to, const char *text) {
message_t msg;
uint64_t id;
protocol_message_init(&msg);
kcp_client_next_message_id(client, &id);
msg.type = MSG_TYPE_TEXT;
msg.id = id;
snprintf(msg.from, sizeof(msg.from), "%s", client->id);
snprintf(msg.to, sizeof(msg.to), "%s", to);
msg.body = (uint8_t *) omni_strdup(text);
if (msg.body == NULL) {
return -1;
}
msg.body_len = strlen((const char *) msg.body);
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_A_APP_PREP_BEGIN, &msg);
if (kcp_conn_send(client->conn, &msg) != 0) {
protocol_message_clear(&msg);
return -1;
}
protocol_message_clear(&msg);
return 0;
}
int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char *path) {
message_t msg;
uint64_t id;
uint8_t *body = NULL;
size_t body_len = 0;
const char *base_name = strrchr(path, '/');
if (omni_read_file(path, &body, &body_len) != 0) {
return -1;
}
protocol_message_init(&msg);
kcp_client_next_message_id(client, &id);
msg.type = MSG_TYPE_FILE;
msg.id = id;
snprintf(msg.from, sizeof(msg.from), "%s", client->id);
snprintf(msg.to, sizeof(msg.to), "%s", to);
snprintf(msg.file_name, sizeof(msg.file_name), "%s", base_name == NULL ? path : base_name + 1);
msg.body = body;
msg.body_len = body_len;
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_A_APP_PREP_BEGIN, &msg);
if (kcp_conn_send(client->conn, &msg) != 0) {
protocol_message_clear(&msg);
return -1;
}
protocol_message_clear(&msg);
return 0;
}
int kcp_client_receive(kcp_client_t *client, message_t *out_msg) {
if (kcp_conn_receive(client->conn, out_msg) != 0) {
return -1;
}
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_B_APP_RECV, out_msg);
return 0;
}
int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len) {
if (!latencylog_is_business_message(msg)) {
errno = EINVAL;
return -1;
}
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_B_PERSIST_BEGIN, msg);
if (kcp_client_persist_message_to_disk(msg, inbox_dir, out_path, out_path_len) != 0) {
return -1;
}
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_B_PERSIST_END, msg);
return 0;
}
int kcp_client_close(kcp_client_t *client) {
return client == NULL ? 0 : kcp_conn_close(client->conn);
}
void kcp_client_free(kcp_client_t *client) {
if (client == NULL) {
return;
}
kcp_conn_free(client->conn);
pthread_mutex_destroy(&client->id_mu);
free(client);
}

181
src/peer_udp_client.c Normal file
View File

@@ -0,0 +1,181 @@
#include "peer_udp_client.h"
#include <pthread.h>
struct udp_client {
char id[OMNI_MAX_PEER_ID];
udp_conn_t *conn;
latency_logger_t *logger;
pthread_mutex_t id_mu;
uint64_t next_message_id;
};
static int client_next_message_id(udp_client_t *client, uint64_t *out_id) {
pthread_mutex_lock(&client->id_mu);
*out_id = ++client->next_message_id;
pthread_mutex_unlock(&client->id_mu);
return 0;
}
static int client_persist_message_to_disk(const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len) {
char path[512];
if (omni_ensure_dir(inbox_dir) != 0) {
return -1;
}
if (msg->type == MSG_TYPE_TEXT) {
char *body = omni_json_escape_bytes(msg->body, msg->body_len);
char *from = omni_json_escape(msg->from);
char *to = omni_json_escape(msg->to);
char *line;
if (body == NULL || from == NULL || to == NULL) {
free(body);
free(from);
free(to);
return -1;
}
snprintf(path, sizeof(path), "%s/messages.log", inbox_dir);
line = omni_strdup_printf("{\"message_type\":\"%s\",\"message_id\":%" PRIu64 ",\"from\":\"%s\",\"to\":\"%s\",\"body\":\"%s\"}\n", protocol_message_type_name(msg->type), msg->id, from, to, body);
free(body);
free(from);
free(to);
if (line == NULL) {
return -1;
}
if (omni_append_file(path, (const uint8_t *) line, strlen(line)) != 0) {
free(line);
return -1;
}
free(line);
} else if (msg->type == MSG_TYPE_FILE) {
const char *file_name = omni_path_base_name(msg->file_name);
if (file_name[0] == '\0') {
file_name = "unnamed";
}
snprintf(path, sizeof(path), "%s/%s-%" PRIu64 "-%s", inbox_dir, msg->from, msg->id, file_name);
if (omni_write_file(path, msg->body, msg->body_len) != 0) {
return -1;
}
} else {
errno = EINVAL;
return -1;
}
if (out_path != NULL && out_path_len > 0) {
snprintf(out_path, out_path_len, "%s", path);
}
return 0;
}
udp_client_t *udp_client_dial(const char *server_addr, const char *peer_id, const char *bind_ip, latency_logger_t *logger, tx_timestamp_debug_logger_t *debug_logger, int enable_timestamping) {
udp_client_t *client;
message_t register_msg;
client = (udp_client_t *) calloc(1, sizeof(*client));
if (client == NULL) {
return NULL;
}
snprintf(client->id, sizeof(client->id), "%s", peer_id);
pthread_mutex_init(&client->id_mu, NULL);
client->logger = logger;
client->conn = udp_conn_dial(server_addr, bind_ip, NULL, enable_timestamping, logger, OMNI_NODE_ROLE_PEER, peer_id, debug_logger);
if (client->conn == NULL) {
udp_client_free(client);
return NULL;
}
protocol_message_init(&register_msg);
register_msg.type = MSG_TYPE_REGISTER;
register_msg.id = 0;
snprintf(register_msg.from, sizeof(register_msg.from), "%s", peer_id);
snprintf(register_msg.to, sizeof(register_msg.to), "%s", SERVER_PEER_ID);
if (udp_conn_send(client->conn, &register_msg) != 0) {
udp_client_free(client);
return NULL;
}
return client;
}
const char *udp_client_id(const udp_client_t *client) {
return client == NULL ? "" : client->id;
}
int udp_client_send_text(udp_client_t *client, const char *to, const char *text) {
message_t msg;
uint64_t id;
protocol_message_init(&msg);
client_next_message_id(client, &id);
msg.type = MSG_TYPE_TEXT;
msg.id = id;
snprintf(msg.from, sizeof(msg.from), "%s", client->id);
snprintf(msg.to, sizeof(msg.to), "%s", to);
msg.body = (uint8_t *) omni_strdup(text);
if (msg.body == NULL) {
return -1;
}
msg.body_len = strlen((const char *) msg.body);
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_A_APP_PREP_BEGIN, &msg);
if (udp_conn_send(client->conn, &msg) != 0) {
protocol_message_clear(&msg);
return -1;
}
protocol_message_clear(&msg);
return 0;
}
int udp_client_send_file_path(udp_client_t *client, const char *to, const char *path) {
message_t msg;
uint64_t id;
uint8_t *body = NULL;
size_t body_len = 0;
const char *base_name = strrchr(path, '/');
if (omni_read_file(path, &body, &body_len) != 0) {
return -1;
}
protocol_message_init(&msg);
client_next_message_id(client, &id);
msg.type = MSG_TYPE_FILE;
msg.id = id;
snprintf(msg.from, sizeof(msg.from), "%s", client->id);
snprintf(msg.to, sizeof(msg.to), "%s", to);
snprintf(msg.file_name, sizeof(msg.file_name), "%s", base_name == NULL ? path : base_name + 1);
msg.body = body;
msg.body_len = body_len;
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_A_APP_PREP_BEGIN, &msg);
if (udp_conn_send(client->conn, &msg) != 0) {
protocol_message_clear(&msg);
return -1;
}
protocol_message_clear(&msg);
return 0;
}
int udp_client_receive(udp_client_t *client, message_t *out_msg) {
if (udp_conn_receive(client->conn, out_msg, NULL, NULL) != 0) {
return -1;
}
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_B_APP_RECV, out_msg);
return 0;
}
int udp_client_persist_message(udp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len) {
if (!latencylog_is_business_message(msg)) {
errno = EINVAL;
return -1;
}
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_B_PERSIST_BEGIN, msg);
if (client_persist_message_to_disk(msg, inbox_dir, out_path, out_path_len) != 0) {
return -1;
}
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_B_PERSIST_END, msg);
return 0;
}
int udp_client_close(udp_client_t *client) {
return client == NULL ? 0 : udp_conn_close(client->conn);
}
void udp_client_free(udp_client_t *client) {
if (client == NULL) {
return;
}
udp_conn_free(client->conn);
pthread_mutex_destroy(&client->id_mu);
free(client);
}

409
src/protocol.c Normal file
View File

@@ -0,0 +1,409 @@
#include "protocol.h"
#include "cJSON.h"
#include <arpa/inet.h>
static const char *protocol_message_type_table[] = {
"text",
"file",
"register",
"error"
};
const char *protocol_message_type_name(message_type_t type) {
if ((int) type < 0 || type >= MSG_TYPE_INVALID) {
return "invalid";
}
return protocol_message_type_table[type];
}
int protocol_message_type_from_name(const char *raw, message_type_t *out) {
size_t i;
if (raw == NULL || out == NULL) {
return -1;
}
for (i = 0; i < OMNI_ARRAY_LEN(protocol_message_type_table); ++i) {
if (strcmp(raw, protocol_message_type_table[i]) == 0) {
*out = (message_type_t) i;
return 0;
}
}
return -1;
}
void protocol_message_init(message_t *msg) {
if (msg == NULL) {
return;
}
memset(msg, 0, sizeof(*msg));
msg->type = MSG_TYPE_INVALID;
}
void protocol_message_clear(message_t *msg) {
if (msg == NULL) {
return;
}
free(msg->body);
protocol_message_init(msg);
}
int protocol_message_copy(message_t *dst, const message_t *src) {
if (dst == NULL || src == NULL) {
errno = EINVAL;
return -1;
}
protocol_message_clear(dst);
memcpy(dst, src, sizeof(*dst));
dst->body = NULL;
if (src->body_len > 0) {
dst->body = (uint8_t *) malloc(src->body_len);
if (dst->body == NULL) {
protocol_message_init(dst);
errno = ENOMEM;
return -1;
}
memcpy(dst->body, src->body, src->body_len);
}
return 0;
}
static int protocol_set_err(char *err, size_t err_len, const char *fmt, ...) {
va_list args;
if (err != NULL && err_len > 0) {
va_start(args, fmt);
vsnprintf(err, err_len, fmt, args);
va_end(args);
}
return -1;
}
int protocol_validate_message(const message_t *msg, char *err, size_t err_len) {
if (msg == NULL) {
return protocol_set_err(err, err_len, "protocol: nil message");
}
if (msg->from[0] == '\0') {
return protocol_set_err(err, err_len, "protocol: missing from");
}
if (msg->to[0] == '\0') {
return protocol_set_err(err, err_len, "protocol: missing to");
}
switch (msg->type) {
case MSG_TYPE_TEXT:
if (msg->file_name[0] != '\0') {
return protocol_set_err(err, err_len, "protocol: unexpected file name");
}
if (!omni_utf8_valid(msg->body, msg->body_len)) {
return protocol_set_err(err, err_len, "protocol: invalid text body");
}
break;
case MSG_TYPE_FILE:
if (msg->file_name[0] == '\0') {
return protocol_set_err(err, err_len, "protocol: missing file name");
}
break;
case MSG_TYPE_REGISTER:
if (strcmp(msg->to, SERVER_PEER_ID) != 0) {
return protocol_set_err(err, err_len, "protocol: invalid register target");
}
if (msg->file_name[0] != '\0') {
return protocol_set_err(err, err_len, "protocol: unexpected file name");
}
if (msg->body_len != 0) {
return protocol_set_err(err, err_len, "protocol: unexpected body");
}
break;
case MSG_TYPE_ERROR:
if (strcmp(msg->from, SERVER_PEER_ID) != 0) {
return protocol_set_err(err, err_len, "protocol: invalid error source");
}
if (msg->file_name[0] != '\0') {
return protocol_set_err(err, err_len, "protocol: unexpected file name");
}
if (!omni_utf8_valid(msg->body, msg->body_len)) {
return protocol_set_err(err, err_len, "protocol: invalid text body");
}
break;
default:
return protocol_set_err(err, err_len, "protocol: invalid message type");
}
return 0;
}
static int protocol_build_header_json(const message_t *msg, char **out_json, size_t *out_len) {
cJSON *root;
char *json;
root = cJSON_CreateObject();
if (root == NULL) {
errno = ENOMEM;
return -1;
}
cJSON_AddStringToObject(root, "type", protocol_message_type_name(msg->type));
cJSON_AddNumberToObject(root, "id", (double) msg->id);
cJSON_AddStringToObject(root, "from", msg->from);
cJSON_AddStringToObject(root, "to", msg->to);
if (msg->file_name[0] != '\0') {
cJSON_AddStringToObject(root, "file_name", msg->file_name);
}
cJSON_AddNumberToObject(root, "content_length", (double) msg->body_len);
json = cJSON_PrintUnformatted(root);
cJSON_Delete(root);
if (json == NULL) {
errno = ENOMEM;
return -1;
}
*out_len = strlen(json);
*out_json = json;
return 0;
}
int protocol_encode_message_datagram(const message_t *msg, uint8_t **out, size_t *out_len) {
uint8_t *buffer;
char *header_json;
size_t header_len;
uint32_t net_header_len;
char err[128];
if (out == NULL || out_len == NULL) {
errno = EINVAL;
return -1;
}
*out = NULL;
*out_len = 0;
if (protocol_validate_message(msg, err, sizeof(err)) != 0) {
errno = EINVAL;
return -1;
}
if (protocol_build_header_json(msg, &header_json, &header_len) != 0) {
return -1;
}
if (4U + header_len + msg->body_len > OMNI_MAX_FRAME_SIZE) {
cJSON_free(header_json);
errno = EMSGSIZE;
return -1;
}
buffer = (uint8_t *) malloc(4U + header_len + msg->body_len);
if (buffer == NULL) {
cJSON_free(header_json);
errno = ENOMEM;
return -1;
}
net_header_len = htonl((uint32_t) header_len);
memcpy(buffer, &net_header_len, 4);
memcpy(buffer + 4, header_json, header_len);
if (msg->body_len > 0) {
memcpy(buffer + 4 + header_len, msg->body, msg->body_len);
}
cJSON_free(header_json);
*out = buffer;
*out_len = 4U + header_len + msg->body_len;
return 0;
}
static int protocol_copy_string_field(char *dst, size_t dst_len, const cJSON *object, const char *field, int required, char *err, size_t err_len) {
const cJSON *item = cJSON_GetObjectItemCaseSensitive(object, field);
if (item == NULL) {
if (required) {
return protocol_set_err(err, err_len, "protocol: missing %s", field);
}
dst[0] = '\0';
return 0;
}
if (!cJSON_IsString(item) || item->valuestring == NULL) {
return protocol_set_err(err, err_len, "protocol: invalid %s", field);
}
snprintf(dst, dst_len, "%s", item->valuestring);
return 0;
}
static int protocol_copy_u64_field(uint64_t *dst, const cJSON *object, const char *field, int required, char *err, size_t err_len) {
const cJSON *item = cJSON_GetObjectItemCaseSensitive(object, field);
if (item == NULL) {
if (required) {
return protocol_set_err(err, err_len, "protocol: missing %s", field);
}
*dst = 0;
return 0;
}
if (!cJSON_IsNumber(item)) {
return protocol_set_err(err, err_len, "protocol: invalid %s", field);
}
*dst = (uint64_t) item->valuedouble;
return 0;
}
int protocol_decode_message_datagram(const uint8_t *data, size_t data_len, message_t *out_msg, char *err, size_t err_len) {
uint32_t net_header_len;
uint32_t header_len;
char *header_text = NULL;
cJSON *header = NULL;
const cJSON *type_item;
uint64_t content_length = 0;
if (data == NULL || out_msg == NULL || data_len < 4U) {
return protocol_set_err(err, err_len, "protocol: invalid datagram");
}
if (data_len > OMNI_MAX_FRAME_SIZE) {
return protocol_set_err(err, err_len, "protocol: frame too large");
}
protocol_message_clear(out_msg);
memcpy(&net_header_len, data, 4);
header_len = ntohl(net_header_len);
if (header_len == 0 || (size_t) header_len > data_len - 4U) {
return protocol_set_err(err, err_len, "protocol: invalid header length");
}
header_text = (char *) malloc((size_t) header_len + 1U);
if (header_text == NULL) {
errno = ENOMEM;
return -1;
}
memcpy(header_text, data + 4, header_len);
header_text[header_len] = '\0';
header = cJSON_Parse(header_text);
free(header_text);
if (header == NULL || !cJSON_IsObject(header)) {
if (header != NULL) {
cJSON_Delete(header);
}
return protocol_set_err(err, err_len, "protocol: invalid header json");
}
type_item = cJSON_GetObjectItemCaseSensitive(header, "type");
if (type_item == NULL || !cJSON_IsString(type_item) || protocol_message_type_from_name(type_item->valuestring, &out_msg->type) != 0) {
cJSON_Delete(header);
return protocol_set_err(err, err_len, "protocol: invalid message type");
}
if (protocol_copy_u64_field(&out_msg->id, header, "id", 1, err, err_len) != 0 ||
protocol_copy_string_field(out_msg->from, sizeof(out_msg->from), header, "from", 1, err, err_len) != 0 ||
protocol_copy_string_field(out_msg->to, sizeof(out_msg->to), header, "to", 1, err, err_len) != 0 ||
protocol_copy_string_field(out_msg->file_name, sizeof(out_msg->file_name), header, "file_name", 0, err, err_len) != 0 ||
protocol_copy_u64_field(&content_length, header, "content_length", 1, err, err_len) != 0) {
cJSON_Delete(header);
protocol_message_clear(out_msg);
return -1;
}
cJSON_Delete(header);
if ((size_t) content_length != data_len - 4U - (size_t) header_len) {
protocol_message_clear(out_msg);
return protocol_set_err(err, err_len, "protocol: invalid content length");
}
out_msg->body_len = (size_t) content_length;
if (out_msg->body_len > 0) {
out_msg->body = (uint8_t *) malloc(out_msg->body_len);
if (out_msg->body == NULL) {
protocol_message_clear(out_msg);
errno = ENOMEM;
return -1;
}
memcpy(out_msg->body, data + 4U + header_len, out_msg->body_len);
}
if (protocol_validate_message(out_msg, err, err_len) != 0) {
protocol_message_clear(out_msg);
return -1;
}
return 0;
}
int protocol_encode_message_stream(const message_t *msg, uint8_t **out, size_t *out_len) {
uint8_t *payload;
uint8_t *buffer;
size_t payload_len;
uint32_t net_len;
if (protocol_encode_message_datagram(msg, &payload, &payload_len) != 0) {
return -1;
}
buffer = (uint8_t *) malloc(payload_len + 4U);
if (buffer == NULL) {
free(payload);
errno = ENOMEM;
return -1;
}
net_len = htonl((uint32_t) payload_len);
memcpy(buffer, &net_len, 4);
memcpy(buffer + 4, payload, payload_len);
free(payload);
*out = buffer;
*out_len = payload_len + 4U;
return 0;
}
int protocol_decode_message_stream_payload(const uint8_t *payload, size_t payload_len, message_t *out_msg, char *err, size_t err_len) {
return protocol_decode_message_datagram(payload, payload_len, out_msg, err, err_len);
}
void protocol_frame_decoder_init(protocol_frame_decoder_t *decoder) {
memset(decoder, 0, sizeof(*decoder));
}
void protocol_frame_decoder_reset(protocol_frame_decoder_t *decoder) {
decoder->len = 0;
}
void protocol_frame_decoder_destroy(protocol_frame_decoder_t *decoder) {
free(decoder->buffer);
memset(decoder, 0, sizeof(*decoder));
}
int protocol_frame_decoder_feed(protocol_frame_decoder_t *decoder, const uint8_t *data, size_t data_len) {
uint8_t *next_buffer;
size_t next_cap;
if (decoder->len + data_len > OMNI_MAX_FRAME_SIZE * 2U) {
errno = EMSGSIZE;
return -1;
}
if (decoder->len + data_len > decoder->cap) {
next_cap = decoder->cap == 0 ? 4096U : decoder->cap;
while (next_cap < decoder->len + data_len) {
next_cap *= 2U;
}
next_buffer = (uint8_t *) realloc(decoder->buffer, next_cap);
if (next_buffer == NULL) {
errno = ENOMEM;
return -1;
}
decoder->buffer = next_buffer;
decoder->cap = next_cap;
}
memcpy(decoder->buffer + decoder->len, data, data_len);
decoder->len += data_len;
return 0;
}
int protocol_frame_decoder_next(protocol_frame_decoder_t *decoder, uint8_t **payload, size_t *payload_len) {
uint32_t net_len;
uint32_t frame_len;
uint8_t *frame;
if (payload == NULL || payload_len == NULL) {
errno = EINVAL;
return -1;
}
*payload = NULL;
*payload_len = 0;
if (decoder->len < 4U) {
return 0;
}
memcpy(&net_len, decoder->buffer, 4);
frame_len = ntohl(net_len);
if (frame_len == 0 || frame_len > OMNI_MAX_FRAME_SIZE) {
errno = EMSGSIZE;
return -1;
}
if (decoder->len < 4U + frame_len) {
return 0;
}
frame = (uint8_t *) malloc(frame_len);
if (frame == NULL) {
errno = ENOMEM;
return -1;
}
memcpy(frame, decoder->buffer + 4, frame_len);
memmove(decoder->buffer, decoder->buffer + 4U + frame_len, decoder->len - 4U - frame_len);
decoder->len -= 4U + frame_len;
*payload = frame;
*payload_len = frame_len;
return 1;
}

562
src/server_kcp_hub.c Normal file
View File

@@ -0,0 +1,562 @@
#include "server_kcp_hub.h"
#include <unistd.h>
#define KCP_RELAY_MAX_DATAGRAM_SIZE (60 * 1024)
typedef struct kcp_peer_entry {
struct kcp_peer_entry *next;
char peer_id[OMNI_MAX_PEER_ID];
kcp_conn_t *conn;
} kcp_peer_entry_t;
typedef struct kcp_session_thread_ctx {
kcp_hub_t *hub;
kcp_conn_t *conn;
} kcp_session_thread_ctx_t;
struct kcp_hub {
pthread_rwlock_t lock;
kcp_peer_entry_t *peers;
latency_logger_t *logger;
kcp_session_stats_logger_t *stats_logger;
int stats_interval_ms;
int relay_fd;
int relay_configured;
int relay_learn_peer;
struct sockaddr_storage relay_peer_addr;
socklen_t relay_peer_addr_len;
int closed;
};
static void kcp_hub_unregister(kcp_hub_t *hub, const char *peer_id, kcp_conn_t *conn) {
kcp_peer_entry_t *prev = NULL;
kcp_peer_entry_t *entry;
if (hub == NULL || peer_id == NULL || peer_id[0] == '\0') {
return;
}
pthread_rwlock_wrlock(&hub->lock);
for (entry = hub->peers; entry != NULL; entry = entry->next) {
if (strcmp(entry->peer_id, peer_id) == 0 && entry->conn == conn) {
if (prev == NULL) {
hub->peers = entry->next;
} else {
prev->next = entry->next;
}
free(entry);
break;
}
prev = entry;
}
pthread_rwlock_unlock(&hub->lock);
}
static kcp_peer_entry_t *kcp_hub_find_peer(kcp_hub_t *hub, const char *peer_id) {
kcp_peer_entry_t *entry;
for (entry = hub->peers; entry != NULL; entry = entry->next) {
if (strcmp(entry->peer_id, peer_id) == 0) {
return entry;
}
}
return NULL;
}
static int kcp_hub_send_server_error(kcp_conn_t *conn, const char *to, const char *message) {
message_t msg;
protocol_message_init(&msg);
msg.type = MSG_TYPE_ERROR;
snprintf(msg.from, sizeof(msg.from), "%s", SERVER_PEER_ID);
snprintf(msg.to, sizeof(msg.to), "%s", (to == NULL || to[0] == '\0') ? "unknown" : to);
msg.body = (uint8_t *) omni_strdup(message == NULL ? "" : message);
if (msg.body == NULL) {
return -1;
}
msg.body_len = strlen((const char *) msg.body);
if (kcp_conn_send(conn, &msg) != 0) {
protocol_message_clear(&msg);
return -1;
}
protocol_message_clear(&msg);
return 0;
}
static int kcp_hub_sockaddr_equal(const struct sockaddr *left, socklen_t left_len, const struct sockaddr *right, socklen_t right_len) {
char left_text[OMNI_MAX_ADDR_TEXT];
char right_text[OMNI_MAX_ADDR_TEXT];
if (left == NULL || right == NULL) {
return left == right;
}
return strcmp(
omni_sockaddr_to_string(left, left_len, left_text, sizeof(left_text)),
omni_sockaddr_to_string(right, right_len, right_text, sizeof(right_text))
) == 0;
}
static int kcp_hub_accept_relay_peer(kcp_hub_t *hub, const struct sockaddr *addr, socklen_t addr_len) {
int accepted = 0;
pthread_rwlock_wrlock(&hub->lock);
if (hub->relay_peer_addr_len == 0 && hub->relay_learn_peer) {
omni_clone_sockaddr(addr, addr_len, &hub->relay_peer_addr, &hub->relay_peer_addr_len);
accepted = 1;
} else if (hub->relay_peer_addr_len == 0) {
accepted = 1;
} else {
accepted = kcp_hub_sockaddr_equal((const struct sockaddr *) &hub->relay_peer_addr, hub->relay_peer_addr_len, addr, addr_len);
}
pthread_rwlock_unlock(&hub->lock);
return accepted;
}
static int kcp_hub_forward_to_relay(kcp_hub_t *hub, const message_t *msg, int *relay_status) {
uint8_t *payload = NULL;
size_t payload_len = 0;
struct sockaddr_storage relay_addr;
socklen_t relay_addr_len = 0;
int relay_fd = -1;
int relay_configured = 0;
if (relay_status != NULL) {
*relay_status = 0;
}
if (protocol_encode_message_datagram(msg, &payload, &payload_len) != 0) {
return -1;
}
if (payload_len > KCP_RELAY_MAX_DATAGRAM_SIZE) {
free(payload);
errno = EMSGSIZE;
if (relay_status != NULL) {
*relay_status = 3;
}
return -1;
}
pthread_rwlock_rdlock(&hub->lock);
relay_fd = hub->relay_fd;
relay_configured = hub->relay_configured;
if (hub->relay_peer_addr_len > 0) {
omni_clone_sockaddr((const struct sockaddr *) &hub->relay_peer_addr, hub->relay_peer_addr_len, &relay_addr, &relay_addr_len);
}
pthread_rwlock_unlock(&hub->lock);
if (!relay_configured || relay_fd < 0) {
free(payload);
errno = ENOTCONN;
if (relay_status != NULL) {
*relay_status = 1;
}
return -1;
}
if (relay_addr_len == 0) {
free(payload);
errno = EDESTADDRREQ;
if (relay_status != NULL) {
*relay_status = 2;
}
return -1;
}
if (sendto(relay_fd, payload, payload_len, 0, (struct sockaddr *) &relay_addr, relay_addr_len) < 0) {
free(payload);
return -1;
}
free(payload);
return 0;
}
static int kcp_hub_forward_relay_server_error(kcp_hub_t *hub, const char *to, const char *message) {
message_t msg;
int rc;
protocol_message_init(&msg);
msg.type = MSG_TYPE_ERROR;
snprintf(msg.from, sizeof(msg.from), "%s", SERVER_PEER_ID);
snprintf(msg.to, sizeof(msg.to), "%s", (to == NULL || to[0] == '\0') ? "unknown" : to);
msg.body = (uint8_t *) omni_strdup(message == NULL ? "" : message);
if (msg.body == NULL) {
return -1;
}
msg.body_len = strlen((const char *) msg.body);
rc = kcp_hub_forward_to_relay(hub, &msg, NULL);
protocol_message_clear(&msg);
return rc;
}
static int kcp_hub_deliver_to_local_peer(kcp_hub_t *hub, const message_t *msg) {
kcp_conn_t *target_conn = NULL;
int rc;
pthread_rwlock_rdlock(&hub->lock);
{
kcp_peer_entry_t *entry = kcp_hub_find_peer(hub, msg->to);
if (entry != NULL) {
target_conn = entry->conn;
}
}
pthread_rwlock_unlock(&hub->lock);
if (target_conn == NULL) {
errno = ENOENT;
return -1;
}
rc = kcp_conn_send(target_conn, msg);
if (rc != 0) {
kcp_hub_unregister(hub, msg->to, target_conn);
kcp_conn_close(target_conn);
return -1;
}
return 0;
}
static int kcp_hub_deliver_relayed_message(kcp_hub_t *hub, const message_t *msg) {
char *error_text;
if (kcp_hub_deliver_to_local_peer(hub, msg) == 0) {
return 0;
}
if (errno != ENOENT) {
if (msg->type == MSG_TYPE_ERROR) {
return 0;
}
error_text = omni_strdup_printf("failed to forward to %s", msg->to);
if (error_text == NULL) {
return -1;
}
if (kcp_hub_forward_relay_server_error(hub, msg->from, error_text) != 0) {
free(error_text);
return -1;
}
free(error_text);
return 0;
}
if (msg->type == MSG_TYPE_ERROR) {
return 0;
}
error_text = omni_strdup_printf("unknown target: %s", msg->to);
if (error_text == NULL) {
return -1;
}
if (kcp_hub_forward_relay_server_error(hub, msg->from, error_text) != 0) {
free(error_text);
return -1;
}
free(error_text);
return 0;
}
static int kcp_hub_handle_peer_message(kcp_hub_t *hub, const char *peer_id, kcp_conn_t *conn, message_t *msg) {
char *error_text = NULL;
int relay_status = 0;
switch (msg->type) {
case MSG_TYPE_TEXT:
case MSG_TYPE_FILE:
snprintf(msg->from, sizeof(msg->from), "%s", peer_id);
if (kcp_hub_deliver_to_local_peer(hub, msg) == 0) {
return 0;
}
if (errno != ENOENT) {
error_text = omni_strdup_printf("failed to forward to %s", msg->to);
if (error_text == NULL) {
return -1;
}
if (kcp_hub_send_server_error(conn, peer_id, error_text) != 0) {
free(error_text);
return -1;
}
free(error_text);
return 0;
}
if (kcp_hub_forward_to_relay(hub, msg, &relay_status) == 0) {
return 0;
}
if (relay_status == 1) {
error_text = omni_strdup_printf("unknown target: %s", msg->to);
} else if (relay_status == 2) {
error_text = omni_strdup("failed to relay to remote peer");
} else if (relay_status == 3) {
error_text = omni_strdup("message too large for relay udp");
} else {
error_text = omni_strdup("failed to relay to remote peer");
}
if (error_text == NULL) {
return -1;
}
if (kcp_hub_send_server_error(conn, peer_id, error_text) != 0) {
free(error_text);
return -1;
}
free(error_text);
return 0;
case MSG_TYPE_REGISTER:
case MSG_TYPE_ERROR:
if (kcp_hub_send_server_error(conn, peer_id, "registered peers can only send text or file messages") != 0) {
return -1;
}
errno = EPROTO;
return -1;
default:
error_text = omni_strdup_printf("unsupported message type: %s", protocol_message_type_name(msg->type));
if (error_text == NULL) {
return -1;
}
if (kcp_hub_send_server_error(conn, peer_id, error_text) != 0) {
free(error_text);
return -1;
}
free(error_text);
errno = EPROTO;
return -1;
}
}
static int kcp_hub_register_conn(kcp_hub_t *hub, kcp_conn_t *conn, char *peer_id, size_t peer_id_len) {
message_t msg;
kcp_peer_entry_t *entry;
protocol_message_init(&msg);
if (kcp_conn_receive(conn, &msg) != 0) {
protocol_message_clear(&msg);
return -1;
}
if (msg.type != MSG_TYPE_REGISTER) {
kcp_hub_send_server_error(conn, msg.from, "first message must be register");
protocol_message_clear(&msg);
errno = EPROTO;
return -1;
}
pthread_rwlock_wrlock(&hub->lock);
entry = kcp_hub_find_peer(hub, msg.from);
if (entry != NULL) {
char *error_text;
pthread_rwlock_unlock(&hub->lock);
error_text = omni_strdup_printf("duplicate peer id: %s", msg.from);
if (error_text != NULL) {
(void) kcp_hub_send_server_error(conn, msg.from, error_text);
free(error_text);
}
protocol_message_clear(&msg);
errno = EEXIST;
return -1;
}
entry = (kcp_peer_entry_t *) calloc(1, sizeof(*entry));
if (entry == NULL) {
pthread_rwlock_unlock(&hub->lock);
protocol_message_clear(&msg);
return -1;
}
snprintf(entry->peer_id, sizeof(entry->peer_id), "%s", msg.from);
entry->conn = conn;
entry->next = hub->peers;
hub->peers = entry;
pthread_rwlock_unlock(&hub->lock);
snprintf(peer_id, peer_id_len, "%s", msg.from);
protocol_message_clear(&msg);
return 0;
}
static void *kcp_hub_session_thread_main(void *arg) {
kcp_session_thread_ctx_t *ctx = (kcp_session_thread_ctx_t *) arg;
kcp_hub_serve_session(ctx->hub, ctx->conn);
free(ctx);
return NULL;
}
kcp_hub_t *kcp_hub_new(latency_logger_t *logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
kcp_hub_t *hub = (kcp_hub_t *) calloc(1, sizeof(*hub));
if (hub == NULL) {
return NULL;
}
pthread_rwlock_init(&hub->lock, NULL);
hub->logger = logger;
hub->stats_logger = stats_logger;
hub->stats_interval_ms = stats_interval_ms > 0 ? stats_interval_ms : KCP_DEFAULT_STATS_INTERVAL_MS;
hub->relay_fd = -1;
return hub;
}
int kcp_hub_serve_listener(kcp_hub_t *hub, kcp_listener_t *listener) {
if (hub == NULL || listener == NULL) {
errno = EINVAL;
return -1;
}
while (!hub->closed) {
kcp_conn_t *conn = kcp_listener_accept(listener);
kcp_session_thread_ctx_t *ctx;
pthread_t thread;
if (conn == NULL) {
if (hub->closed) {
return 0;
}
return -1;
}
ctx = (kcp_session_thread_ctx_t *) calloc(1, sizeof(*ctx));
if (ctx == NULL) {
kcp_conn_close(conn);
kcp_conn_free(conn);
return -1;
}
ctx->hub = hub;
ctx->conn = conn;
if (pthread_create(&thread, NULL, kcp_hub_session_thread_main, ctx) != 0) {
free(ctx);
kcp_conn_close(conn);
kcp_conn_free(conn);
return -1;
}
pthread_detach(thread);
}
return 0;
}
int kcp_hub_serve_session(kcp_hub_t *hub, kcp_conn_t *conn) {
char peer_id[OMNI_MAX_PEER_ID];
int rc = 0;
if (hub == NULL || conn == NULL) {
errno = EINVAL;
return -1;
}
peer_id[0] = '\0';
if (kcp_conn_configure_runtime(conn, hub->logger, OMNI_NODE_ROLE_SERVER, "hub", hub->stats_logger, hub->stats_interval_ms) != 0) {
kcp_conn_close(conn);
kcp_conn_free(conn);
return -1;
}
if (kcp_hub_register_conn(hub, conn, peer_id, sizeof(peer_id)) != 0) {
kcp_conn_close(conn);
kcp_conn_free(conn);
return -1;
}
for (;;) {
message_t msg;
protocol_message_init(&msg);
if (kcp_conn_receive(conn, &msg) != 0) {
protocol_message_clear(&msg);
rc = -1;
break;
}
if (kcp_hub_handle_peer_message(hub, peer_id, conn, &msg) != 0) {
protocol_message_clear(&msg);
rc = -1;
break;
}
protocol_message_clear(&msg);
}
kcp_hub_unregister(hub, peer_id, conn);
kcp_conn_close(conn);
kcp_conn_free(conn);
return rc;
}
int kcp_hub_set_relay(kcp_hub_t *hub, int relay_fd, const struct sockaddr *peer_addr, socklen_t peer_addr_len, int learn_peer) {
if (hub == NULL || relay_fd < 0) {
errno = EINVAL;
return -1;
}
pthread_rwlock_wrlock(&hub->lock);
hub->relay_fd = relay_fd;
hub->relay_configured = 1;
hub->relay_learn_peer = learn_peer;
hub->relay_peer_addr_len = 0;
if (peer_addr != NULL && peer_addr_len > 0) {
omni_clone_sockaddr(peer_addr, peer_addr_len, &hub->relay_peer_addr, &hub->relay_peer_addr_len);
}
pthread_rwlock_unlock(&hub->lock);
return 0;
}
int kcp_hub_serve_relay(kcp_hub_t *hub) {
uint8_t buffer[KCP_RELAY_MAX_DATAGRAM_SIZE];
if (hub == NULL) {
errno = EINVAL;
return -1;
}
while (!hub->closed) {
struct sockaddr_storage source;
socklen_t source_len = sizeof(source);
ssize_t n;
message_t msg;
char err[128];
int relay_fd;
pthread_rwlock_rdlock(&hub->lock);
relay_fd = hub->relay_fd;
pthread_rwlock_unlock(&hub->lock);
if (relay_fd < 0) {
errno = ENOTCONN;
return -1;
}
n = recvfrom(relay_fd, buffer, sizeof(buffer), 0, (struct sockaddr *) &source, &source_len);
if (n < 0) {
if (hub->closed) {
return 0;
}
if (errno == EINTR) {
continue;
}
return -1;
}
if (!kcp_hub_accept_relay_peer(hub, (struct sockaddr *) &source, source_len)) {
continue;
}
protocol_message_init(&msg);
if (protocol_decode_message_datagram(buffer, (size_t) n, &msg, err, sizeof(err)) != 0) {
protocol_message_clear(&msg);
continue;
}
if (msg.type != MSG_TYPE_TEXT && msg.type != MSG_TYPE_FILE && msg.type != MSG_TYPE_ERROR) {
protocol_message_clear(&msg);
continue;
}
(void) kcp_hub_deliver_relayed_message(hub, &msg);
protocol_message_clear(&msg);
}
return 0;
}
int kcp_hub_close(kcp_hub_t *hub) {
if (hub == NULL) {
return 0;
}
if (!hub->closed) {
hub->closed = 1;
if (hub->relay_fd >= 0) {
close(hub->relay_fd);
hub->relay_fd = -1;
}
}
return 0;
}
void kcp_hub_free(kcp_hub_t *hub) {
kcp_peer_entry_t *entry;
kcp_peer_entry_t *next;
if (hub == NULL) {
return;
}
kcp_hub_close(hub);
for (entry = hub->peers; entry != NULL; entry = next) {
next = entry->next;
if (entry->conn != NULL) {
kcp_conn_close(entry->conn);
}
free(entry);
}
pthread_rwlock_destroy(&hub->lock);
free(hub);
}

181
src/server_udp_hub.c Normal file
View File

@@ -0,0 +1,181 @@
#include "server_udp_hub.h"
#include <unistd.h>
typedef struct udp_peer_entry {
struct udp_peer_entry *next;
char peer_id[OMNI_MAX_PEER_ID];
struct sockaddr_storage addr;
socklen_t addr_len;
} udp_peer_entry_t;
struct udp_hub {
udp_conn_t *conn;
pthread_rwlock_t lock;
udp_peer_entry_t *peers;
};
static int udp_addr_equal(const struct sockaddr_storage *a, socklen_t a_len, const struct sockaddr_storage *b, socklen_t b_len) {
return a_len == b_len && memcmp(a, b, a_len) == 0;
}
static udp_peer_entry_t *udp_hub_find_by_id(udp_hub_t *hub, const char *peer_id) {
udp_peer_entry_t *entry;
for (entry = hub->peers; entry != NULL; entry = entry->next) {
if (strcmp(entry->peer_id, peer_id) == 0) {
return entry;
}
}
return NULL;
}
static udp_peer_entry_t *udp_hub_find_by_addr(udp_hub_t *hub, const struct sockaddr_storage *addr, socklen_t addr_len) {
udp_peer_entry_t *entry;
for (entry = hub->peers; entry != NULL; entry = entry->next) {
if (udp_addr_equal(&entry->addr, entry->addr_len, addr, addr_len)) {
return entry;
}
}
return NULL;
}
static int udp_hub_send_error(udp_hub_t *hub, const struct sockaddr_storage *addr, socklen_t addr_len, const char *to, const char *message) {
message_t msg;
protocol_message_init(&msg);
msg.type = MSG_TYPE_ERROR;
msg.id = 0;
snprintf(msg.from, sizeof(msg.from), "%s", SERVER_PEER_ID);
snprintf(msg.to, sizeof(msg.to), "%s", to == NULL || to[0] == '\0' ? "unknown" : to);
msg.body = (uint8_t *) omni_strdup(message);
msg.body_len = msg.body == NULL ? 0 : strlen((const char *) msg.body);
if (msg.body == NULL) {
return -1;
}
if (udp_conn_send_to(hub->conn, &msg, (const struct sockaddr *) addr, addr_len) != 0) {
protocol_message_clear(&msg);
return -1;
}
protocol_message_clear(&msg);
return 0;
}
udp_hub_t *udp_hub_open(const char *listen_addr, latency_logger_t *logger, tx_timestamp_debug_logger_t *debug_logger, int enable_timestamping) {
udp_hub_t *hub = (udp_hub_t *) calloc(1, sizeof(*hub));
if (hub == NULL) {
return NULL;
}
hub->conn = udp_conn_bind(listen_addr, NULL, enable_timestamping, logger, OMNI_NODE_ROLE_SERVER, "hub", debug_logger);
if (hub->conn == NULL) {
free(hub);
return NULL;
}
pthread_rwlock_init(&hub->lock, NULL);
return hub;
}
int udp_hub_serve(udp_hub_t *hub) {
message_t msg;
struct sockaddr_storage addr;
socklen_t addr_len;
udp_peer_entry_t *sender;
udp_peer_entry_t *target;
udp_peer_entry_t *entry;
if (hub == NULL) {
errno = EINVAL;
return -1;
}
protocol_message_init(&msg);
for (;;) {
protocol_message_clear(&msg);
if (udp_conn_receive(hub->conn, &msg, &addr, &addr_len) != 0) {
return -1;
}
if (msg.type == MSG_TYPE_REGISTER) {
pthread_rwlock_wrlock(&hub->lock);
entry = udp_hub_find_by_id(hub, msg.from);
if (entry == NULL) {
entry = (udp_peer_entry_t *) calloc(1, sizeof(*entry));
if (entry == NULL) {
pthread_rwlock_unlock(&hub->lock);
protocol_message_clear(&msg);
return -1;
}
snprintf(entry->peer_id, sizeof(entry->peer_id), "%s", msg.from);
entry->next = hub->peers;
hub->peers = entry;
}
memcpy(&entry->addr, &addr, sizeof(addr));
entry->addr_len = addr_len;
pthread_rwlock_unlock(&hub->lock);
continue;
}
if (msg.type != MSG_TYPE_TEXT && msg.type != MSG_TYPE_FILE) {
if (msg.type == MSG_TYPE_ERROR) {
udp_hub_send_error(hub, &addr, addr_len, msg.from, "peers cannot send error messages");
} else {
char *error_text = omni_strdup_printf("unsupported message type: %s", protocol_message_type_name(msg.type));
if (error_text != NULL) {
udp_hub_send_error(hub, &addr, addr_len, msg.from, error_text);
free(error_text);
}
}
continue;
}
pthread_rwlock_rdlock(&hub->lock);
sender = udp_hub_find_by_addr(hub, &addr, addr_len);
if (sender == NULL) {
pthread_rwlock_unlock(&hub->lock);
udp_hub_send_error(hub, &addr, addr_len, msg.from, "not registered; send register first");
continue;
}
snprintf(msg.from, sizeof(msg.from), "%s", sender->peer_id);
target = udp_hub_find_by_id(hub, msg.to);
if (target == NULL) {
char *error_text;
pthread_rwlock_unlock(&hub->lock);
error_text = omni_strdup_printf("unknown target: %s", msg.to);
if (error_text != NULL) {
udp_hub_send_error(hub, &addr, addr_len, sender->peer_id, error_text);
free(error_text);
}
continue;
}
if (udp_conn_send_to(hub->conn, &msg, (const struct sockaddr *) &target->addr, target->addr_len) != 0) {
char *error_text;
pthread_rwlock_unlock(&hub->lock);
error_text = omni_strdup_printf("failed to forward to %s", msg.to);
if (error_text != NULL) {
udp_hub_send_error(hub, &addr, addr_len, sender->peer_id, error_text);
free(error_text);
}
continue;
}
pthread_rwlock_unlock(&hub->lock);
}
}
int udp_hub_close(udp_hub_t *hub) {
if (hub == NULL) {
return 0;
}
return udp_conn_close(hub->conn);
}
void udp_hub_free(udp_hub_t *hub) {
udp_peer_entry_t *entry;
udp_peer_entry_t *next;
if (hub == NULL) {
return;
}
udp_conn_free(hub->conn);
for (entry = hub->peers; entry != NULL; entry = next) {
next = entry->next;
free(entry);
}
pthread_rwlock_destroy(&hub->lock);
free(hub);
}

417
src/server_udp_relay.c Normal file
View File

@@ -0,0 +1,417 @@
#include "server_udp_relay.h"
#include <arpa/inet.h>
#include <unistd.h>
#define UDP_RELAY_BUF_SIZE (64U * 1024U)
struct udp_relay {
int downstream_fd;
int upstream_fd;
struct sockaddr_storage upstream_addr;
socklen_t upstream_addr_len;
char downstream_local_addr[OMNI_MAX_ADDR_TEXT];
char upstream_local_addr[OMNI_MAX_ADDR_TEXT];
struct sockaddr_storage client_addr;
socklen_t client_addr_len;
int has_client;
pthread_mutex_t lock;
pthread_mutex_t log_mu;
pthread_mutex_t state_mu;
pthread_cond_t state_cond;
pthread_t downstream_thread;
int downstream_thread_started;
pthread_t upstream_thread;
int upstream_thread_started;
int worker_done;
int worker_rc;
int worker_errno;
int closed;
};
static void udp_relay_parse_kcp_summary(const uint8_t *packet, size_t len, int *has_conv, uint32_t *conv, size_t *segment_count) {
size_t offset = 0;
size_t count = 0;
if (has_conv != NULL) {
*has_conv = 0;
}
if (conv != NULL) {
*conv = 0;
}
if (segment_count != NULL) {
*segment_count = 0;
}
if (packet == NULL || len < 4U) {
return;
}
if (has_conv != NULL) {
*has_conv = 1;
}
if (conv != NULL) {
*conv = (uint32_t) ((unsigned char) packet[0] |
((unsigned char) packet[1] << 8) |
((unsigned char) packet[2] << 16) |
((unsigned char) packet[3] << 24));
}
while (offset + 24U <= len) {
uint32_t seg_len = (uint32_t) ((unsigned char) packet[offset + 20] |
((unsigned char) packet[offset + 21] << 8) |
((unsigned char) packet[offset + 22] << 16) |
((unsigned char) packet[offset + 23] << 24));
if (offset + 24U + seg_len > len) {
return;
}
count++;
offset += 24U + seg_len;
}
if (segment_count != NULL) {
*segment_count = count;
}
}
static void udp_relay_print_packet(udp_relay_t *relay, const char *event_name, const char *local_addr, const struct sockaddr_storage *remote_addr, socklen_t remote_addr_len, const uint8_t *packet, size_t packet_len) {
char remote_addr_text[OMNI_MAX_ADDR_TEXT];
int64_t ts_unix_nano;
int has_conv = 0;
uint32_t conv = 0;
size_t segment_count = 0;
if (relay == NULL) {
return;
}
if (remote_addr != NULL && remote_addr_len > 0) {
omni_sockaddr_to_string((const struct sockaddr *) remote_addr, remote_addr_len, remote_addr_text, sizeof(remote_addr_text));
} else {
remote_addr_text[0] = '\0';
}
ts_unix_nano = omni_now_unix_nano();
udp_relay_parse_kcp_summary(packet, packet_len, &has_conv, &conv, &segment_count);
pthread_mutex_lock(&relay->log_mu);
if (has_conv) {
fprintf(stderr, "[relay] ts=%" PRId64 " event=%s local=%s remote=%s bytes=%zu conv=%" PRIu32 " segs=%zu\n",
ts_unix_nano,
event_name == NULL ? "" : event_name,
local_addr == NULL ? "" : local_addr,
remote_addr_text,
packet_len,
conv,
segment_count);
} else {
fprintf(stderr, "[relay] ts=%" PRId64 " event=%s local=%s remote=%s bytes=%zu\n",
ts_unix_nano,
event_name == NULL ? "" : event_name,
local_addr == NULL ? "" : local_addr,
remote_addr_text,
packet_len);
}
fflush(stderr);
pthread_mutex_unlock(&relay->log_mu);
}
static int udp_relay_is_closed(udp_relay_t *relay) {
int closed;
pthread_mutex_lock(&relay->state_mu);
closed = relay->closed;
pthread_mutex_unlock(&relay->state_mu);
return closed;
}
static void udp_relay_note_result(udp_relay_t *relay, int rc, int errnum) {
pthread_mutex_lock(&relay->state_mu);
if (!relay->worker_done) {
relay->worker_done = 1;
relay->worker_rc = rc;
relay->worker_errno = errnum;
pthread_cond_signal(&relay->state_cond);
}
pthread_mutex_unlock(&relay->state_mu);
}
static void udp_relay_record_client(udp_relay_t *relay, const struct sockaddr_storage *addr, socklen_t addr_len) {
pthread_mutex_lock(&relay->lock);
memcpy(&relay->client_addr, addr, sizeof(*addr));
relay->client_addr_len = addr_len;
relay->has_client = 1;
pthread_mutex_unlock(&relay->lock);
}
static int udp_relay_copy_client(udp_relay_t *relay, struct sockaddr_storage *addr, socklen_t *addr_len) {
int has_client;
pthread_mutex_lock(&relay->lock);
has_client = relay->has_client;
if (has_client) {
memcpy(addr, &relay->client_addr, sizeof(*addr));
*addr_len = relay->client_addr_len;
}
pthread_mutex_unlock(&relay->lock);
return has_client;
}
static void *udp_relay_forward_downstream_to_upstream(void *arg) {
udp_relay_t *relay = (udp_relay_t *) arg;
uint8_t buffer[UDP_RELAY_BUF_SIZE];
for (;;) {
struct sockaddr_storage source;
socklen_t source_len = sizeof(source);
ssize_t n = recvfrom(relay->downstream_fd, buffer, sizeof(buffer), 0, (struct sockaddr *) &source, &source_len);
if (n < 0) {
int errnum = errno;
if (errnum == EINTR) {
continue;
}
if (udp_relay_is_closed(relay)) {
udp_relay_note_result(relay, 0, 0);
} else {
udp_relay_note_result(relay, -1, errnum);
}
return NULL;
}
udp_relay_record_client(relay, &source, source_len);
udp_relay_print_packet(relay, "relay_downstream_rx", relay->downstream_local_addr, &source, source_len, buffer, (size_t) n);
for (;;) {
if (send(relay->upstream_fd, buffer, (size_t) n, 0) >= 0) {
udp_relay_print_packet(relay, "relay_upstream_tx", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n);
break;
}
{
int errnum = errno;
if (errnum == EINTR) {
continue;
}
if (udp_relay_is_closed(relay)) {
udp_relay_note_result(relay, 0, 0);
} else {
udp_relay_note_result(relay, -1, errnum);
}
return NULL;
}
}
}
}
static void *udp_relay_forward_upstream_to_downstream(void *arg) {
udp_relay_t *relay = (udp_relay_t *) arg;
uint8_t buffer[UDP_RELAY_BUF_SIZE];
for (;;) {
struct sockaddr_storage client_addr;
socklen_t client_addr_len = 0;
ssize_t n = recv(relay->upstream_fd, buffer, sizeof(buffer), 0);
if (n < 0) {
int errnum = errno;
if (errnum == EINTR) {
continue;
}
if (udp_relay_is_closed(relay)) {
udp_relay_note_result(relay, 0, 0);
} else {
udp_relay_note_result(relay, -1, errnum);
}
return NULL;
}
udp_relay_print_packet(relay, "relay_upstream_rx", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n);
if (!udp_relay_copy_client(relay, &client_addr, &client_addr_len)) {
udp_relay_print_packet(relay, "relay_upstream_drop_no_client", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n);
continue;
}
for (;;) {
if (sendto(relay->downstream_fd, buffer, (size_t) n, 0, (struct sockaddr *) &client_addr, client_addr_len) >= 0) {
udp_relay_print_packet(relay, "relay_downstream_tx", relay->downstream_local_addr, &client_addr, client_addr_len, buffer, (size_t) n);
break;
}
{
int errnum = errno;
if (errnum == EINTR) {
continue;
}
if (udp_relay_is_closed(relay)) {
udp_relay_note_result(relay, 0, 0);
} else {
udp_relay_note_result(relay, -1, errnum);
}
return NULL;
}
}
}
}
static void udp_relay_join_threads(udp_relay_t *relay) {
if (relay->downstream_thread_started) {
pthread_join(relay->downstream_thread, NULL);
relay->downstream_thread_started = 0;
}
if (relay->upstream_thread_started) {
pthread_join(relay->upstream_thread, NULL);
relay->upstream_thread_started = 0;
}
}
udp_relay_t *udp_relay_open(const char *listen_addr, const char *upstream_addr) {
struct sockaddr_storage listen_ss;
struct sockaddr_storage upstream_ss;
struct sockaddr_storage downstream_local_ss;
struct sockaddr_storage upstream_local_ss;
socklen_t listen_len;
socklen_t upstream_len;
socklen_t downstream_local_len = sizeof(downstream_local_ss);
socklen_t upstream_local_len = sizeof(upstream_local_ss);
int family;
int fd_listen = -1;
int fd_upstream = -1;
udp_relay_t *relay = NULL;
if (omni_parse_sockaddr(listen_addr, 1, &listen_ss, &listen_len, &family) != 0 ||
omni_parse_sockaddr(upstream_addr, 0, &upstream_ss, &upstream_len, &family) != 0) {
return NULL;
}
fd_listen = socket(listen_ss.ss_family, SOCK_DGRAM, 0);
if (fd_listen < 0) {
return NULL;
}
if (bind(fd_listen, (struct sockaddr *) &listen_ss, listen_len) != 0) {
close(fd_listen);
return NULL;
}
fd_upstream = socket(upstream_ss.ss_family, SOCK_DGRAM, 0);
if (fd_upstream < 0) {
close(fd_listen);
return NULL;
}
if (connect(fd_upstream, (struct sockaddr *) &upstream_ss, upstream_len) != 0) {
close(fd_upstream);
close(fd_listen);
return NULL;
}
relay = (udp_relay_t *) calloc(1, sizeof(*relay));
if (relay == NULL) {
close(fd_upstream);
close(fd_listen);
return NULL;
}
relay->downstream_fd = fd_listen;
relay->upstream_fd = fd_upstream;
memcpy(&relay->upstream_addr, &upstream_ss, sizeof(upstream_ss));
relay->upstream_addr_len = upstream_len;
if (getsockname(fd_listen, (struct sockaddr *) &downstream_local_ss, &downstream_local_len) == 0) {
omni_sockaddr_to_string((const struct sockaddr *) &downstream_local_ss, downstream_local_len, relay->downstream_local_addr, sizeof(relay->downstream_local_addr));
} else {
snprintf(relay->downstream_local_addr, sizeof(relay->downstream_local_addr), "%s", listen_addr == NULL ? "" : listen_addr);
}
if (getsockname(fd_upstream, (struct sockaddr *) &upstream_local_ss, &upstream_local_len) == 0) {
omni_sockaddr_to_string((const struct sockaddr *) &upstream_local_ss, upstream_local_len, relay->upstream_local_addr, sizeof(relay->upstream_local_addr));
} else {
snprintf(relay->upstream_local_addr, sizeof(relay->upstream_local_addr), "%s", listen_addr == NULL ? "" : listen_addr);
}
pthread_mutex_init(&relay->lock, NULL);
pthread_mutex_init(&relay->log_mu, NULL);
pthread_mutex_init(&relay->state_mu, NULL);
pthread_cond_init(&relay->state_cond, NULL);
return relay;
}
int udp_relay_serve(udp_relay_t *relay) {
int thread_rc;
int rc;
int errnum;
if (relay == NULL) {
errno = EINVAL;
return -1;
}
if (udp_relay_is_closed(relay)) {
errno = ECANCELED;
return -1;
}
pthread_mutex_lock(&relay->state_mu);
relay->worker_done = 0;
relay->worker_rc = 0;
relay->worker_errno = 0;
pthread_mutex_unlock(&relay->state_mu);
thread_rc = pthread_create(&relay->downstream_thread, NULL, udp_relay_forward_downstream_to_upstream, relay);
if (thread_rc != 0) {
errno = thread_rc;
return -1;
}
relay->downstream_thread_started = 1;
thread_rc = pthread_create(&relay->upstream_thread, NULL, udp_relay_forward_upstream_to_downstream, relay);
if (thread_rc != 0) {
errno = thread_rc;
udp_relay_close(relay);
udp_relay_join_threads(relay);
return -1;
}
relay->upstream_thread_started = 1;
pthread_mutex_lock(&relay->state_mu);
while (!relay->worker_done) {
pthread_cond_wait(&relay->state_cond, &relay->state_mu);
}
rc = relay->worker_rc;
errnum = relay->worker_errno;
pthread_mutex_unlock(&relay->state_mu);
udp_relay_close(relay);
udp_relay_join_threads(relay);
if (rc != 0 && errnum != 0) {
errno = errnum;
}
return rc;
}
int udp_relay_close(udp_relay_t *relay) {
int downstream_fd;
int upstream_fd;
if (relay == NULL) {
return 0;
}
pthread_mutex_lock(&relay->state_mu);
if (relay->closed) {
pthread_mutex_unlock(&relay->state_mu);
return 0;
}
relay->closed = 1;
downstream_fd = relay->downstream_fd;
upstream_fd = relay->upstream_fd;
relay->downstream_fd = -1;
relay->upstream_fd = -1;
pthread_cond_broadcast(&relay->state_cond);
pthread_mutex_unlock(&relay->state_mu);
if (downstream_fd >= 0) {
close(downstream_fd);
}
if (upstream_fd >= 0) {
close(upstream_fd);
}
return 0;
}
void udp_relay_free(udp_relay_t *relay) {
if (relay == NULL) {
return;
}
udp_relay_close(relay);
udp_relay_join_threads(relay);
pthread_mutex_destroy(&relay->lock);
pthread_mutex_destroy(&relay->log_mu);
pthread_cond_destroy(&relay->state_cond);
pthread_mutex_destroy(&relay->state_mu);
free(relay);
}

1738
src/transport_kcp.c Normal file

File diff suppressed because it is too large Load Diff

476
src/transport_udp.c Normal file
View File

@@ -0,0 +1,476 @@
#include "transport_udp.h"
#include <arpa/inet.h>
#include <netdb.h>
#include <pthread.h>
#include <unistd.h>
typedef struct udp_pending_tx {
struct udp_pending_tx *next;
uint32_t tx_id;
message_t msg;
int bytes_written;
int saw_sched;
int saw_software;
} udp_pending_tx_t;
struct udp_conn {
int fd;
int connected;
int timestamping_enabled;
latency_logger_t *logger;
tx_timestamp_debug_logger_t *debug_logger;
char node_role[OMNI_MAX_NODE_ROLE];
char node_id[OMNI_MAX_PEER_ID];
pthread_mutex_t write_mu;
pthread_mutex_t pending_mu;
pthread_t errqueue_thread;
int errqueue_thread_started;
uint32_t next_tx_id;
udp_pending_tx_t *pending_head;
uint8_t *recv_buffer;
size_t recv_buffer_cap;
int closed;
};
static int udp_open_socket_for_addr(const struct sockaddr *addr, socklen_t addr_len, int bind_device, const char *device) {
int fd;
int reuse = 1;
fd = socket(addr->sa_family, SOCK_DGRAM, 0);
if (fd < 0) {
return -1;
}
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
if (bind_device && omni_bind_device(fd, device) != 0) {
close(fd);
return -1;
}
(void) addr_len;
return fd;
}
static int udp_resolve_ip_only(const char *ip, int family, struct sockaddr_storage *out, socklen_t *out_len) {
struct addrinfo hints;
struct addrinfo *result = NULL;
memset(&hints, 0, sizeof(hints));
hints.ai_family = family;
hints.ai_socktype = SOCK_DGRAM;
if (getaddrinfo(ip, "0", &hints, &result) != 0 || result == NULL) {
errno = EINVAL;
return -1;
}
memcpy(out, result->ai_addr, result->ai_addrlen);
*out_len = (socklen_t) result->ai_addrlen;
freeaddrinfo(result);
return 0;
}
static udp_conn_t *udp_conn_alloc(int fd, int connected, int enable_timestamping, latency_logger_t *logger, const char *node_role, const char *node_id, tx_timestamp_debug_logger_t *debug_logger) {
udp_conn_t *conn = (udp_conn_t *) calloc(1, sizeof(*conn));
if (conn == NULL) {
return NULL;
}
conn->fd = fd;
conn->connected = connected;
conn->timestamping_enabled = enable_timestamping;
conn->logger = logger;
conn->debug_logger = debug_logger;
snprintf(conn->node_role, sizeof(conn->node_role), "%s", node_role == NULL ? "" : node_role);
snprintf(conn->node_id, sizeof(conn->node_id), "%s", node_id == NULL ? "" : node_id);
conn->recv_buffer = (uint8_t *) malloc(OMNI_MAX_FRAME_SIZE);
if (conn->recv_buffer == NULL) {
free(conn);
return NULL;
}
conn->recv_buffer_cap = OMNI_MAX_FRAME_SIZE;
pthread_mutex_init(&conn->write_mu, NULL);
pthread_mutex_init(&conn->pending_mu, NULL);
return conn;
}
static void udp_pending_destroy(udp_pending_tx_t *pending) {
while (pending != NULL) {
udp_pending_tx_t *next = pending->next;
protocol_message_clear(&pending->msg);
free(pending);
pending = next;
}
}
static int udp_debug_log_send_chunk(udp_conn_t *conn, const message_t *msg, int bytes_written, uint32_t tx_id) {
tx_timestamp_debug_record_t record;
if (conn->debug_logger == NULL) {
return 0;
}
memset(&record, 0, sizeof(record));
snprintf(record.record_type, sizeof(record.record_type), "%s", TX_TIMESTAMP_DEBUG_RECORD_SEND_CHUNK);
snprintf(record.node_role, sizeof(record.node_role), "%s", conn->node_role);
snprintf(record.node_id, sizeof(record.node_id), "%s", conn->node_id);
record.message_type = msg->type;
record.message_id = msg->id;
snprintf(record.from, sizeof(record.from), "%s", msg->from);
snprintf(record.to, sizeof(record.to), "%s", msg->to);
snprintf(record.file_name, sizeof(record.file_name), "%s", msg->file_name);
record.body_size = (int) msg->body_len;
record.send_call_index = 0;
record.frame_offset_start = 0;
record.frame_offset_end = bytes_written > 0 ? bytes_written - 1 : 0;
record.bytes_written = bytes_written;
record.expected_tx_id = tx_id;
return tx_timestamp_debug_log(conn->debug_logger, &record);
}
static void udp_debug_log_errqueue_event(udp_conn_t *conn, const message_t *msg, const omni_tx_timestamp_event_t *event, uint32_t tx_id, int selected) {
tx_timestamp_debug_record_t record;
if (conn->debug_logger == NULL) {
return;
}
memset(&record, 0, sizeof(record));
snprintf(record.record_type, sizeof(record.record_type), "%s", TX_TIMESTAMP_DEBUG_RECORD_ERRQUEUE_EVENT);
snprintf(record.node_role, sizeof(record.node_role), "%s", conn->node_role);
snprintf(record.node_id, sizeof(record.node_id), "%s", conn->node_id);
record.message_type = msg->type;
record.message_id = msg->id;
snprintf(record.from, sizeof(record.from), "%s", msg->from);
snprintf(record.to, sizeof(record.to), "%s", msg->to);
snprintf(record.file_name, sizeof(record.file_name), "%s", msg->file_name);
record.body_size = (int) msg->body_len;
snprintf(record.phase, sizeof(record.phase), "%s", "background");
record.read_index = 0;
snprintf(record.event_name, sizeof(record.event_name), "%s", event->event_name);
record.ts_unix_nano = event->ts_unix_nano;
record.ee_info = event->ee_info;
record.ee_data = event->ee_data;
record.expected_tx_id = tx_id;
record.selected_for_latency = selected;
tx_timestamp_debug_log(conn->debug_logger, &record);
}
static void *udp_errqueue_thread_main(void *arg) {
udp_conn_t *conn = (udp_conn_t *) arg;
uint8_t control[512];
struct msghdr msg;
struct iovec iov;
uint8_t dummy;
while (!conn->closed) {
ssize_t rc;
omni_tx_timestamp_event_t event;
udp_pending_tx_t *prev = NULL;
udp_pending_tx_t *cur = NULL;
memset(&msg, 0, sizeof(msg));
memset(control, 0, sizeof(control));
dummy = 0;
iov.iov_base = &dummy;
iov.iov_len = sizeof(dummy);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = control;
msg.msg_controllen = sizeof(control);
rc = recvmsg(conn->fd, &msg, MSG_ERRQUEUE | MSG_DONTWAIT);
if (rc < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
usleep(10000);
continue;
}
if (conn->closed) {
return NULL;
}
usleep(10000);
continue;
}
if (linux_timestamping_parse_tx_timestamp(&msg, &event) != 0) {
continue;
}
pthread_mutex_lock(&conn->pending_mu);
cur = NULL;
prev = NULL;
{
udp_pending_tx_t **head = &conn->pending_head;
udp_pending_tx_t *iter = *head;
while (iter != NULL) {
if (iter->tx_id == event.ee_data) {
cur = iter;
break;
}
prev = iter;
iter = iter->next;
}
if (cur != NULL) {
if (strcmp(event.event_name, EVENT_A_TX_SCHED) == 0 && !cur->saw_sched) {
cur->saw_sched = 1;
latencylog_log_message_event_at(conn->logger, conn->node_role, conn->node_id, EVENT_A_TX_SCHED, event.ts_unix_nano, &cur->msg);
} else if (strcmp(event.event_name, EVENT_A_TX_SOFTWARE) == 0 && !cur->saw_software) {
cur->saw_software = 1;
latencylog_log_message_event_at(conn->logger, conn->node_role, conn->node_id, EVENT_A_TX_SOFTWARE, event.ts_unix_nano, &cur->msg);
}
udp_debug_log_errqueue_event(conn, &cur->msg, &event, cur->tx_id, 1);
if (cur->saw_sched && cur->saw_software) {
if (prev == NULL) {
*head = cur->next;
} else {
prev->next = cur->next;
}
protocol_message_clear(&cur->msg);
free(cur);
}
}
}
pthread_mutex_unlock(&conn->pending_mu);
}
return NULL;
}
static int udp_conn_start_errqueue(udp_conn_t *conn) {
if (!conn->timestamping_enabled) {
return 0;
}
if (pthread_create(&conn->errqueue_thread, NULL, udp_errqueue_thread_main, conn) != 0) {
return -1;
}
conn->errqueue_thread_started = 1;
return 0;
}
udp_conn_t *udp_conn_dial(const char *server_addr, const char *bind_ip, const char *bind_device, int enable_timestamping, latency_logger_t *logger, const char *node_role, const char *node_id, tx_timestamp_debug_logger_t *debug_logger) {
struct sockaddr_storage remote_addr;
struct sockaddr_storage local_addr;
socklen_t remote_len;
socklen_t local_len;
int family;
int fd;
udp_conn_t *conn;
if (omni_parse_sockaddr(server_addr, 0, &remote_addr, &remote_len, &family) != 0) {
return NULL;
}
fd = udp_open_socket_for_addr((struct sockaddr *) &remote_addr, remote_len, bind_device != NULL && bind_device[0] != '\0', bind_device);
if (fd < 0) {
return NULL;
}
if (bind_ip != NULL && bind_ip[0] != '\0') {
if (udp_resolve_ip_only(bind_ip, family, &local_addr, &local_len) != 0 || bind(fd, (struct sockaddr *) &local_addr, local_len) != 0) {
close(fd);
return NULL;
}
}
if (connect(fd, (struct sockaddr *) &remote_addr, remote_len) != 0) {
close(fd);
return NULL;
}
if (enable_timestamping && linux_timestamping_enable_udp_socket(fd, 1) != 0) {
close(fd);
return NULL;
}
conn = udp_conn_alloc(fd, 1, enable_timestamping, logger, node_role, node_id, debug_logger);
if (conn == NULL) {
close(fd);
return NULL;
}
if (udp_conn_start_errqueue(conn) != 0) {
udp_conn_free(conn);
return NULL;
}
return conn;
}
udp_conn_t *udp_conn_bind(const char *listen_addr, const char *bind_device, int enable_timestamping, latency_logger_t *logger, const char *node_role, const char *node_id, tx_timestamp_debug_logger_t *debug_logger) {
struct sockaddr_storage local_addr;
socklen_t local_len;
int family;
int fd;
udp_conn_t *conn;
if (omni_parse_sockaddr(listen_addr, 1, &local_addr, &local_len, &family) != 0) {
return NULL;
}
fd = udp_open_socket_for_addr((struct sockaddr *) &local_addr, local_len, bind_device != NULL && bind_device[0] != '\0', bind_device);
if (fd < 0) {
return NULL;
}
if (bind(fd, (struct sockaddr *) &local_addr, local_len) != 0) {
close(fd);
return NULL;
}
if (enable_timestamping && linux_timestamping_enable_udp_socket(fd, 1) != 0) {
close(fd);
return NULL;
}
conn = udp_conn_alloc(fd, 0, enable_timestamping, logger, node_role, node_id, debug_logger);
if (conn == NULL) {
close(fd);
return NULL;
}
if (udp_conn_start_errqueue(conn) != 0) {
udp_conn_free(conn);
return NULL;
}
return conn;
}
static int udp_conn_send_inner(udp_conn_t *conn, const message_t *msg, const struct sockaddr *addr, socklen_t addr_len) {
uint8_t *payload = NULL;
size_t payload_len = 0;
ssize_t rc;
udp_pending_tx_t *pending = NULL;
uint32_t tx_id;
if (protocol_encode_message_datagram(msg, &payload, &payload_len) != 0) {
return -1;
}
pthread_mutex_lock(&conn->write_mu);
latencylog_log_message_event(conn->logger, conn->node_role, conn->node_id, EVENT_SEND_HANDOFF_BEGIN, msg);
tx_id = conn->next_tx_id++;
if (conn->timestamping_enabled) {
pending = (udp_pending_tx_t *) calloc(1, sizeof(*pending));
if (pending == NULL || protocol_message_copy(&pending->msg, msg) != 0) {
free(payload);
pthread_mutex_unlock(&conn->write_mu);
free(pending);
return -1;
}
pending->tx_id = tx_id;
pending->bytes_written = (int) payload_len;
pthread_mutex_lock(&conn->pending_mu);
pending->next = conn->pending_head;
conn->pending_head = pending;
pthread_mutex_unlock(&conn->pending_mu);
udp_debug_log_send_chunk(conn, msg, (int) payload_len, tx_id);
}
if (addr != NULL) {
rc = sendto(conn->fd, payload, payload_len, 0, addr, addr_len);
} else {
rc = send(conn->fd, payload, payload_len, 0);
}
free(payload);
if (rc < 0 || (size_t) rc != payload_len) {
if (pending != NULL) {
udp_pending_tx_t *prev = NULL;
udp_pending_tx_t *cur;
pthread_mutex_lock(&conn->pending_mu);
for (cur = conn->pending_head; cur != NULL; cur = cur->next) {
if (cur == pending) {
if (prev == NULL) {
conn->pending_head = cur->next;
} else {
prev->next = cur->next;
}
break;
}
prev = cur;
}
pthread_mutex_unlock(&conn->pending_mu);
protocol_message_clear(&pending->msg);
free(pending);
}
pthread_mutex_unlock(&conn->write_mu);
return -1;
}
latencylog_log_message_event(conn->logger, conn->node_role, conn->node_id, EVENT_SEND_HANDOFF_END, msg);
pthread_mutex_unlock(&conn->write_mu);
return 0;
}
int udp_conn_send(udp_conn_t *conn, const message_t *msg) {
if (conn == NULL || !conn->connected) {
errno = ENOTCONN;
return -1;
}
return udp_conn_send_inner(conn, msg, NULL, 0);
}
int udp_conn_send_to(udp_conn_t *conn, const message_t *msg, const struct sockaddr *addr, socklen_t addr_len) {
if (conn == NULL || addr == NULL) {
errno = EINVAL;
return -1;
}
return udp_conn_send_inner(conn, msg, addr, addr_len);
}
int udp_conn_receive(udp_conn_t *conn, message_t *out_msg, struct sockaddr_storage *addr, socklen_t *addr_len) {
uint8_t control[512];
struct sockaddr_storage source;
struct iovec iov;
struct msghdr msg;
ssize_t n;
int64_t rx_ts = 0;
char err[128];
if (conn == NULL || out_msg == NULL) {
errno = EINVAL;
return -1;
}
memset(&msg, 0, sizeof(msg));
memset(&source, 0, sizeof(source));
iov.iov_base = conn->recv_buffer;
iov.iov_len = conn->recv_buffer_cap;
msg.msg_name = &source;
msg.msg_namelen = sizeof(source);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
if (conn->timestamping_enabled) {
msg.msg_control = control;
msg.msg_controllen = sizeof(control);
}
n = recvmsg(conn->fd, &msg, 0);
if (n < 0) {
return -1;
}
if (conn->timestamping_enabled) {
rx_ts = linux_timestamping_parse_rx_timestamp(&msg);
}
if (protocol_decode_message_datagram(conn->recv_buffer, (size_t) n, out_msg, err, sizeof(err)) != 0) {
errno = EPROTO;
return -1;
}
if (addr != NULL && addr_len != NULL) {
omni_clone_sockaddr((struct sockaddr *) &source, msg.msg_namelen, addr, addr_len);
}
if (rx_ts > 0) {
latencylog_log_message_event_at(conn->logger, conn->node_role, conn->node_id, EVENT_B_RX_SOFTWARE, rx_ts, out_msg);
}
return 0;
}
int udp_conn_fd(const udp_conn_t *conn) {
return conn == NULL ? -1 : conn->fd;
}
int udp_conn_local_addr(const udp_conn_t *conn, struct sockaddr_storage *addr, socklen_t *addr_len) {
socklen_t len = sizeof(*addr);
if (conn == NULL || addr == NULL || addr_len == NULL) {
errno = EINVAL;
return -1;
}
if (getsockname(conn->fd, (struct sockaddr *) addr, &len) != 0) {
return -1;
}
*addr_len = len;
return 0;
}
int udp_conn_close(udp_conn_t *conn) {
if (conn == NULL) {
return 0;
}
if (!conn->closed) {
conn->closed = 1;
close(conn->fd);
if (conn->errqueue_thread_started) {
pthread_join(conn->errqueue_thread, NULL);
conn->errqueue_thread_started = 0;
}
}
return 0;
}
void udp_conn_free(udp_conn_t *conn) {
if (conn == NULL) {
return;
}
udp_conn_close(conn);
udp_pending_destroy(conn->pending_head);
free(conn->recv_buffer);
pthread_mutex_destroy(&conn->write_mu);
pthread_mutex_destroy(&conn->pending_mu);
free(conn);
}

108
src/tx_timestamp_debug.c Normal file
View File

@@ -0,0 +1,108 @@
#include "tx_timestamp_debug.h"
tx_timestamp_debug_logger_t *tx_timestamp_debug_open_jsonl(const char *path) {
tx_timestamp_debug_logger_t *logger;
FILE *file;
if (path == NULL || path[0] == '\0') {
return NULL;
}
if (omni_ensure_parent_dir(path) != 0) {
return NULL;
}
file = fopen(path, "ab");
if (file == NULL) {
return NULL;
}
logger = (tx_timestamp_debug_logger_t *) calloc(1, sizeof(*logger));
if (logger == NULL) {
fclose(file);
return NULL;
}
omni_file_logger_init(&logger->file_logger, file);
logger->enabled = 1;
return logger;
}
void tx_timestamp_debug_close(tx_timestamp_debug_logger_t *logger) {
if (logger == NULL) {
return;
}
if (logger->file_logger.file != NULL) {
fclose(logger->file_logger.file);
}
omni_file_logger_destroy(&logger->file_logger);
free(logger);
}
int tx_timestamp_debug_log(tx_timestamp_debug_logger_t *logger, const tx_timestamp_debug_record_t *record) {
char *line;
char *node_role;
char *node_id;
char *from;
char *to;
char *file_name;
char *phase;
char *event_name;
if (logger == NULL || record == NULL || !logger->enabled) {
return 0;
}
node_role = omni_json_escape(record->node_role);
node_id = omni_json_escape(record->node_id);
from = omni_json_escape(record->from);
to = omni_json_escape(record->to);
file_name = omni_json_escape(record->file_name);
phase = omni_json_escape(record->phase);
event_name = omni_json_escape(record->event_name);
if (node_role == NULL || node_id == NULL || from == NULL || to == NULL || file_name == NULL || phase == NULL || event_name == NULL) {
free(node_role);
free(node_id);
free(from);
free(to);
free(file_name);
free(phase);
free(event_name);
return -1;
}
line = omni_strdup_printf(
"{\"record_type\":\"%s\",\"node_role\":\"%s\",\"node_id\":\"%s\",\"message_type\":\"%s\",\"message_id\":%" PRIu64 ",\"from\":\"%s\",\"to\":\"%s\",\"file_name\":\"%s\",\"body_size\":%d,\"phase\":\"%s\",\"send_call_index\":%d,\"frame_offset_start\":%d,\"frame_offset_end\":%d,\"bytes_written\":%d,\"expected_tx_id\":%u,\"read_index\":%d,\"event_name\":\"%s\",\"ts_unix_nano\":%" PRId64 ",\"ee_info\":%u,\"ee_data\":%u,\"matched_send_call_index\":%d,\"selected_for_latency\":%d}",
record->record_type,
node_role,
node_id,
protocol_message_type_name(record->message_type),
record->message_id,
from,
to,
file_name,
record->body_size,
phase,
record->send_call_index,
record->frame_offset_start,
record->frame_offset_end,
record->bytes_written,
record->expected_tx_id,
record->read_index,
event_name,
record->ts_unix_nano,
record->ee_info,
record->ee_data,
record->matched_send_call_index,
record->selected_for_latency
);
free(node_role);
free(node_id);
free(from);
free(to);
free(file_name);
free(phase);
free(event_name);
if (line == NULL) {
return -1;
}
if (omni_file_logger_write_line(&logger->file_logger, line) != 0) {
free(line);
return -1;
}
free(line);
return 0;
}