feat: Go转C

This commit is contained in:
2026-03-30 13:52:56 +08:00
parent fd0270084b
commit d5ef84200e
46 changed files with 14830 additions and 1 deletions

334
c/cmd/kcppeer.c Normal file
View File

@@ -0,0 +1,334 @@
#include "cli_parse.h"
#include "interactive.h"
#include "peer_kcp_client.h"
#include <pthread.h>
typedef struct kcppeer_receive_ctx {
kcp_client_t *client;
const char *inbox_dir;
volatile int stop_requested;
int rc;
} kcppeer_receive_ctx_t;
static void kcppeer_usage(FILE *out) {
fprintf(out, "usage: kcppeer [-id peer-a] [-server 127.0.0.1:9002] [-relay-via addr]\n");
fprintf(out, " [-to peer] [-text msg | -file path] [-bind-ip ip] [-bind-device dev]\n");
fprintf(out, " [-inbox-dir dir] [-latency-log path] [-kcp-ts-debug-log path]\n");
fprintf(out, " [-kcp-session-stats-log path] [-kcp-session-stats-interval 100ms]\n");
fprintf(out, " [-interactive[=true|false]]\n");
}
static void *kcppeer_receive_thread_main(void *arg) {
kcppeer_receive_ctx_t *ctx = (kcppeer_receive_ctx_t *) arg;
for (;;) {
message_t msg;
char persisted_path[512];
protocol_message_init(&msg);
if (kcp_client_receive(ctx->client, &msg) != 0) {
protocol_message_clear(&msg);
ctx->rc = ctx->stop_requested ? 0 : -1;
return NULL;
}
switch (msg.type) {
case MSG_TYPE_TEXT:
if (kcp_client_persist_message(ctx->client, &msg, ctx->inbox_dir, persisted_path, sizeof(persisted_path)) != 0) {
fprintf(stderr, "kcppeer: persist text from %s to %s failed\n", msg.from, msg.to);
protocol_message_clear(&msg);
ctx->rc = -1;
return NULL;
}
fprintf(stderr, "received text from %s to %s and persisted to %s\n", msg.from, msg.to, persisted_path);
break;
case MSG_TYPE_FILE:
if (kcp_client_persist_message(ctx->client, &msg, ctx->inbox_dir, persisted_path, sizeof(persisted_path)) != 0) {
fprintf(stderr, "kcppeer: persist file from %s to %s failed\n", msg.from, msg.to);
protocol_message_clear(&msg);
ctx->rc = -1;
return NULL;
}
fprintf(stderr, "received file from %s to %s: %s (%lu bytes) -> %s\n", msg.from, msg.to, msg.file_name, (unsigned long) msg.body_len, persisted_path);
break;
case MSG_TYPE_ERROR:
fprintf(stderr, "received error from %s to %s: %.*s\n", msg.from, msg.to, (int) msg.body_len, msg.body == NULL ? "" : (const char *) msg.body);
break;
default:
fprintf(stderr, "received unexpected message type %s from %s\n", protocol_message_type_name(msg.type), msg.from);
protocol_message_clear(&msg);
ctx->rc = -1;
return NULL;
}
protocol_message_clear(&msg);
}
}
int main(int argc, char **argv) {
const char *peer_id = "peer-a";
const char *server_addr = "127.0.0.1:9002";
const char *relay_via = "";
const char *target_peer = "";
const char *text = "";
const char *file_path = "";
const char *bind_ip = "";
const char *bind_device = "";
const char *inbox_dir = "inbox";
const char *latency_log_path = "";
const char *packet_log_path = "";
const char *stats_log_path = "";
const char *stats_interval_raw = "";
int stats_interval_ms = KCP_DEFAULT_STATS_INTERVAL_MS;
int interactive = 1;
latency_logger_t *latency_logger = NULL;
kcp_packet_debug_logger_t *packet_logger = NULL;
kcp_session_stats_logger_t *stats_logger = NULL;
kcp_client_t *client = NULL;
kcppeer_receive_ctx_t receive_ctx;
pthread_t receive_thread;
int receive_thread_started = 0;
int i;
int rc = 1;
memset(&receive_ctx, 0, sizeof(receive_ctx));
for (i = 1; i < argc; ++i) {
const char *value = NULL;
int handled;
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-id", &value)) < 0) {
fprintf(stderr, "kcppeer: flag -id requires a value\n");
return 1;
} else if (handled) {
peer_id = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-server", &value)) < 0) {
fprintf(stderr, "kcppeer: flag -server requires a value\n");
return 1;
} else if (handled) {
server_addr = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-relay-via", &value)) < 0) {
fprintf(stderr, "kcppeer: flag -relay-via requires a value\n");
return 1;
} else if (handled) {
relay_via = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-to", &value)) < 0) {
fprintf(stderr, "kcppeer: flag -to requires a value\n");
return 1;
} else if (handled) {
target_peer = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-text", &value)) < 0) {
fprintf(stderr, "kcppeer: flag -text requires a value\n");
return 1;
} else if (handled) {
text = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-file", &value)) < 0) {
fprintf(stderr, "kcppeer: flag -file requires a value\n");
return 1;
} else if (handled) {
file_path = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-bind-ip", &value)) < 0) {
fprintf(stderr, "kcppeer: flag -bind-ip requires a value\n");
return 1;
} else if (handled) {
bind_ip = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-bind-device", &value)) < 0) {
fprintf(stderr, "kcppeer: flag -bind-device requires a value\n");
return 1;
} else if (handled) {
bind_device = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-inbox-dir", &value)) < 0) {
fprintf(stderr, "kcppeer: flag -inbox-dir requires a value\n");
return 1;
} else if (handled) {
inbox_dir = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-latency-log", &value)) < 0) {
fprintf(stderr, "kcppeer: flag -latency-log requires a value\n");
return 1;
} else if (handled) {
latency_log_path = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-kcp-ts-debug-log", &value)) < 0) {
fprintf(stderr, "kcppeer: flag -kcp-ts-debug-log requires a value\n");
return 1;
} else if (handled) {
packet_log_path = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-kcp-session-stats-log", &value)) < 0) {
fprintf(stderr, "kcppeer: flag -kcp-session-stats-log requires a value\n");
return 1;
} else if (handled) {
stats_log_path = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-kcp-session-stats-interval", &value)) < 0) {
fprintf(stderr, "kcppeer: flag -kcp-session-stats-interval requires a value\n");
return 1;
} else if (handled) {
stats_interval_raw = value;
continue;
}
if ((handled = cli_parse_bool_flag(argv[i], "-interactive", &interactive)) < 0) {
fprintf(stderr, "kcppeer: invalid -interactive value\n");
return 1;
} else if (handled) {
continue;
}
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
kcppeer_usage(stdout);
return 0;
}
fprintf(stderr, "kcppeer: unknown argument %s\n", argv[i]);
kcppeer_usage(stderr);
return 1;
}
if (text[0] != '\0' && file_path[0] != '\0') {
fprintf(stderr, "kcppeer: only one of -text or -file may be specified\n");
return 1;
}
if ((text[0] != '\0' || file_path[0] != '\0') && target_peer[0] == '\0') {
fprintf(stderr, "kcppeer: flag -to is required when sending text or file\n");
return 1;
}
if (kcp_session_stats_parse_interval_ms(stats_interval_raw, &stats_interval_ms) != 0) {
fprintf(stderr, "kcppeer: invalid -kcp-session-stats-interval value %s\n", stats_interval_raw);
return 1;
}
if (latency_log_path[0] != '\0') {
latency_logger = latencylog_open_jsonl(latency_log_path);
if (latency_logger == NULL) {
fprintf(stderr, "kcppeer: open latency logger %s failed\n", latency_log_path);
goto cleanup;
}
}
if (packet_log_path[0] != '\0') {
packet_logger = kcp_packet_debug_open_jsonl(packet_log_path);
if (packet_logger == NULL) {
fprintf(stderr, "kcppeer: open kcp packet debug logger %s failed\n", packet_log_path);
goto cleanup;
}
}
if (stats_log_path[0] != '\0') {
stats_logger = kcp_session_stats_open_jsonl(stats_log_path);
if (stats_logger == NULL) {
fprintf(stderr, "kcppeer: open kcp session stats logger %s failed\n", stats_log_path);
goto cleanup;
}
}
client = kcp_client_dial(server_addr, relay_via, peer_id, bind_ip, bind_device, latency_logger, packet_logger, stats_logger, stats_interval_ms);
if (client == NULL) {
fprintf(stderr, "kcppeer: dial kcp server %s failed\n", server_addr);
goto cleanup;
}
if (relay_via[0] != '\0') {
fprintf(stderr, "opened KCP session as %s; logical server=%s, actual dial target=%s via relay; register not yet confirmed\n", kcp_client_id(client), server_addr, relay_via);
} else {
fprintf(stderr, "opened KCP session as %s; logical server=%s, actual dial target=%s; register not yet confirmed\n", kcp_client_id(client), server_addr, server_addr);
}
receive_ctx.client = client;
receive_ctx.inbox_dir = inbox_dir;
if (pthread_create(&receive_thread, NULL, kcppeer_receive_thread_main, &receive_ctx) != 0) {
fprintf(stderr, "kcppeer: create receive thread failed\n");
goto cleanup;
}
receive_thread_started = 1;
if (target_peer[0] != '\0' && text[0] != '\0') {
if (kcp_client_send_text(client, target_peer, text) != 0) {
fprintf(stderr, "kcppeer: send text to %s failed\n", target_peer);
goto cleanup;
}
fprintf(stderr, "sent text to %s\n", target_peer);
}
if (target_peer[0] != '\0' && file_path[0] != '\0') {
if (kcp_client_send_file_path(client, target_peer, file_path) != 0) {
fprintf(stderr, "kcppeer: send file %s to %s failed\n", file_path, target_peer);
goto cleanup;
}
fprintf(stderr, "sent file %s to %s\n", file_path, target_peer);
}
if (interactive) {
char line[2048];
char prompt[128];
snprintf(prompt, sizeof(prompt), "%s> ", kcp_client_id(client));
interactive_print_help(stdout, "KCP");
while (fputs(prompt, stdout) >= 0 && fflush(stdout) == 0 && fgets(line, sizeof(line), stdin) != NULL) {
interactive_command_t command;
char err[128];
omni_trim_newline(line);
if (interactive_parse_command(line, &command, err, sizeof(err)) != 0) {
if (strstr(err, "empty command") == NULL) {
fprintf(stderr, "%s\n", err);
}
continue;
}
if (command.type == INTERACTIVE_CMD_HELP) {
interactive_print_help(stdout, "KCP");
continue;
}
if (command.type == INTERACTIVE_CMD_QUIT) {
break;
}
if (command.type == INTERACTIVE_CMD_TEXT) {
if (kcp_client_send_text(client, command.to, command.value) != 0) {
fprintf(stderr, "kcppeer: send text to %s failed\n", command.to);
continue;
}
fprintf(stderr, "sent text to %s\n", command.to);
continue;
}
if (command.type == INTERACTIVE_CMD_FILE) {
if (kcp_client_send_file_path(client, command.to, command.value) != 0) {
fprintf(stderr, "kcppeer: send file %s to %s failed\n", command.value, command.to);
continue;
}
fprintf(stderr, "sent file %s to %s\n", command.value, command.to);
continue;
}
}
}
rc = 0;
cleanup:
receive_ctx.stop_requested = 1;
kcp_client_close(client);
if (receive_thread_started) {
pthread_join(receive_thread, NULL);
if (rc == 0 && receive_ctx.rc != 0) {
rc = 1;
}
}
kcp_client_free(client);
kcp_session_stats_close(stats_logger);
kcp_packet_debug_close(packet_logger);
latencylog_close(latency_logger);
return rc;
}

788
c/cmd/kcpping.c Normal file
View File

@@ -0,0 +1,788 @@
#include "cli_parse.h"
#include "peer_kcp_client.h"
#include "cJSON.h"
#include <signal.h>
#include <unistd.h>
typedef struct kcp_ping_message_node {
struct kcp_ping_message_node *next;
message_t msg;
} kcp_ping_message_node_t;
typedef struct kcp_ping_receiver_ctx {
kcp_client_t *client;
pthread_mutex_t mu;
kcp_ping_message_node_t *head;
kcp_ping_message_node_t *tail;
volatile int stop_requested;
int closed;
int rc;
} kcp_ping_receiver_ctx_t;
typedef struct kcp_pending_ping {
struct kcp_pending_ping *next;
uint64_t seq;
int64_t deadline_ns;
} kcp_pending_ping_t;
typedef struct kcp_ping_tracker {
kcp_pending_ping_t *pending;
int pending_count;
int sent;
int duplicates;
uint64_t max_seq_sent;
int64_t *samples_ns;
size_t sample_count;
size_t sample_cap;
} kcp_ping_tracker_t;
static volatile sig_atomic_t g_kcpping_stop = 0;
static void kcpping_on_signal(int signo) {
(void) signo;
g_kcpping_stop = 1;
}
static void kcpping_usage(FILE *out) {
fprintf(out, "usage: kcpping [-id pinger] [-server 127.0.0.1:9002] [-to peer] [-echo]\n");
fprintf(out, " [-count 100] [-interval 100ms] [-size 64] [-timeout 3s]\n");
fprintf(out, " [-bind-ip ip] [-bind-device dev] [-latency-log path]\n");
}
static int kcp_ping_compare_i64(const void *left, const void *right) {
const int64_t *a = (const int64_t *) left;
const int64_t *b = (const int64_t *) right;
if (*a < *b) {
return -1;
}
if (*a > *b) {
return 1;
}
return 0;
}
static double kcp_ping_sqrt(double value) {
double x = value;
int i;
if (value <= 0.0) {
return 0.0;
}
if (x < 1.0) {
x = 1.0;
}
for (i = 0; i < 16; ++i) {
x = 0.5 * (x + value / x);
}
return x;
}
static int kcp_ping_build_payload(uint64_t seq, int64_t ts_ns, int size, char **out_body, size_t *out_len) {
cJSON *root = NULL;
char *json = NULL;
char *pad = NULL;
size_t base_len;
size_t pad_len;
*out_body = NULL;
*out_len = 0;
root = cJSON_CreateObject();
if (root == NULL) {
return -1;
}
cJSON_AddNumberToObject(root, "seq", (double) seq);
cJSON_AddNumberToObject(root, "ts_ns", (double) ts_ns);
cJSON_AddStringToObject(root, "pad", "");
json = cJSON_PrintUnformatted(root);
cJSON_Delete(root);
if (json == NULL) {
return -1;
}
base_len = strlen(json);
cJSON_free(json);
if ((int) base_len > size) {
errno = EMSGSIZE;
return -1;
}
pad_len = (size_t) size - base_len;
pad = (char *) malloc(pad_len + 1U);
if (pad == NULL) {
return -1;
}
memset(pad, 'A', pad_len);
pad[pad_len] = '\0';
root = cJSON_CreateObject();
if (root == NULL) {
free(pad);
return -1;
}
cJSON_AddNumberToObject(root, "seq", (double) seq);
cJSON_AddNumberToObject(root, "ts_ns", (double) ts_ns);
cJSON_AddStringToObject(root, "pad", pad);
free(pad);
json = cJSON_PrintUnformatted(root);
cJSON_Delete(root);
if (json == NULL) {
return -1;
}
if ((int) strlen(json) != size) {
cJSON_free(json);
errno = EINVAL;
return -1;
}
*out_body = json;
*out_len = (size_t) size;
return 0;
}
static int kcp_ping_parse_payload(const uint8_t *body, size_t body_len, uint64_t *seq, int64_t *ts_ns) {
char *text;
cJSON *root;
const cJSON *seq_item;
const cJSON *ts_item;
if (body == NULL || seq == NULL || ts_ns == NULL) {
errno = EINVAL;
return -1;
}
text = (char *) malloc(body_len + 1U);
if (text == NULL) {
return -1;
}
memcpy(text, body, body_len);
text[body_len] = '\0';
root = cJSON_Parse(text);
free(text);
if (root == NULL) {
errno = EPROTO;
return -1;
}
seq_item = cJSON_GetObjectItemCaseSensitive(root, "seq");
ts_item = cJSON_GetObjectItemCaseSensitive(root, "ts_ns");
if (!cJSON_IsNumber(seq_item) || !cJSON_IsNumber(ts_item) || seq_item->valuedouble <= 0 || ts_item->valuedouble <= 0) {
cJSON_Delete(root);
errno = EPROTO;
return -1;
}
*seq = (uint64_t) seq_item->valuedouble;
*ts_ns = (int64_t) ts_item->valuedouble;
cJSON_Delete(root);
return 0;
}
static void kcp_ping_receiver_ctx_init(kcp_ping_receiver_ctx_t *ctx, kcp_client_t *client) {
memset(ctx, 0, sizeof(*ctx));
ctx->client = client;
pthread_mutex_init(&ctx->mu, NULL);
}
static void kcp_ping_receiver_ctx_destroy(kcp_ping_receiver_ctx_t *ctx) {
kcp_ping_message_node_t *node;
kcp_ping_message_node_t *next;
if (ctx == NULL) {
return;
}
for (node = ctx->head; node != NULL; node = next) {
next = node->next;
protocol_message_clear(&node->msg);
free(node);
}
pthread_mutex_destroy(&ctx->mu);
}
static void *kcpping_receive_thread_main(void *arg) {
kcp_ping_receiver_ctx_t *ctx = (kcp_ping_receiver_ctx_t *) arg;
for (;;) {
message_t msg;
kcp_ping_message_node_t *node;
protocol_message_init(&msg);
if (kcp_client_receive(ctx->client, &msg) != 0) {
protocol_message_clear(&msg);
pthread_mutex_lock(&ctx->mu);
ctx->rc = ctx->stop_requested ? 0 : -1;
ctx->closed = 1;
pthread_mutex_unlock(&ctx->mu);
return NULL;
}
node = (kcp_ping_message_node_t *) calloc(1, sizeof(*node));
if (node == NULL) {
protocol_message_clear(&msg);
pthread_mutex_lock(&ctx->mu);
ctx->rc = -1;
ctx->closed = 1;
pthread_mutex_unlock(&ctx->mu);
return NULL;
}
node->msg = msg;
pthread_mutex_lock(&ctx->mu);
if (ctx->tail == NULL) {
ctx->head = node;
} else {
ctx->tail->next = node;
}
ctx->tail = node;
pthread_mutex_unlock(&ctx->mu);
}
}
static int kcp_ping_receiver_pop(kcp_ping_receiver_ctx_t *ctx, message_t *out_msg) {
kcp_ping_message_node_t *node;
pthread_mutex_lock(&ctx->mu);
node = ctx->head;
if (node != NULL) {
ctx->head = node->next;
if (ctx->head == NULL) {
ctx->tail = NULL;
}
}
pthread_mutex_unlock(&ctx->mu);
if (node == NULL) {
return 0;
}
*out_msg = node->msg;
free(node);
return 1;
}
static int kcp_ping_receiver_status(kcp_ping_receiver_ctx_t *ctx, int *closed, int *rc) {
pthread_mutex_lock(&ctx->mu);
*closed = ctx->closed;
*rc = ctx->rc;
pthread_mutex_unlock(&ctx->mu);
return 0;
}
static void kcp_ping_tracker_init(kcp_ping_tracker_t *tracker) {
memset(tracker, 0, sizeof(*tracker));
}
static void kcp_ping_tracker_destroy(kcp_ping_tracker_t *tracker) {
kcp_pending_ping_t *pending;
kcp_pending_ping_t *next;
for (pending = tracker->pending; pending != NULL; pending = next) {
next = pending->next;
free(pending);
}
free(tracker->samples_ns);
}
static int kcp_ping_tracker_mark_sent(kcp_ping_tracker_t *tracker, uint64_t seq, int64_t sent_at_ns, int64_t timeout_ns) {
kcp_pending_ping_t *pending = (kcp_pending_ping_t *) calloc(1, sizeof(*pending));
if (pending == NULL) {
return -1;
}
pending->seq = seq;
pending->deadline_ns = sent_at_ns + timeout_ns;
pending->next = tracker->pending;
tracker->pending = pending;
tracker->pending_count++;
tracker->sent++;
tracker->max_seq_sent = seq;
return 0;
}
static kcp_pending_ping_t *kcp_ping_tracker_find_pending(kcp_ping_tracker_t *tracker, uint64_t seq, kcp_pending_ping_t **out_prev) {
kcp_pending_ping_t *prev = NULL;
kcp_pending_ping_t *cur;
for (cur = tracker->pending; cur != NULL; cur = cur->next) {
if (cur->seq == seq) {
if (out_prev != NULL) {
*out_prev = prev;
}
return cur;
}
prev = cur;
}
if (out_prev != NULL) {
*out_prev = NULL;
}
return NULL;
}
static int kcp_ping_tracker_add_sample(kcp_ping_tracker_t *tracker, int64_t rtt_ns) {
int64_t *next_samples;
size_t next_cap;
if (tracker->sample_count == tracker->sample_cap) {
next_cap = tracker->sample_cap == 0 ? 16U : tracker->sample_cap * 2U;
next_samples = (int64_t *) realloc(tracker->samples_ns, next_cap * sizeof(*next_samples));
if (next_samples == NULL) {
return -1;
}
tracker->samples_ns = next_samples;
tracker->sample_cap = next_cap;
}
tracker->samples_ns[tracker->sample_count++] = rtt_ns;
return 0;
}
static int kcp_ping_tracker_observe_reply(kcp_ping_tracker_t *tracker, uint64_t seq, int64_t sent_ts_ns, int64_t received_ts_ns, int *disposition, int64_t *rtt_ns) {
kcp_pending_ping_t *prev = NULL;
kcp_pending_ping_t *pending;
if (seq == 0 || seq > tracker->max_seq_sent) {
*disposition = 2;
*rtt_ns = 0;
return 0;
}
pending = kcp_ping_tracker_find_pending(tracker, seq, &prev);
if (pending == NULL) {
tracker->duplicates++;
*disposition = 1;
*rtt_ns = 0;
return 0;
}
if (prev == NULL) {
tracker->pending = pending->next;
} else {
prev->next = pending->next;
}
tracker->pending_count--;
free(pending);
*rtt_ns = received_ts_ns - sent_ts_ns;
if (*rtt_ns < 0) {
*rtt_ns = 0;
}
if (kcp_ping_tracker_add_sample(tracker, *rtt_ns) != 0) {
return -1;
}
*disposition = 0;
return 0;
}
static void kcp_ping_tracker_expire(kcp_ping_tracker_t *tracker, int64_t now_ns, FILE *out) {
kcp_pending_ping_t *prev = NULL;
kcp_pending_ping_t *cur = tracker->pending;
while (cur != NULL) {
if (cur->deadline_ns <= now_ns) {
kcp_pending_ping_t *next = cur->next;
fprintf(out, "seq=%" PRIu64 " timeout\n", cur->seq);
if (prev == NULL) {
tracker->pending = next;
} else {
prev->next = next;
}
free(cur);
tracker->pending_count--;
cur = next;
continue;
}
prev = cur;
cur = cur->next;
}
}
static int64_t kcp_ping_percentile_ns(const int64_t *sorted, size_t count, double percentile) {
size_t index;
double raw_index;
if (count == 0) {
return 0;
}
if (percentile <= 0.0) {
return sorted[0];
}
if (percentile >= 1.0) {
return sorted[count - 1];
}
raw_index = percentile * (double) count;
index = (size_t) raw_index;
if ((double) index < raw_index) {
index++;
}
if (index > 0) {
index--;
}
if (index >= count) {
index = count - 1;
}
return sorted[index];
}
static void kcp_ping_print_summary(FILE *out, const char *target, const kcp_ping_tracker_t *tracker) {
int received = (int) tracker->sample_count;
double loss_pct = tracker->sent == 0 ? 0.0 : ((double) (tracker->sent - received) * 100.0 / (double) tracker->sent);
fprintf(out, "--- %s kcp ping statistics ---\n", target);
fprintf(out, "%d packets transmitted, %d received, %d duplicates, %.2f%% packet loss\n", tracker->sent, received, tracker->duplicates, loss_pct);
if (tracker->sample_count == 0) {
fprintf(out, "rtt min/avg/max/p50/p95/p99 = n/a/n/a/n/a/n/a/n/a/n/a, stddev=n/a\n");
return;
}
{
int64_t *sorted = (int64_t *) malloc(tracker->sample_count * sizeof(*sorted));
size_t i;
double sum = 0.0;
double variance = 0.0;
double avg;
int64_t min_ns;
int64_t max_ns;
int64_t p50_ns;
int64_t p95_ns;
int64_t p99_ns;
if (sorted == NULL) {
fprintf(out, "rtt summary unavailable: memory allocation failed\n");
return;
}
memcpy(sorted, tracker->samples_ns, tracker->sample_count * sizeof(*sorted));
qsort(sorted, tracker->sample_count, sizeof(*sorted), kcp_ping_compare_i64);
for (i = 0; i < tracker->sample_count; ++i) {
sum += (double) sorted[i];
}
avg = sum / (double) tracker->sample_count;
for (i = 0; i < tracker->sample_count; ++i) {
double delta = (double) sorted[i] - avg;
variance += delta * delta;
}
variance /= (double) tracker->sample_count;
min_ns = sorted[0];
max_ns = sorted[tracker->sample_count - 1];
p50_ns = kcp_ping_percentile_ns(sorted, tracker->sample_count, 0.50);
p95_ns = kcp_ping_percentile_ns(sorted, tracker->sample_count, 0.95);
p99_ns = kcp_ping_percentile_ns(sorted, tracker->sample_count, 0.99);
fprintf(
out,
"rtt min/avg/max/p50/p95/p99 = %.2fms/%.2fms/%.2fms/%.2fms/%.2fms/%.2fms, stddev=%.2fms\n",
(double) min_ns / 1000000.0,
avg / 1000000.0,
(double) max_ns / 1000000.0,
(double) p50_ns / 1000000.0,
(double) p95_ns / 1000000.0,
(double) p99_ns / 1000000.0,
kcp_ping_sqrt(variance) / 1000000.0
);
free(sorted);
}
}
static int kcp_ping_expiry_poll_ms(int timeout_ms) {
int interval = timeout_ms / 4;
if (interval < 10) {
return 10;
}
if (interval > 100) {
return 100;
}
return interval;
}
int main(int argc, char **argv) {
const char *peer_id = "pinger";
const char *server_addr = "127.0.0.1:9002";
const char *target_peer = "";
const char *bind_ip = "";
const char *bind_device = "";
const char *latency_log_path = "";
int echo_mode = 0;
int count = 100;
int interval_ms = 100;
int size = 64;
int timeout_ms = 3000;
latency_logger_t *latency_logger = NULL;
kcp_client_t *client = NULL;
kcp_ping_receiver_ctx_t receiver_ctx;
pthread_t receiver_thread;
int receiver_ctx_initialized = 0;
int receiver_thread_started = 0;
kcp_ping_tracker_t tracker;
int i;
int rc = 1;
kcp_ping_tracker_init(&tracker);
memset(&receiver_ctx, 0, sizeof(receiver_ctx));
for (i = 1; i < argc; ++i) {
const char *value = NULL;
int handled;
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-id", &value)) < 0) {
fprintf(stderr, "kcpping: flag -id requires a value\n");
return 1;
} else if (handled) {
peer_id = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-server", &value)) < 0) {
fprintf(stderr, "kcpping: flag -server requires a value\n");
return 1;
} else if (handled) {
server_addr = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-to", &value)) < 0) {
fprintf(stderr, "kcpping: flag -to requires a value\n");
return 1;
} else if (handled) {
target_peer = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-count", &value)) < 0) {
fprintf(stderr, "kcpping: flag -count requires a value\n");
return 1;
} else if (handled) {
count = atoi(value);
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-interval", &value)) < 0) {
fprintf(stderr, "kcpping: flag -interval requires a value\n");
return 1;
} else if (handled) {
if (omni_parse_duration_ms(value, interval_ms, &interval_ms) != 0) {
fprintf(stderr, "kcpping: invalid -interval value %s\n", value);
return 1;
}
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-size", &value)) < 0) {
fprintf(stderr, "kcpping: flag -size requires a value\n");
return 1;
} else if (handled) {
size = atoi(value);
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-timeout", &value)) < 0) {
fprintf(stderr, "kcpping: flag -timeout requires a value\n");
return 1;
} else if (handled) {
if (omni_parse_duration_ms(value, timeout_ms, &timeout_ms) != 0) {
fprintf(stderr, "kcpping: invalid -timeout value %s\n", value);
return 1;
}
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-bind-ip", &value)) < 0) {
fprintf(stderr, "kcpping: flag -bind-ip requires a value\n");
return 1;
} else if (handled) {
bind_ip = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-bind-device", &value)) < 0) {
fprintf(stderr, "kcpping: flag -bind-device requires a value\n");
return 1;
} else if (handled) {
bind_device = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-latency-log", &value)) < 0) {
fprintf(stderr, "kcpping: flag -latency-log requires a value\n");
return 1;
} else if (handled) {
latency_log_path = value;
continue;
}
if ((handled = cli_parse_bool_flag(argv[i], "-echo", &echo_mode)) < 0) {
fprintf(stderr, "kcpping: invalid -echo value\n");
return 1;
} else if (handled) {
continue;
}
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
kcpping_usage(stdout);
return 0;
}
fprintf(stderr, "kcpping: unknown argument %s\n", argv[i]);
kcpping_usage(stderr);
return 1;
}
if (peer_id[0] == '\0' || server_addr[0] == '\0') {
fprintf(stderr, "kcpping: flags -id and -server are required\n");
return 1;
}
if (!echo_mode && target_peer[0] == '\0') {
fprintf(stderr, "kcpping: flag -to is required unless -echo is set\n");
return 1;
}
if (count < 0 || interval_ms <= 0 || size <= 0 || timeout_ms <= 0) {
fprintf(stderr, "kcpping: invalid numeric flag value\n");
return 1;
}
signal(SIGINT, kcpping_on_signal);
if (latency_log_path[0] != '\0') {
latency_logger = latencylog_open_jsonl(latency_log_path);
if (latency_logger == NULL) {
fprintf(stderr, "kcpping: open latency logger %s failed\n", latency_log_path);
goto cleanup;
}
}
client = kcp_client_dial(server_addr, NULL, peer_id, bind_ip, bind_device, latency_logger, NULL, NULL, KCP_DEFAULT_STATS_INTERVAL_MS);
if (client == NULL) {
fprintf(stderr, "kcpping: dial kcp server %s failed\n", server_addr);
goto cleanup;
}
if (echo_mode) {
while (!g_kcpping_stop) {
message_t msg;
protocol_message_init(&msg);
if (kcp_client_receive(client, &msg) != 0) {
protocol_message_clear(&msg);
if (g_kcpping_stop) {
break;
}
fprintf(stderr, "kcpping: receive failed in echo mode\n");
goto cleanup;
}
if (msg.type == MSG_TYPE_TEXT) {
char *text = (char *) malloc(msg.body_len + 1U);
if (text == NULL) {
protocol_message_clear(&msg);
goto cleanup;
}
memcpy(text, msg.body, msg.body_len);
text[msg.body_len] = '\0';
if (kcp_client_send_text(client, msg.from, text) != 0) {
free(text);
protocol_message_clear(&msg);
fprintf(stderr, "kcpping: echo send back to %s failed\n", msg.from);
goto cleanup;
}
free(text);
} else if (msg.type == MSG_TYPE_ERROR) {
fprintf(stderr, "server error: %.*s\n", (int) msg.body_len, msg.body == NULL ? "" : (const char *) msg.body);
} else {
fprintf(stderr, "unexpected message type %s from %s ignored\n", protocol_message_type_name(msg.type), msg.from);
}
protocol_message_clear(&msg);
}
rc = 0;
goto cleanup;
}
fprintf(stdout, "KCP PING %s via %s (payload=%d bytes, KCP)\n", target_peer, server_addr, size);
kcp_ping_receiver_ctx_init(&receiver_ctx, client);
receiver_ctx_initialized = 1;
if (pthread_create(&receiver_thread, NULL, kcpping_receive_thread_main, &receiver_ctx) != 0) {
fprintf(stderr, "kcpping: create receive thread failed\n");
goto cleanup;
}
receiver_thread_started = 1;
{
uint64_t next_seq = 1;
int stop_sending = 0;
int64_t next_send_at_ns = omni_now_unix_nano();
int poll_ms = kcp_ping_expiry_poll_ms(timeout_ms);
int64_t timeout_ns = (int64_t) timeout_ms * 1000000LL;
while (!g_kcpping_stop || tracker.pending_count > 0 || !stop_sending) {
int64_t now_ns = omni_now_unix_nano();
message_t msg;
int popped;
int receiver_closed;
int receiver_status_rc;
if (!stop_sending && now_ns >= next_send_at_ns) {
char *payload = NULL;
size_t payload_len = 0;
if (count > 0 && tracker.sent >= count) {
stop_sending = 1;
} else {
if (kcp_ping_build_payload(next_seq, now_ns, size, &payload, &payload_len) != 0) {
fprintf(stderr, "kcpping: build payload for seq=%" PRIu64 " failed\n", next_seq);
free(payload);
goto cleanup;
}
if (kcp_client_send_text(client, target_peer, payload) != 0) {
fprintf(stderr, "kcpping: send ping seq=%" PRIu64 " failed\n", next_seq);
free(payload);
goto cleanup;
}
free(payload);
if (kcp_ping_tracker_mark_sent(&tracker, next_seq, now_ns, timeout_ns) != 0) {
goto cleanup;
}
next_seq++;
next_send_at_ns = now_ns + (int64_t) interval_ms * 1000000LL;
if (count > 0 && tracker.sent >= count) {
stop_sending = 1;
}
}
}
kcp_ping_tracker_expire(&tracker, now_ns, stdout);
do {
popped = kcp_ping_receiver_pop(&receiver_ctx, &msg);
if (popped == 1) {
if (msg.type == MSG_TYPE_TEXT) {
uint64_t seq;
int64_t sent_ts_ns;
int disposition;
int64_t rtt_ns;
if (kcp_ping_parse_payload(msg.body, msg.body_len, &seq, &sent_ts_ns) != 0) {
fprintf(stderr, "ignore non-ping text message from %s\n", msg.from);
} else if (kcp_ping_tracker_observe_reply(&tracker, seq, sent_ts_ns, omni_now_unix_nano(), &disposition, &rtt_ns) != 0) {
protocol_message_clear(&msg);
goto cleanup;
} else if (disposition == 0) {
fprintf(stdout, "seq=%" PRIu64 " rtt=%.2fms\n", seq, (double) rtt_ns / 1000000.0);
} else if (disposition == 1) {
fprintf(stderr, "seq=%" PRIu64 " duplicate or late reply ignored\n", seq);
} else {
fprintf(stderr, "seq=%" PRIu64 " unexpected reply ignored\n", seq);
}
} else if (msg.type == MSG_TYPE_ERROR) {
fprintf(stderr, "server error: %.*s\n", (int) msg.body_len, msg.body == NULL ? "" : (const char *) msg.body);
} else {
fprintf(stderr, "unexpected message type %s from %s ignored\n", protocol_message_type_name(msg.type), msg.from);
}
protocol_message_clear(&msg);
}
} while (popped == 1);
kcp_ping_receiver_status(&receiver_ctx, &receiver_closed, &receiver_status_rc);
if (receiver_closed && receiver_status_rc != 0) {
fprintf(stderr, "kcpping: receive loop failed\n");
goto cleanup;
}
if ((g_kcpping_stop || stop_sending) && tracker.pending_count == 0) {
break;
}
usleep((useconds_t) poll_ms * 1000U);
}
}
kcp_ping_print_summary(stdout, target_peer, &tracker);
rc = 0;
cleanup:
receiver_ctx.stop_requested = 1;
kcp_client_close(client);
if (receiver_thread_started) {
pthread_join(receiver_thread, NULL);
kcp_ping_receiver_ctx_destroy(&receiver_ctx);
} else if (receiver_ctx_initialized) {
kcp_ping_receiver_ctx_destroy(&receiver_ctx);
}
kcp_client_free(client);
latencylog_close(latency_logger);
kcp_ping_tracker_destroy(&tracker);
return rc;
}

223
c/cmd/kcpserver.c Normal file
View File

@@ -0,0 +1,223 @@
#include "cli_parse.h"
#include "server_kcp_hub.h"
#include "server_udp_relay.h"
static void kcpserver_usage(FILE *out) {
fprintf(out, "usage: kcpserver [-mode hub|relay] [-listen addr] [-bind-device dev]\n");
fprintf(out, " [-latency-log path] [-kcp-ts-debug-log path]\n");
fprintf(out, " [-kcp-session-stats-log path] [-kcp-session-stats-interval 100ms]\n");
fprintf(out, " [-relay-remote addr] [-relay-listen addr] [-relay-peer addr]\n");
}
int main(int argc, char **argv) {
const char *mode = "hub";
const char *listen_addr = ":9002";
const char *bind_device = "";
const char *latency_log_path = "";
const char *packet_log_path = "";
const char *stats_log_path = "";
const char *stats_interval_raw = "";
const char *relay_listen_alias = "";
const char *relay_remote_addr = "";
const char *relay_peer_alias = "";
int stats_interval_ms = KCP_DEFAULT_STATS_INTERVAL_MS;
int i;
int rc = 1;
latency_logger_t *latency_logger = NULL;
kcp_packet_debug_logger_t *packet_logger = NULL;
kcp_session_stats_logger_t *stats_logger = NULL;
kcp_listener_t *listener = NULL;
kcp_hub_t *hub = NULL;
udp_relay_t *relay = NULL;
for (i = 1; i < argc; ++i) {
const char *value = NULL;
int handled;
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-mode", &value)) < 0) {
fprintf(stderr, "kcpserver: flag -mode requires a value\n");
return 1;
} else if (handled) {
mode = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-listen", &value)) < 0) {
fprintf(stderr, "kcpserver: flag -listen requires a value\n");
return 1;
} else if (handled) {
listen_addr = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-bind-device", &value)) < 0) {
fprintf(stderr, "kcpserver: flag -bind-device requires a value\n");
return 1;
} else if (handled) {
bind_device = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-latency-log", &value)) < 0) {
fprintf(stderr, "kcpserver: flag -latency-log requires a value\n");
return 1;
} else if (handled) {
latency_log_path = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-kcp-ts-debug-log", &value)) < 0) {
fprintf(stderr, "kcpserver: flag -kcp-ts-debug-log requires a value\n");
return 1;
} else if (handled) {
packet_log_path = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-kcp-session-stats-log", &value)) < 0) {
fprintf(stderr, "kcpserver: flag -kcp-session-stats-log requires a value\n");
return 1;
} else if (handled) {
stats_log_path = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-kcp-session-stats-interval", &value)) < 0) {
fprintf(stderr, "kcpserver: flag -kcp-session-stats-interval requires a value\n");
return 1;
} else if (handled) {
stats_interval_raw = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-relay-listen", &value)) < 0) {
fprintf(stderr, "kcpserver: flag -relay-listen requires a value\n");
return 1;
} else if (handled) {
relay_listen_alias = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-relay-remote", &value)) < 0) {
fprintf(stderr, "kcpserver: flag -relay-remote requires a value\n");
return 1;
} else if (handled) {
relay_remote_addr = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-relay-peer", &value)) < 0) {
fprintf(stderr, "kcpserver: flag -relay-peer requires a value\n");
return 1;
} else if (handled) {
relay_peer_alias = value;
continue;
}
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
kcpserver_usage(stdout);
return 0;
}
fprintf(stderr, "kcpserver: unknown argument %s\n", argv[i]);
kcpserver_usage(stderr);
return 1;
}
if (kcp_session_stats_parse_interval_ms(stats_interval_raw, &stats_interval_ms) != 0) {
fprintf(stderr, "kcpserver: invalid -kcp-session-stats-interval value %s\n", stats_interval_raw);
return 1;
}
if (relay_peer_alias[0] != '\0' && relay_remote_addr[0] != '\0' && strcmp(relay_peer_alias, relay_remote_addr) != 0) {
fprintf(stderr, "kcpserver: flags -relay-remote and -relay-peer must match when both are set\n");
return 1;
}
if (relay_remote_addr[0] == '\0' && relay_peer_alias[0] != '\0') {
relay_remote_addr = relay_peer_alias;
}
if (relay_peer_alias[0] != '\0') {
fprintf(stderr, "warning: flag -relay-peer is deprecated; use -relay-remote instead\n");
}
if (relay_listen_alias[0] != '\0') {
if (strcmp(mode, "relay") != 0) {
fprintf(stderr, "kcpserver: flag -relay-listen may only be used in relay mode\n");
return 1;
}
if (listen_addr[0] != '\0' && strcmp(listen_addr, ":9002") != 0 && strcmp(listen_addr, relay_listen_alias) != 0) {
fprintf(stderr, "kcpserver: flags -listen and -relay-listen must match when both are set in relay mode\n");
return 1;
}
listen_addr = relay_listen_alias;
fprintf(stderr, "warning: flag -relay-listen is deprecated; use -listen with -mode=relay instead\n");
}
if (strcmp(mode, "hub") == 0) {
if (relay_remote_addr[0] != '\0') {
fprintf(stderr, "kcpserver: flag -relay-remote may only be used in relay mode\n");
return 1;
}
if (latency_log_path[0] != '\0') {
latency_logger = latencylog_open_jsonl(latency_log_path);
if (latency_logger == NULL) {
fprintf(stderr, "kcpserver: open latency logger %s failed\n", latency_log_path);
goto cleanup;
}
}
if (packet_log_path[0] != '\0') {
packet_logger = kcp_packet_debug_open_jsonl(packet_log_path);
if (packet_logger == NULL) {
fprintf(stderr, "kcpserver: open packet debug logger %s failed\n", packet_log_path);
goto cleanup;
}
}
if (stats_log_path[0] != '\0') {
stats_logger = kcp_session_stats_open_jsonl(stats_log_path);
if (stats_logger == NULL) {
fprintf(stderr, "kcpserver: open session stats logger %s failed\n", stats_log_path);
goto cleanup;
}
}
listener = kcp_listener_listen(listen_addr, bind_device, packet_logger, OMNI_NODE_ROLE_SERVER, "hub");
if (listener == NULL) {
fprintf(stderr, "kcpserver: listen on %s failed\n", listen_addr);
goto cleanup;
}
hub = kcp_hub_new(latency_logger, stats_logger, stats_interval_ms);
if (hub == NULL) {
fprintf(stderr, "kcpserver: create hub failed\n");
goto cleanup;
}
fprintf(stderr, "kcp hub listening on %s\n", listen_addr);
if (kcp_hub_serve_listener(hub, listener) != 0) {
fprintf(stderr, "kcpserver: serve listener failed\n");
goto cleanup;
}
rc = 0;
goto cleanup;
}
if (strcmp(mode, "relay") == 0) {
if (bind_device[0] != '\0') {
fprintf(stderr, "kcpserver: flag -bind-device is not supported in relay mode\n");
return 1;
}
if (relay_remote_addr[0] == '\0') {
fprintf(stderr, "kcpserver: flag -relay-remote is required in relay mode\n");
return 1;
}
relay = udp_relay_open(listen_addr, relay_remote_addr);
if (relay == NULL) {
fprintf(stderr, "kcpserver: open udp relay %s -> %s failed\n", listen_addr, relay_remote_addr);
goto cleanup;
}
fprintf(stderr, "udp relay listening on %s and forwarding to %s\n", listen_addr, relay_remote_addr);
if (udp_relay_serve(relay) != 0) {
fprintf(stderr, "kcpserver: udp relay stopped with error\n");
goto cleanup;
}
rc = 0;
goto cleanup;
}
fprintf(stderr, "kcpserver: unsupported -mode=%s; want hub or relay\n", mode);
cleanup:
udp_relay_free(relay);
kcp_hub_free(hub);
kcp_listener_free(listener);
kcp_session_stats_close(stats_logger);
kcp_packet_debug_close(packet_logger);
latencylog_close(latency_logger);
return rc;
}

282
c/cmd/udppeer.c Normal file
View File

@@ -0,0 +1,282 @@
#include "cli_parse.h"
#include "interactive.h"
#include "peer_udp_client.h"
#include <pthread.h>
typedef struct udppeer_receive_ctx {
udp_client_t *client;
const char *inbox_dir;
volatile int stop_requested;
int rc;
} udppeer_receive_ctx_t;
static void udppeer_usage(FILE *out) {
fprintf(out, "usage: udppeer [-id peer-a] [-server 127.0.0.1:9001] [-to peer] [-text msg | -file path]\n");
fprintf(out, " [-bind-ip ip] [-inbox-dir dir] [-latency-log path] [-tx-ts-debug-log path]\n");
fprintf(out, " [-interactive[=true|false]]\n");
}
static void *udppeer_receive_thread_main(void *arg) {
udppeer_receive_ctx_t *ctx = (udppeer_receive_ctx_t *) arg;
for (;;) {
message_t msg;
char persisted_path[512];
protocol_message_init(&msg);
if (udp_client_receive(ctx->client, &msg) != 0) {
protocol_message_clear(&msg);
ctx->rc = ctx->stop_requested ? 0 : -1;
return NULL;
}
switch (msg.type) {
case MSG_TYPE_TEXT:
if (udp_client_persist_message(ctx->client, &msg, ctx->inbox_dir, persisted_path, sizeof(persisted_path)) != 0) {
fprintf(stderr, "udppeer: persist text from %s to %s failed\n", msg.from, msg.to);
protocol_message_clear(&msg);
ctx->rc = -1;
return NULL;
}
fprintf(stderr, "received text from %s to %s and persisted to %s\n", msg.from, msg.to, persisted_path);
break;
case MSG_TYPE_FILE:
if (udp_client_persist_message(ctx->client, &msg, ctx->inbox_dir, persisted_path, sizeof(persisted_path)) != 0) {
fprintf(stderr, "udppeer: persist file from %s to %s failed\n", msg.from, msg.to);
protocol_message_clear(&msg);
ctx->rc = -1;
return NULL;
}
fprintf(stderr, "received file from %s to %s: %s (%lu bytes) -> %s\n", msg.from, msg.to, msg.file_name, (unsigned long) msg.body_len, persisted_path);
break;
case MSG_TYPE_ERROR:
fprintf(stderr, "received error from %s to %s: %.*s\n", msg.from, msg.to, (int) msg.body_len, msg.body == NULL ? "" : (const char *) msg.body);
break;
default:
fprintf(stderr, "received unexpected message type %s from %s\n", protocol_message_type_name(msg.type), msg.from);
protocol_message_clear(&msg);
ctx->rc = -1;
return NULL;
}
protocol_message_clear(&msg);
}
}
int main(int argc, char **argv) {
const char *peer_id = "peer-a";
const char *server_addr = "127.0.0.1:9001";
const char *target_peer = "";
const char *text = "";
const char *file_path = "";
const char *bind_ip = "";
const char *inbox_dir = "inbox";
const char *latency_log_path = "";
const char *tx_debug_log_path = "";
int interactive = 1;
latency_logger_t *latency_logger = NULL;
tx_timestamp_debug_logger_t *debug_logger = NULL;
udp_client_t *client = NULL;
udppeer_receive_ctx_t receive_ctx;
pthread_t receive_thread;
int receive_thread_started = 0;
int i;
int rc = 1;
memset(&receive_ctx, 0, sizeof(receive_ctx));
for (i = 1; i < argc; ++i) {
const char *value = NULL;
int handled;
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-id", &value)) < 0) {
fprintf(stderr, "udppeer: flag -id requires a value\n");
return 1;
} else if (handled) {
peer_id = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-server", &value)) < 0) {
fprintf(stderr, "udppeer: flag -server requires a value\n");
return 1;
} else if (handled) {
server_addr = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-to", &value)) < 0) {
fprintf(stderr, "udppeer: flag -to requires a value\n");
return 1;
} else if (handled) {
target_peer = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-text", &value)) < 0) {
fprintf(stderr, "udppeer: flag -text requires a value\n");
return 1;
} else if (handled) {
text = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-file", &value)) < 0) {
fprintf(stderr, "udppeer: flag -file requires a value\n");
return 1;
} else if (handled) {
file_path = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-bind-ip", &value)) < 0) {
fprintf(stderr, "udppeer: flag -bind-ip requires a value\n");
return 1;
} else if (handled) {
bind_ip = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-inbox-dir", &value)) < 0) {
fprintf(stderr, "udppeer: flag -inbox-dir requires a value\n");
return 1;
} else if (handled) {
inbox_dir = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-latency-log", &value)) < 0) {
fprintf(stderr, "udppeer: flag -latency-log requires a value\n");
return 1;
} else if (handled) {
latency_log_path = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-tx-ts-debug-log", &value)) < 0) {
fprintf(stderr, "udppeer: flag -tx-ts-debug-log requires a value\n");
return 1;
} else if (handled) {
tx_debug_log_path = value;
continue;
}
if ((handled = cli_parse_bool_flag(argv[i], "-interactive", &interactive)) < 0) {
fprintf(stderr, "udppeer: invalid -interactive value\n");
return 1;
} else if (handled) {
continue;
}
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
udppeer_usage(stdout);
return 0;
}
fprintf(stderr, "udppeer: unknown argument %s\n", argv[i]);
udppeer_usage(stderr);
return 1;
}
if (text[0] != '\0' && file_path[0] != '\0') {
fprintf(stderr, "udppeer: only one of -text or -file may be specified\n");
return 1;
}
if ((text[0] != '\0' || file_path[0] != '\0') && target_peer[0] == '\0') {
fprintf(stderr, "udppeer: flag -to is required when sending text or file\n");
return 1;
}
if (latency_log_path[0] != '\0') {
latency_logger = latencylog_open_jsonl(latency_log_path);
if (latency_logger == NULL) {
fprintf(stderr, "udppeer: open latency logger %s failed\n", latency_log_path);
goto cleanup;
}
}
if (tx_debug_log_path[0] != '\0') {
debug_logger = tx_timestamp_debug_open_jsonl(tx_debug_log_path);
if (debug_logger == NULL) {
fprintf(stderr, "udppeer: open tx timestamp debug logger %s failed\n", tx_debug_log_path);
goto cleanup;
}
}
client = udp_client_dial(server_addr, peer_id, bind_ip, latency_logger, debug_logger, tx_debug_log_path[0] != '\0');
if (client == NULL) {
fprintf(stderr, "udppeer: dial udp server %s failed\n", server_addr);
goto cleanup;
}
fprintf(stderr, "connected to %s as %s (UDP)\n", server_addr, udp_client_id(client));
receive_ctx.client = client;
receive_ctx.inbox_dir = inbox_dir;
if (pthread_create(&receive_thread, NULL, udppeer_receive_thread_main, &receive_ctx) != 0) {
fprintf(stderr, "udppeer: create receive thread failed\n");
goto cleanup;
}
receive_thread_started = 1;
if (target_peer[0] != '\0' && text[0] != '\0') {
if (udp_client_send_text(client, target_peer, text) != 0) {
fprintf(stderr, "udppeer: send text to %s failed\n", target_peer);
goto cleanup;
}
fprintf(stderr, "sent text to %s\n", target_peer);
}
if (target_peer[0] != '\0' && file_path[0] != '\0') {
if (udp_client_send_file_path(client, target_peer, file_path) != 0) {
fprintf(stderr, "udppeer: send file %s to %s failed\n", file_path, target_peer);
goto cleanup;
}
fprintf(stderr, "sent file %s to %s\n", file_path, target_peer);
}
if (interactive) {
char line[2048];
char prompt[128];
snprintf(prompt, sizeof(prompt), "%s> ", udp_client_id(client));
interactive_print_help(stdout, "UDP");
while (fputs(prompt, stdout) >= 0 && fflush(stdout) == 0 && fgets(line, sizeof(line), stdin) != NULL) {
interactive_command_t command;
char err[128];
omni_trim_newline(line);
if (interactive_parse_command(line, &command, err, sizeof(err)) != 0) {
if (strstr(err, "empty command") == NULL) {
fprintf(stderr, "%s\n", err);
}
continue;
}
if (command.type == INTERACTIVE_CMD_HELP) {
interactive_print_help(stdout, "UDP");
continue;
}
if (command.type == INTERACTIVE_CMD_QUIT) {
break;
}
if (command.type == INTERACTIVE_CMD_TEXT) {
if (udp_client_send_text(client, command.to, command.value) != 0) {
fprintf(stderr, "udppeer: send text to %s failed\n", command.to);
continue;
}
fprintf(stderr, "sent text to %s\n", command.to);
continue;
}
if (command.type == INTERACTIVE_CMD_FILE) {
if (udp_client_send_file_path(client, command.to, command.value) != 0) {
fprintf(stderr, "udppeer: send file %s to %s failed\n", command.value, command.to);
continue;
}
fprintf(stderr, "sent file %s to %s\n", command.value, command.to);
continue;
}
}
}
rc = 0;
cleanup:
receive_ctx.stop_requested = 1;
udp_client_close(client);
if (receive_thread_started) {
pthread_join(receive_thread, NULL);
if (rc == 0 && receive_ctx.rc != 0) {
rc = 1;
}
}
udp_client_free(client);
tx_timestamp_debug_close(debug_logger);
latencylog_close(latency_logger);
return rc;
}

780
c/cmd/udpping.c Normal file
View File

@@ -0,0 +1,780 @@
#include "cli_parse.h"
#include "peer_udp_client.h"
#include "cJSON.h"
#include <signal.h>
#include <unistd.h>
typedef struct ping_message_node {
struct ping_message_node *next;
message_t msg;
} ping_message_node_t;
typedef struct ping_receiver_ctx {
udp_client_t *client;
pthread_mutex_t mu;
ping_message_node_t *head;
ping_message_node_t *tail;
volatile int stop_requested;
int closed;
int rc;
} ping_receiver_ctx_t;
typedef struct pending_ping {
struct pending_ping *next;
uint64_t seq;
int64_t deadline_ns;
} pending_ping_t;
typedef struct ping_tracker {
pending_ping_t *pending;
int pending_count;
int sent;
int duplicates;
uint64_t max_seq_sent;
int64_t *samples_ns;
size_t sample_count;
size_t sample_cap;
} ping_tracker_t;
static volatile sig_atomic_t g_udpping_stop = 0;
static void udpping_on_signal(int signo) {
(void) signo;
g_udpping_stop = 1;
}
static void udpping_usage(FILE *out) {
fprintf(out, "usage: udpping [-id pinger] [-server 127.0.0.1:9001] [-to peer] [-echo]\n");
fprintf(out, " [-count 100] [-interval 100ms] [-size 64] [-timeout 3s]\n");
fprintf(out, " [-bind-ip ip] [-latency-log path]\n");
}
static int ping_compare_i64(const void *left, const void *right) {
const int64_t *a = (const int64_t *) left;
const int64_t *b = (const int64_t *) right;
if (*a < *b) {
return -1;
}
if (*a > *b) {
return 1;
}
return 0;
}
static double ping_sqrt(double value) {
double x = value;
int i;
if (value <= 0.0) {
return 0.0;
}
if (x < 1.0) {
x = 1.0;
}
for (i = 0; i < 16; ++i) {
x = 0.5 * (x + value / x);
}
return x;
}
static int ping_build_payload(uint64_t seq, int64_t ts_ns, int size, char **out_body, size_t *out_len) {
cJSON *root = NULL;
char *json = NULL;
char *pad = NULL;
size_t base_len;
size_t pad_len;
*out_body = NULL;
*out_len = 0;
root = cJSON_CreateObject();
if (root == NULL) {
return -1;
}
cJSON_AddNumberToObject(root, "seq", (double) seq);
cJSON_AddNumberToObject(root, "ts_ns", (double) ts_ns);
cJSON_AddStringToObject(root, "pad", "");
json = cJSON_PrintUnformatted(root);
cJSON_Delete(root);
if (json == NULL) {
return -1;
}
base_len = strlen(json);
cJSON_free(json);
if ((int) base_len > size) {
errno = EMSGSIZE;
return -1;
}
pad_len = (size_t) size - base_len;
pad = (char *) malloc(pad_len + 1U);
if (pad == NULL) {
return -1;
}
memset(pad, 'A', pad_len);
pad[pad_len] = '\0';
root = cJSON_CreateObject();
if (root == NULL) {
free(pad);
return -1;
}
cJSON_AddNumberToObject(root, "seq", (double) seq);
cJSON_AddNumberToObject(root, "ts_ns", (double) ts_ns);
cJSON_AddStringToObject(root, "pad", pad);
free(pad);
json = cJSON_PrintUnformatted(root);
cJSON_Delete(root);
if (json == NULL) {
return -1;
}
if ((int) strlen(json) != size) {
cJSON_free(json);
errno = EINVAL;
return -1;
}
*out_body = json;
*out_len = (size_t) size;
return 0;
}
static int ping_parse_payload(const uint8_t *body, size_t body_len, uint64_t *seq, int64_t *ts_ns) {
char *text;
cJSON *root;
const cJSON *seq_item;
const cJSON *ts_item;
if (body == NULL || seq == NULL || ts_ns == NULL) {
errno = EINVAL;
return -1;
}
text = (char *) malloc(body_len + 1U);
if (text == NULL) {
return -1;
}
memcpy(text, body, body_len);
text[body_len] = '\0';
root = cJSON_Parse(text);
free(text);
if (root == NULL) {
errno = EPROTO;
return -1;
}
seq_item = cJSON_GetObjectItemCaseSensitive(root, "seq");
ts_item = cJSON_GetObjectItemCaseSensitive(root, "ts_ns");
if (!cJSON_IsNumber(seq_item) || !cJSON_IsNumber(ts_item) || seq_item->valuedouble <= 0 || ts_item->valuedouble <= 0) {
cJSON_Delete(root);
errno = EPROTO;
return -1;
}
*seq = (uint64_t) seq_item->valuedouble;
*ts_ns = (int64_t) ts_item->valuedouble;
cJSON_Delete(root);
return 0;
}
static void ping_receiver_ctx_init(ping_receiver_ctx_t *ctx, udp_client_t *client) {
memset(ctx, 0, sizeof(*ctx));
ctx->client = client;
pthread_mutex_init(&ctx->mu, NULL);
}
static void ping_receiver_ctx_destroy(ping_receiver_ctx_t *ctx) {
ping_message_node_t *node;
ping_message_node_t *next;
if (ctx == NULL) {
return;
}
for (node = ctx->head; node != NULL; node = next) {
next = node->next;
protocol_message_clear(&node->msg);
free(node);
}
pthread_mutex_destroy(&ctx->mu);
}
static void *udpping_receive_thread_main(void *arg) {
ping_receiver_ctx_t *ctx = (ping_receiver_ctx_t *) arg;
for (;;) {
message_t msg;
ping_message_node_t *node;
protocol_message_init(&msg);
if (udp_client_receive(ctx->client, &msg) != 0) {
protocol_message_clear(&msg);
pthread_mutex_lock(&ctx->mu);
ctx->rc = ctx->stop_requested ? 0 : -1;
ctx->closed = 1;
pthread_mutex_unlock(&ctx->mu);
return NULL;
}
node = (ping_message_node_t *) calloc(1, sizeof(*node));
if (node == NULL) {
protocol_message_clear(&msg);
pthread_mutex_lock(&ctx->mu);
ctx->rc = -1;
ctx->closed = 1;
pthread_mutex_unlock(&ctx->mu);
return NULL;
}
node->msg = msg;
pthread_mutex_lock(&ctx->mu);
if (ctx->tail == NULL) {
ctx->head = node;
} else {
ctx->tail->next = node;
}
ctx->tail = node;
pthread_mutex_unlock(&ctx->mu);
}
}
static int ping_receiver_pop(ping_receiver_ctx_t *ctx, message_t *out_msg) {
ping_message_node_t *node;
pthread_mutex_lock(&ctx->mu);
node = ctx->head;
if (node != NULL) {
ctx->head = node->next;
if (ctx->head == NULL) {
ctx->tail = NULL;
}
}
pthread_mutex_unlock(&ctx->mu);
if (node == NULL) {
return 0;
}
*out_msg = node->msg;
free(node);
return 1;
}
static int ping_receiver_status(ping_receiver_ctx_t *ctx, int *closed, int *rc) {
pthread_mutex_lock(&ctx->mu);
*closed = ctx->closed;
*rc = ctx->rc;
pthread_mutex_unlock(&ctx->mu);
return 0;
}
static void ping_tracker_init(ping_tracker_t *tracker) {
memset(tracker, 0, sizeof(*tracker));
}
static void ping_tracker_destroy(ping_tracker_t *tracker) {
pending_ping_t *pending;
pending_ping_t *next;
for (pending = tracker->pending; pending != NULL; pending = next) {
next = pending->next;
free(pending);
}
free(tracker->samples_ns);
}
static int ping_tracker_mark_sent(ping_tracker_t *tracker, uint64_t seq, int64_t sent_at_ns, int64_t timeout_ns) {
pending_ping_t *pending = (pending_ping_t *) calloc(1, sizeof(*pending));
if (pending == NULL) {
return -1;
}
pending->seq = seq;
pending->deadline_ns = sent_at_ns + timeout_ns;
pending->next = tracker->pending;
tracker->pending = pending;
tracker->pending_count++;
tracker->sent++;
tracker->max_seq_sent = seq;
return 0;
}
static pending_ping_t *ping_tracker_find_pending(ping_tracker_t *tracker, uint64_t seq, pending_ping_t **out_prev) {
pending_ping_t *prev = NULL;
pending_ping_t *cur;
for (cur = tracker->pending; cur != NULL; cur = cur->next) {
if (cur->seq == seq) {
if (out_prev != NULL) {
*out_prev = prev;
}
return cur;
}
prev = cur;
}
if (out_prev != NULL) {
*out_prev = NULL;
}
return NULL;
}
static int ping_tracker_add_sample(ping_tracker_t *tracker, int64_t rtt_ns) {
int64_t *next_samples;
size_t next_cap;
if (tracker->sample_count == tracker->sample_cap) {
next_cap = tracker->sample_cap == 0 ? 16U : tracker->sample_cap * 2U;
next_samples = (int64_t *) realloc(tracker->samples_ns, next_cap * sizeof(*next_samples));
if (next_samples == NULL) {
return -1;
}
tracker->samples_ns = next_samples;
tracker->sample_cap = next_cap;
}
tracker->samples_ns[tracker->sample_count++] = rtt_ns;
return 0;
}
static int ping_tracker_observe_reply(ping_tracker_t *tracker, uint64_t seq, int64_t sent_ts_ns, int64_t received_ts_ns, int *disposition, int64_t *rtt_ns) {
pending_ping_t *prev = NULL;
pending_ping_t *pending;
if (seq == 0 || seq > tracker->max_seq_sent) {
*disposition = 2;
*rtt_ns = 0;
return 0;
}
pending = ping_tracker_find_pending(tracker, seq, &prev);
if (pending == NULL) {
tracker->duplicates++;
*disposition = 1;
*rtt_ns = 0;
return 0;
}
if (prev == NULL) {
tracker->pending = pending->next;
} else {
prev->next = pending->next;
}
tracker->pending_count--;
free(pending);
*rtt_ns = received_ts_ns - sent_ts_ns;
if (*rtt_ns < 0) {
*rtt_ns = 0;
}
if (ping_tracker_add_sample(tracker, *rtt_ns) != 0) {
return -1;
}
*disposition = 0;
return 0;
}
static void ping_tracker_expire(ping_tracker_t *tracker, int64_t now_ns, FILE *out) {
pending_ping_t *prev = NULL;
pending_ping_t *cur = tracker->pending;
while (cur != NULL) {
if (cur->deadline_ns <= now_ns) {
pending_ping_t *next = cur->next;
fprintf(out, "seq=%" PRIu64 " timeout\n", cur->seq);
if (prev == NULL) {
tracker->pending = next;
} else {
prev->next = next;
}
free(cur);
tracker->pending_count--;
cur = next;
continue;
}
prev = cur;
cur = cur->next;
}
}
static int64_t ping_percentile_ns(const int64_t *sorted, size_t count, double percentile) {
size_t index;
double raw_index;
if (count == 0) {
return 0;
}
if (percentile <= 0.0) {
return sorted[0];
}
if (percentile >= 1.0) {
return sorted[count - 1];
}
raw_index = percentile * (double) count;
index = (size_t) raw_index;
if ((double) index < raw_index) {
index++;
}
if (index > 0) {
index--;
}
if (index >= count) {
index = count - 1;
}
return sorted[index];
}
static void ping_print_summary(FILE *out, const char *target, const ping_tracker_t *tracker) {
int received = (int) tracker->sample_count;
double loss_pct = tracker->sent == 0 ? 0.0 : ((double) (tracker->sent - received) * 100.0 / (double) tracker->sent);
fprintf(out, "--- %s udp ping statistics ---\n", target);
fprintf(out, "%d packets transmitted, %d received, %d duplicates, %.2f%% packet loss\n", tracker->sent, received, tracker->duplicates, loss_pct);
if (tracker->sample_count == 0) {
fprintf(out, "rtt min/avg/max/p50/p95/p99 = n/a/n/a/n/a/n/a/n/a/n/a, stddev=n/a\n");
return;
}
{
int64_t *sorted = (int64_t *) malloc(tracker->sample_count * sizeof(*sorted));
size_t i;
double sum = 0.0;
double variance = 0.0;
double avg;
int64_t min_ns;
int64_t max_ns;
int64_t p50_ns;
int64_t p95_ns;
int64_t p99_ns;
if (sorted == NULL) {
fprintf(out, "rtt summary unavailable: memory allocation failed\n");
return;
}
memcpy(sorted, tracker->samples_ns, tracker->sample_count * sizeof(*sorted));
qsort(sorted, tracker->sample_count, sizeof(*sorted), ping_compare_i64);
for (i = 0; i < tracker->sample_count; ++i) {
sum += (double) sorted[i];
}
avg = sum / (double) tracker->sample_count;
for (i = 0; i < tracker->sample_count; ++i) {
double delta = (double) sorted[i] - avg;
variance += delta * delta;
}
variance /= (double) tracker->sample_count;
min_ns = sorted[0];
max_ns = sorted[tracker->sample_count - 1];
p50_ns = ping_percentile_ns(sorted, tracker->sample_count, 0.50);
p95_ns = ping_percentile_ns(sorted, tracker->sample_count, 0.95);
p99_ns = ping_percentile_ns(sorted, tracker->sample_count, 0.99);
fprintf(
out,
"rtt min/avg/max/p50/p95/p99 = %.2fms/%.2fms/%.2fms/%.2fms/%.2fms/%.2fms, stddev=%.2fms\n",
(double) min_ns / 1000000.0,
avg / 1000000.0,
(double) max_ns / 1000000.0,
(double) p50_ns / 1000000.0,
(double) p95_ns / 1000000.0,
(double) p99_ns / 1000000.0,
ping_sqrt(variance) / 1000000.0
);
free(sorted);
}
}
static int ping_expiry_poll_ms(int timeout_ms) {
int interval = timeout_ms / 4;
if (interval < 10) {
return 10;
}
if (interval > 100) {
return 100;
}
return interval;
}
int main(int argc, char **argv) {
const char *peer_id = "pinger";
const char *server_addr = "127.0.0.1:9001";
const char *target_peer = "";
const char *bind_ip = "";
const char *latency_log_path = "";
int echo_mode = 0;
int count = 100;
int interval_ms = 100;
int size = 64;
int timeout_ms = 3000;
latency_logger_t *latency_logger = NULL;
udp_client_t *client = NULL;
ping_receiver_ctx_t receiver_ctx;
pthread_t receiver_thread;
int receiver_ctx_initialized = 0;
int receiver_thread_started = 0;
ping_tracker_t tracker;
int i;
int rc = 1;
ping_tracker_init(&tracker);
memset(&receiver_ctx, 0, sizeof(receiver_ctx));
for (i = 1; i < argc; ++i) {
const char *value = NULL;
int handled;
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-id", &value)) < 0) {
fprintf(stderr, "udpping: flag -id requires a value\n");
return 1;
} else if (handled) {
peer_id = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-server", &value)) < 0) {
fprintf(stderr, "udpping: flag -server requires a value\n");
return 1;
} else if (handled) {
server_addr = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-to", &value)) < 0) {
fprintf(stderr, "udpping: flag -to requires a value\n");
return 1;
} else if (handled) {
target_peer = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-count", &value)) < 0) {
fprintf(stderr, "udpping: flag -count requires a value\n");
return 1;
} else if (handled) {
count = atoi(value);
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-interval", &value)) < 0) {
fprintf(stderr, "udpping: flag -interval requires a value\n");
return 1;
} else if (handled) {
if (omni_parse_duration_ms(value, interval_ms, &interval_ms) != 0) {
fprintf(stderr, "udpping: invalid -interval value %s\n", value);
return 1;
}
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-size", &value)) < 0) {
fprintf(stderr, "udpping: flag -size requires a value\n");
return 1;
} else if (handled) {
size = atoi(value);
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-timeout", &value)) < 0) {
fprintf(stderr, "udpping: flag -timeout requires a value\n");
return 1;
} else if (handled) {
if (omni_parse_duration_ms(value, timeout_ms, &timeout_ms) != 0) {
fprintf(stderr, "udpping: invalid -timeout value %s\n", value);
return 1;
}
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-bind-ip", &value)) < 0) {
fprintf(stderr, "udpping: flag -bind-ip requires a value\n");
return 1;
} else if (handled) {
bind_ip = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-latency-log", &value)) < 0) {
fprintf(stderr, "udpping: flag -latency-log requires a value\n");
return 1;
} else if (handled) {
latency_log_path = value;
continue;
}
if ((handled = cli_parse_bool_flag(argv[i], "-echo", &echo_mode)) < 0) {
fprintf(stderr, "udpping: invalid -echo value\n");
return 1;
} else if (handled) {
continue;
}
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
udpping_usage(stdout);
return 0;
}
fprintf(stderr, "udpping: unknown argument %s\n", argv[i]);
udpping_usage(stderr);
return 1;
}
if (peer_id[0] == '\0' || server_addr[0] == '\0') {
fprintf(stderr, "udpping: flags -id and -server are required\n");
return 1;
}
if (!echo_mode && target_peer[0] == '\0') {
fprintf(stderr, "udpping: flag -to is required unless -echo is set\n");
return 1;
}
if (count < 0 || interval_ms <= 0 || size <= 0 || timeout_ms <= 0) {
fprintf(stderr, "udpping: invalid numeric flag value\n");
return 1;
}
signal(SIGINT, udpping_on_signal);
if (latency_log_path[0] != '\0') {
latency_logger = latencylog_open_jsonl(latency_log_path);
if (latency_logger == NULL) {
fprintf(stderr, "udpping: open latency logger %s failed\n", latency_log_path);
goto cleanup;
}
}
client = udp_client_dial(server_addr, peer_id, bind_ip, latency_logger, NULL, 0);
if (client == NULL) {
fprintf(stderr, "udpping: dial udp server %s failed\n", server_addr);
goto cleanup;
}
if (echo_mode) {
while (!g_udpping_stop) {
message_t msg;
protocol_message_init(&msg);
if (udp_client_receive(client, &msg) != 0) {
protocol_message_clear(&msg);
if (g_udpping_stop) {
break;
}
fprintf(stderr, "udpping: receive failed in echo mode\n");
goto cleanup;
}
if (msg.type == MSG_TYPE_TEXT) {
char *text = (char *) malloc(msg.body_len + 1U);
if (text == NULL) {
protocol_message_clear(&msg);
goto cleanup;
}
memcpy(text, msg.body, msg.body_len);
text[msg.body_len] = '\0';
if (udp_client_send_text(client, msg.from, text) != 0) {
free(text);
protocol_message_clear(&msg);
fprintf(stderr, "udpping: echo send back to %s failed\n", msg.from);
goto cleanup;
}
free(text);
} else if (msg.type == MSG_TYPE_ERROR) {
fprintf(stderr, "server error: %.*s\n", (int) msg.body_len, msg.body == NULL ? "" : (const char *) msg.body);
} else {
fprintf(stderr, "unexpected message type %s from %s ignored\n", protocol_message_type_name(msg.type), msg.from);
}
protocol_message_clear(&msg);
}
rc = 0;
goto cleanup;
}
fprintf(stdout, "UDP PING %s via %s (payload=%d bytes, UDP)\n", target_peer, server_addr, size);
ping_receiver_ctx_init(&receiver_ctx, client);
receiver_ctx_initialized = 1;
if (pthread_create(&receiver_thread, NULL, udpping_receive_thread_main, &receiver_ctx) != 0) {
fprintf(stderr, "udpping: create receive thread failed\n");
goto cleanup;
}
receiver_thread_started = 1;
{
uint64_t next_seq = 1;
int stop_sending = 0;
int64_t next_send_at_ns = omni_now_unix_nano();
int poll_ms = ping_expiry_poll_ms(timeout_ms);
int64_t timeout_ns = (int64_t) timeout_ms * 1000000LL;
while (!g_udpping_stop || tracker.pending_count > 0 || !stop_sending) {
int64_t now_ns = omni_now_unix_nano();
message_t msg;
int popped;
int receiver_closed;
int receiver_status_rc;
if (!stop_sending && now_ns >= next_send_at_ns) {
char *payload = NULL;
size_t payload_len = 0;
if (count > 0 && tracker.sent >= count) {
stop_sending = 1;
} else {
if (ping_build_payload(next_seq, now_ns, size, &payload, &payload_len) != 0) {
fprintf(stderr, "udpping: build payload for seq=%" PRIu64 " failed\n", next_seq);
free(payload);
goto cleanup;
}
if (udp_client_send_text(client, target_peer, payload) != 0) {
fprintf(stderr, "udpping: send ping seq=%" PRIu64 " failed\n", next_seq);
free(payload);
goto cleanup;
}
free(payload);
if (ping_tracker_mark_sent(&tracker, next_seq, now_ns, timeout_ns) != 0) {
goto cleanup;
}
next_seq++;
next_send_at_ns = now_ns + (int64_t) interval_ms * 1000000LL;
if (count > 0 && tracker.sent >= count) {
stop_sending = 1;
}
}
}
ping_tracker_expire(&tracker, now_ns, stdout);
do {
popped = ping_receiver_pop(&receiver_ctx, &msg);
if (popped == 1) {
if (msg.type == MSG_TYPE_TEXT) {
uint64_t seq;
int64_t sent_ts_ns;
int disposition;
int64_t rtt_ns;
if (ping_parse_payload(msg.body, msg.body_len, &seq, &sent_ts_ns) != 0) {
fprintf(stderr, "ignore non-ping text message from %s\n", msg.from);
} else if (ping_tracker_observe_reply(&tracker, seq, sent_ts_ns, omni_now_unix_nano(), &disposition, &rtt_ns) != 0) {
protocol_message_clear(&msg);
goto cleanup;
} else if (disposition == 0) {
fprintf(stdout, "seq=%" PRIu64 " rtt=%.2fms\n", seq, (double) rtt_ns / 1000000.0);
} else if (disposition == 1) {
fprintf(stderr, "seq=%" PRIu64 " duplicate or late reply ignored\n", seq);
} else {
fprintf(stderr, "seq=%" PRIu64 " unexpected reply ignored\n", seq);
}
} else if (msg.type == MSG_TYPE_ERROR) {
fprintf(stderr, "server error: %.*s\n", (int) msg.body_len, msg.body == NULL ? "" : (const char *) msg.body);
} else {
fprintf(stderr, "unexpected message type %s from %s ignored\n", protocol_message_type_name(msg.type), msg.from);
}
protocol_message_clear(&msg);
}
} while (popped == 1);
ping_receiver_status(&receiver_ctx, &receiver_closed, &receiver_status_rc);
if (receiver_closed && receiver_status_rc != 0) {
fprintf(stderr, "udpping: receive loop failed\n");
goto cleanup;
}
if ((g_udpping_stop || stop_sending) && tracker.pending_count == 0) {
break;
}
usleep((useconds_t) poll_ms * 1000U);
}
}
ping_print_summary(stdout, target_peer, &tracker);
rc = 0;
cleanup:
receiver_ctx.stop_requested = 1;
udp_client_close(client);
if (receiver_thread_started) {
pthread_join(receiver_thread, NULL);
ping_receiver_ctx_destroy(&receiver_ctx);
} else if (receiver_ctx_initialized) {
ping_receiver_ctx_destroy(&receiver_ctx);
}
udp_client_free(client);
latencylog_close(latency_logger);
ping_tracker_destroy(&tracker);
return rc;
}

59
c/cmd/udprelay.c Normal file
View File

@@ -0,0 +1,59 @@
#include "cli_parse.h"
#include "server_udp_relay.h"
static void udprelay_usage(FILE *out) {
fprintf(out, "usage: udprelay [-listen addr] [-upstream addr]\n");
}
int main(int argc, char **argv) {
const char *listen_addr = ":9003";
const char *upstream_addr = "127.0.0.1:9002";
udp_relay_t *relay = NULL;
int i;
int rc = 1;
for (i = 1; i < argc; ++i) {
const char *value = NULL;
int handled;
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-listen", &value)) < 0) {
fprintf(stderr, "udprelay: flag -listen requires a value\n");
return 1;
} else if (handled) {
listen_addr = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-upstream", &value)) < 0) {
fprintf(stderr, "udprelay: flag -upstream requires a value\n");
return 1;
} else if (handled) {
upstream_addr = value;
continue;
}
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
udprelay_usage(stdout);
return 0;
}
fprintf(stderr, "udprelay: unknown argument %s\n", argv[i]);
udprelay_usage(stderr);
return 1;
}
relay = udp_relay_open(listen_addr, upstream_addr);
if (relay == NULL) {
fprintf(stderr, "udprelay: open relay %s -> %s failed\n", listen_addr, upstream_addr);
goto cleanup;
}
fprintf(stderr, "udp relay listening on %s, upstream %s\n", listen_addr, upstream_addr);
if (udp_relay_serve(relay) != 0) {
fprintf(stderr, "udprelay: relay serve failed\n");
goto cleanup;
}
rc = 0;
cleanup:
udp_relay_free(relay);
return rc;
}

88
c/cmd/udpserver.c Normal file
View File

@@ -0,0 +1,88 @@
#include "cli_parse.h"
#include "server_udp_hub.h"
static void udpserver_usage(FILE *out) {
fprintf(out, "usage: udpserver [-listen addr] [-latency-log path] [-tx-ts-debug-log path]\n");
}
int main(int argc, char **argv) {
const char *listen_addr = ":9001";
const char *latency_log_path = "";
const char *tx_debug_log_path = "";
latency_logger_t *latency_logger = NULL;
tx_timestamp_debug_logger_t *debug_logger = NULL;
udp_hub_t *hub = NULL;
int enable_timestamping = 0;
int i;
int rc = 1;
for (i = 1; i < argc; ++i) {
const char *value = NULL;
int handled;
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-listen", &value)) < 0) {
fprintf(stderr, "udpserver: flag -listen requires a value\n");
return 1;
} else if (handled) {
listen_addr = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-latency-log", &value)) < 0) {
fprintf(stderr, "udpserver: flag -latency-log requires a value\n");
return 1;
} else if (handled) {
latency_log_path = value;
continue;
}
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-tx-ts-debug-log", &value)) < 0) {
fprintf(stderr, "udpserver: flag -tx-ts-debug-log requires a value\n");
return 1;
} else if (handled) {
tx_debug_log_path = value;
continue;
}
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
udpserver_usage(stdout);
return 0;
}
fprintf(stderr, "udpserver: unknown argument %s\n", argv[i]);
udpserver_usage(stderr);
return 1;
}
if (latency_log_path[0] != '\0') {
latency_logger = latencylog_open_jsonl(latency_log_path);
if (latency_logger == NULL) {
fprintf(stderr, "udpserver: open latency logger %s failed\n", latency_log_path);
goto cleanup;
}
}
if (tx_debug_log_path[0] != '\0') {
debug_logger = tx_timestamp_debug_open_jsonl(tx_debug_log_path);
if (debug_logger == NULL) {
fprintf(stderr, "udpserver: open tx timestamp debug logger %s failed\n", tx_debug_log_path);
goto cleanup;
}
enable_timestamping = 1;
}
hub = udp_hub_open(listen_addr, latency_logger, debug_logger, enable_timestamping);
if (hub == NULL) {
fprintf(stderr, "udpserver: listen on %s failed\n", listen_addr);
goto cleanup;
}
fprintf(stderr, "udp server listening on %s\n", listen_addr);
if (udp_hub_serve(hub) != 0) {
fprintf(stderr, "udpserver: serve failed\n");
goto cleanup;
}
rc = 0;
cleanup:
udp_hub_free(hub);
tx_timestamp_debug_close(debug_logger);
latencylog_close(latency_logger);
return rc;
}