789 lines
25 KiB
C
789 lines
25 KiB
C
#include "cli_parse.h"
|
|
#include "peer_kcp_client.h"
|
|
|
|
#include "cJSON.h"
|
|
|
|
#include <signal.h>
|
|
#include <unistd.h>
|
|
|
|
typedef struct kcp_ping_message_node {
|
|
struct kcp_ping_message_node *next;
|
|
message_t msg;
|
|
} kcp_ping_message_node_t;
|
|
|
|
typedef struct kcp_ping_receiver_ctx {
|
|
kcp_client_t *client;
|
|
pthread_mutex_t mu;
|
|
kcp_ping_message_node_t *head;
|
|
kcp_ping_message_node_t *tail;
|
|
volatile int stop_requested;
|
|
int closed;
|
|
int rc;
|
|
} kcp_ping_receiver_ctx_t;
|
|
|
|
typedef struct kcp_pending_ping {
|
|
struct kcp_pending_ping *next;
|
|
uint64_t seq;
|
|
int64_t deadline_ns;
|
|
} kcp_pending_ping_t;
|
|
|
|
typedef struct kcp_ping_tracker {
|
|
kcp_pending_ping_t *pending;
|
|
int pending_count;
|
|
int sent;
|
|
int duplicates;
|
|
uint64_t max_seq_sent;
|
|
int64_t *samples_ns;
|
|
size_t sample_count;
|
|
size_t sample_cap;
|
|
} kcp_ping_tracker_t;
|
|
|
|
static volatile sig_atomic_t g_kcpping_stop = 0;
|
|
|
|
static void kcpping_on_signal(int signo) {
|
|
(void) signo;
|
|
g_kcpping_stop = 1;
|
|
}
|
|
|
|
static void kcpping_usage(FILE *out) {
|
|
fprintf(out, "usage: kcpping [-id pinger] [-server 127.0.0.1:9002] [-to peer] [-echo]\n");
|
|
fprintf(out, " [-count 100] [-interval 100ms] [-size 64] [-timeout 3s]\n");
|
|
fprintf(out, " [-bind-ip ip] [-bind-device dev] [-latency-log path]\n");
|
|
}
|
|
|
|
static int kcp_ping_compare_i64(const void *left, const void *right) {
|
|
const int64_t *a = (const int64_t *) left;
|
|
const int64_t *b = (const int64_t *) right;
|
|
if (*a < *b) {
|
|
return -1;
|
|
}
|
|
if (*a > *b) {
|
|
return 1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static double kcp_ping_sqrt(double value) {
|
|
double x = value;
|
|
int i;
|
|
|
|
if (value <= 0.0) {
|
|
return 0.0;
|
|
}
|
|
if (x < 1.0) {
|
|
x = 1.0;
|
|
}
|
|
for (i = 0; i < 16; ++i) {
|
|
x = 0.5 * (x + value / x);
|
|
}
|
|
return x;
|
|
}
|
|
|
|
static int kcp_ping_build_payload(uint64_t seq, int64_t ts_ns, int size, char **out_body, size_t *out_len) {
|
|
cJSON *root = NULL;
|
|
char *json = NULL;
|
|
char *pad = NULL;
|
|
size_t base_len;
|
|
size_t pad_len;
|
|
|
|
*out_body = NULL;
|
|
*out_len = 0;
|
|
|
|
root = cJSON_CreateObject();
|
|
if (root == NULL) {
|
|
return -1;
|
|
}
|
|
cJSON_AddNumberToObject(root, "seq", (double) seq);
|
|
cJSON_AddNumberToObject(root, "ts_ns", (double) ts_ns);
|
|
cJSON_AddStringToObject(root, "pad", "");
|
|
json = cJSON_PrintUnformatted(root);
|
|
cJSON_Delete(root);
|
|
if (json == NULL) {
|
|
return -1;
|
|
}
|
|
base_len = strlen(json);
|
|
cJSON_free(json);
|
|
if ((int) base_len > size) {
|
|
errno = EMSGSIZE;
|
|
return -1;
|
|
}
|
|
|
|
pad_len = (size_t) size - base_len;
|
|
pad = (char *) malloc(pad_len + 1U);
|
|
if (pad == NULL) {
|
|
return -1;
|
|
}
|
|
memset(pad, 'A', pad_len);
|
|
pad[pad_len] = '\0';
|
|
|
|
root = cJSON_CreateObject();
|
|
if (root == NULL) {
|
|
free(pad);
|
|
return -1;
|
|
}
|
|
cJSON_AddNumberToObject(root, "seq", (double) seq);
|
|
cJSON_AddNumberToObject(root, "ts_ns", (double) ts_ns);
|
|
cJSON_AddStringToObject(root, "pad", pad);
|
|
free(pad);
|
|
json = cJSON_PrintUnformatted(root);
|
|
cJSON_Delete(root);
|
|
if (json == NULL) {
|
|
return -1;
|
|
}
|
|
if ((int) strlen(json) != size) {
|
|
cJSON_free(json);
|
|
errno = EINVAL;
|
|
return -1;
|
|
}
|
|
*out_body = json;
|
|
*out_len = (size_t) size;
|
|
return 0;
|
|
}
|
|
|
|
static int kcp_ping_parse_payload(const uint8_t *body, size_t body_len, uint64_t *seq, int64_t *ts_ns) {
|
|
char *text;
|
|
cJSON *root;
|
|
const cJSON *seq_item;
|
|
const cJSON *ts_item;
|
|
|
|
if (body == NULL || seq == NULL || ts_ns == NULL) {
|
|
errno = EINVAL;
|
|
return -1;
|
|
}
|
|
text = (char *) malloc(body_len + 1U);
|
|
if (text == NULL) {
|
|
return -1;
|
|
}
|
|
memcpy(text, body, body_len);
|
|
text[body_len] = '\0';
|
|
root = cJSON_Parse(text);
|
|
free(text);
|
|
if (root == NULL) {
|
|
errno = EPROTO;
|
|
return -1;
|
|
}
|
|
seq_item = cJSON_GetObjectItemCaseSensitive(root, "seq");
|
|
ts_item = cJSON_GetObjectItemCaseSensitive(root, "ts_ns");
|
|
if (!cJSON_IsNumber(seq_item) || !cJSON_IsNumber(ts_item) || seq_item->valuedouble <= 0 || ts_item->valuedouble <= 0) {
|
|
cJSON_Delete(root);
|
|
errno = EPROTO;
|
|
return -1;
|
|
}
|
|
*seq = (uint64_t) seq_item->valuedouble;
|
|
*ts_ns = (int64_t) ts_item->valuedouble;
|
|
cJSON_Delete(root);
|
|
return 0;
|
|
}
|
|
|
|
static void kcp_ping_receiver_ctx_init(kcp_ping_receiver_ctx_t *ctx, kcp_client_t *client) {
|
|
memset(ctx, 0, sizeof(*ctx));
|
|
ctx->client = client;
|
|
pthread_mutex_init(&ctx->mu, NULL);
|
|
}
|
|
|
|
static void kcp_ping_receiver_ctx_destroy(kcp_ping_receiver_ctx_t *ctx) {
|
|
kcp_ping_message_node_t *node;
|
|
kcp_ping_message_node_t *next;
|
|
|
|
if (ctx == NULL) {
|
|
return;
|
|
}
|
|
for (node = ctx->head; node != NULL; node = next) {
|
|
next = node->next;
|
|
protocol_message_clear(&node->msg);
|
|
free(node);
|
|
}
|
|
pthread_mutex_destroy(&ctx->mu);
|
|
}
|
|
|
|
static void *kcpping_receive_thread_main(void *arg) {
|
|
kcp_ping_receiver_ctx_t *ctx = (kcp_ping_receiver_ctx_t *) arg;
|
|
|
|
for (;;) {
|
|
message_t msg;
|
|
kcp_ping_message_node_t *node;
|
|
|
|
protocol_message_init(&msg);
|
|
if (kcp_client_receive(ctx->client, &msg) != 0) {
|
|
protocol_message_clear(&msg);
|
|
pthread_mutex_lock(&ctx->mu);
|
|
ctx->rc = ctx->stop_requested ? 0 : -1;
|
|
ctx->closed = 1;
|
|
pthread_mutex_unlock(&ctx->mu);
|
|
return NULL;
|
|
}
|
|
|
|
node = (kcp_ping_message_node_t *) calloc(1, sizeof(*node));
|
|
if (node == NULL) {
|
|
protocol_message_clear(&msg);
|
|
pthread_mutex_lock(&ctx->mu);
|
|
ctx->rc = -1;
|
|
ctx->closed = 1;
|
|
pthread_mutex_unlock(&ctx->mu);
|
|
return NULL;
|
|
}
|
|
node->msg = msg;
|
|
|
|
pthread_mutex_lock(&ctx->mu);
|
|
if (ctx->tail == NULL) {
|
|
ctx->head = node;
|
|
} else {
|
|
ctx->tail->next = node;
|
|
}
|
|
ctx->tail = node;
|
|
pthread_mutex_unlock(&ctx->mu);
|
|
}
|
|
}
|
|
|
|
static int kcp_ping_receiver_pop(kcp_ping_receiver_ctx_t *ctx, message_t *out_msg) {
|
|
kcp_ping_message_node_t *node;
|
|
|
|
pthread_mutex_lock(&ctx->mu);
|
|
node = ctx->head;
|
|
if (node != NULL) {
|
|
ctx->head = node->next;
|
|
if (ctx->head == NULL) {
|
|
ctx->tail = NULL;
|
|
}
|
|
}
|
|
pthread_mutex_unlock(&ctx->mu);
|
|
|
|
if (node == NULL) {
|
|
return 0;
|
|
}
|
|
*out_msg = node->msg;
|
|
free(node);
|
|
return 1;
|
|
}
|
|
|
|
static int kcp_ping_receiver_status(kcp_ping_receiver_ctx_t *ctx, int *closed, int *rc) {
|
|
pthread_mutex_lock(&ctx->mu);
|
|
*closed = ctx->closed;
|
|
*rc = ctx->rc;
|
|
pthread_mutex_unlock(&ctx->mu);
|
|
return 0;
|
|
}
|
|
|
|
static void kcp_ping_tracker_init(kcp_ping_tracker_t *tracker) {
|
|
memset(tracker, 0, sizeof(*tracker));
|
|
}
|
|
|
|
static void kcp_ping_tracker_destroy(kcp_ping_tracker_t *tracker) {
|
|
kcp_pending_ping_t *pending;
|
|
kcp_pending_ping_t *next;
|
|
|
|
for (pending = tracker->pending; pending != NULL; pending = next) {
|
|
next = pending->next;
|
|
free(pending);
|
|
}
|
|
free(tracker->samples_ns);
|
|
}
|
|
|
|
static int kcp_ping_tracker_mark_sent(kcp_ping_tracker_t *tracker, uint64_t seq, int64_t sent_at_ns, int64_t timeout_ns) {
|
|
kcp_pending_ping_t *pending = (kcp_pending_ping_t *) calloc(1, sizeof(*pending));
|
|
if (pending == NULL) {
|
|
return -1;
|
|
}
|
|
pending->seq = seq;
|
|
pending->deadline_ns = sent_at_ns + timeout_ns;
|
|
pending->next = tracker->pending;
|
|
tracker->pending = pending;
|
|
tracker->pending_count++;
|
|
tracker->sent++;
|
|
tracker->max_seq_sent = seq;
|
|
return 0;
|
|
}
|
|
|
|
static kcp_pending_ping_t *kcp_ping_tracker_find_pending(kcp_ping_tracker_t *tracker, uint64_t seq, kcp_pending_ping_t **out_prev) {
|
|
kcp_pending_ping_t *prev = NULL;
|
|
kcp_pending_ping_t *cur;
|
|
|
|
for (cur = tracker->pending; cur != NULL; cur = cur->next) {
|
|
if (cur->seq == seq) {
|
|
if (out_prev != NULL) {
|
|
*out_prev = prev;
|
|
}
|
|
return cur;
|
|
}
|
|
prev = cur;
|
|
}
|
|
if (out_prev != NULL) {
|
|
*out_prev = NULL;
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
static int kcp_ping_tracker_add_sample(kcp_ping_tracker_t *tracker, int64_t rtt_ns) {
|
|
int64_t *next_samples;
|
|
size_t next_cap;
|
|
|
|
if (tracker->sample_count == tracker->sample_cap) {
|
|
next_cap = tracker->sample_cap == 0 ? 16U : tracker->sample_cap * 2U;
|
|
next_samples = (int64_t *) realloc(tracker->samples_ns, next_cap * sizeof(*next_samples));
|
|
if (next_samples == NULL) {
|
|
return -1;
|
|
}
|
|
tracker->samples_ns = next_samples;
|
|
tracker->sample_cap = next_cap;
|
|
}
|
|
tracker->samples_ns[tracker->sample_count++] = rtt_ns;
|
|
return 0;
|
|
}
|
|
|
|
static int kcp_ping_tracker_observe_reply(kcp_ping_tracker_t *tracker, uint64_t seq, int64_t sent_ts_ns, int64_t received_ts_ns, int *disposition, int64_t *rtt_ns) {
|
|
kcp_pending_ping_t *prev = NULL;
|
|
kcp_pending_ping_t *pending;
|
|
|
|
if (seq == 0 || seq > tracker->max_seq_sent) {
|
|
*disposition = 2;
|
|
*rtt_ns = 0;
|
|
return 0;
|
|
}
|
|
pending = kcp_ping_tracker_find_pending(tracker, seq, &prev);
|
|
if (pending == NULL) {
|
|
tracker->duplicates++;
|
|
*disposition = 1;
|
|
*rtt_ns = 0;
|
|
return 0;
|
|
}
|
|
if (prev == NULL) {
|
|
tracker->pending = pending->next;
|
|
} else {
|
|
prev->next = pending->next;
|
|
}
|
|
tracker->pending_count--;
|
|
free(pending);
|
|
|
|
*rtt_ns = received_ts_ns - sent_ts_ns;
|
|
if (*rtt_ns < 0) {
|
|
*rtt_ns = 0;
|
|
}
|
|
if (kcp_ping_tracker_add_sample(tracker, *rtt_ns) != 0) {
|
|
return -1;
|
|
}
|
|
*disposition = 0;
|
|
return 0;
|
|
}
|
|
|
|
static void kcp_ping_tracker_expire(kcp_ping_tracker_t *tracker, int64_t now_ns, FILE *out) {
|
|
kcp_pending_ping_t *prev = NULL;
|
|
kcp_pending_ping_t *cur = tracker->pending;
|
|
|
|
while (cur != NULL) {
|
|
if (cur->deadline_ns <= now_ns) {
|
|
kcp_pending_ping_t *next = cur->next;
|
|
fprintf(out, "seq=%" PRIu64 " timeout\n", cur->seq);
|
|
if (prev == NULL) {
|
|
tracker->pending = next;
|
|
} else {
|
|
prev->next = next;
|
|
}
|
|
free(cur);
|
|
tracker->pending_count--;
|
|
cur = next;
|
|
continue;
|
|
}
|
|
prev = cur;
|
|
cur = cur->next;
|
|
}
|
|
}
|
|
|
|
static int64_t kcp_ping_percentile_ns(const int64_t *sorted, size_t count, double percentile) {
|
|
size_t index;
|
|
double raw_index;
|
|
|
|
if (count == 0) {
|
|
return 0;
|
|
}
|
|
if (percentile <= 0.0) {
|
|
return sorted[0];
|
|
}
|
|
if (percentile >= 1.0) {
|
|
return sorted[count - 1];
|
|
}
|
|
raw_index = percentile * (double) count;
|
|
index = (size_t) raw_index;
|
|
if ((double) index < raw_index) {
|
|
index++;
|
|
}
|
|
if (index > 0) {
|
|
index--;
|
|
}
|
|
if (index >= count) {
|
|
index = count - 1;
|
|
}
|
|
return sorted[index];
|
|
}
|
|
|
|
static void kcp_ping_print_summary(FILE *out, const char *target, const kcp_ping_tracker_t *tracker) {
|
|
int received = (int) tracker->sample_count;
|
|
double loss_pct = tracker->sent == 0 ? 0.0 : ((double) (tracker->sent - received) * 100.0 / (double) tracker->sent);
|
|
|
|
fprintf(out, "--- %s kcp ping statistics ---\n", target);
|
|
fprintf(out, "%d packets transmitted, %d received, %d duplicates, %.2f%% packet loss\n", tracker->sent, received, tracker->duplicates, loss_pct);
|
|
if (tracker->sample_count == 0) {
|
|
fprintf(out, "rtt min/avg/max/p50/p95/p99 = n/a/n/a/n/a/n/a/n/a/n/a, stddev=n/a\n");
|
|
return;
|
|
}
|
|
|
|
{
|
|
int64_t *sorted = (int64_t *) malloc(tracker->sample_count * sizeof(*sorted));
|
|
size_t i;
|
|
double sum = 0.0;
|
|
double variance = 0.0;
|
|
double avg;
|
|
int64_t min_ns;
|
|
int64_t max_ns;
|
|
int64_t p50_ns;
|
|
int64_t p95_ns;
|
|
int64_t p99_ns;
|
|
|
|
if (sorted == NULL) {
|
|
fprintf(out, "rtt summary unavailable: memory allocation failed\n");
|
|
return;
|
|
}
|
|
memcpy(sorted, tracker->samples_ns, tracker->sample_count * sizeof(*sorted));
|
|
qsort(sorted, tracker->sample_count, sizeof(*sorted), kcp_ping_compare_i64);
|
|
for (i = 0; i < tracker->sample_count; ++i) {
|
|
sum += (double) sorted[i];
|
|
}
|
|
avg = sum / (double) tracker->sample_count;
|
|
for (i = 0; i < tracker->sample_count; ++i) {
|
|
double delta = (double) sorted[i] - avg;
|
|
variance += delta * delta;
|
|
}
|
|
variance /= (double) tracker->sample_count;
|
|
|
|
min_ns = sorted[0];
|
|
max_ns = sorted[tracker->sample_count - 1];
|
|
p50_ns = kcp_ping_percentile_ns(sorted, tracker->sample_count, 0.50);
|
|
p95_ns = kcp_ping_percentile_ns(sorted, tracker->sample_count, 0.95);
|
|
p99_ns = kcp_ping_percentile_ns(sorted, tracker->sample_count, 0.99);
|
|
|
|
fprintf(
|
|
out,
|
|
"rtt min/avg/max/p50/p95/p99 = %.2fms/%.2fms/%.2fms/%.2fms/%.2fms/%.2fms, stddev=%.2fms\n",
|
|
(double) min_ns / 1000000.0,
|
|
avg / 1000000.0,
|
|
(double) max_ns / 1000000.0,
|
|
(double) p50_ns / 1000000.0,
|
|
(double) p95_ns / 1000000.0,
|
|
(double) p99_ns / 1000000.0,
|
|
kcp_ping_sqrt(variance) / 1000000.0
|
|
);
|
|
free(sorted);
|
|
}
|
|
}
|
|
|
|
static int kcp_ping_expiry_poll_ms(int timeout_ms) {
|
|
int interval = timeout_ms / 4;
|
|
if (interval < 10) {
|
|
return 10;
|
|
}
|
|
if (interval > 100) {
|
|
return 100;
|
|
}
|
|
return interval;
|
|
}
|
|
|
|
int main(int argc, char **argv) {
|
|
const char *peer_id = "pinger";
|
|
const char *server_addr = "127.0.0.1:9002";
|
|
const char *target_peer = "";
|
|
const char *bind_ip = "";
|
|
const char *bind_device = "";
|
|
const char *latency_log_path = "";
|
|
int echo_mode = 0;
|
|
int count = 100;
|
|
int interval_ms = 100;
|
|
int size = 64;
|
|
int timeout_ms = 3000;
|
|
latency_logger_t *latency_logger = NULL;
|
|
kcp_client_t *client = NULL;
|
|
kcp_ping_receiver_ctx_t receiver_ctx;
|
|
pthread_t receiver_thread;
|
|
int receiver_ctx_initialized = 0;
|
|
int receiver_thread_started = 0;
|
|
kcp_ping_tracker_t tracker;
|
|
int i;
|
|
int rc = 1;
|
|
|
|
kcp_ping_tracker_init(&tracker);
|
|
memset(&receiver_ctx, 0, sizeof(receiver_ctx));
|
|
|
|
for (i = 1; i < argc; ++i) {
|
|
const char *value = NULL;
|
|
int handled;
|
|
|
|
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-id", &value)) < 0) {
|
|
fprintf(stderr, "kcpping: flag -id requires a value\n");
|
|
return 1;
|
|
} else if (handled) {
|
|
peer_id = value;
|
|
continue;
|
|
}
|
|
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-server", &value)) < 0) {
|
|
fprintf(stderr, "kcpping: flag -server requires a value\n");
|
|
return 1;
|
|
} else if (handled) {
|
|
server_addr = value;
|
|
continue;
|
|
}
|
|
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-to", &value)) < 0) {
|
|
fprintf(stderr, "kcpping: flag -to requires a value\n");
|
|
return 1;
|
|
} else if (handled) {
|
|
target_peer = value;
|
|
continue;
|
|
}
|
|
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-count", &value)) < 0) {
|
|
fprintf(stderr, "kcpping: flag -count requires a value\n");
|
|
return 1;
|
|
} else if (handled) {
|
|
count = atoi(value);
|
|
continue;
|
|
}
|
|
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-interval", &value)) < 0) {
|
|
fprintf(stderr, "kcpping: flag -interval requires a value\n");
|
|
return 1;
|
|
} else if (handled) {
|
|
if (omni_parse_duration_ms(value, interval_ms, &interval_ms) != 0) {
|
|
fprintf(stderr, "kcpping: invalid -interval value %s\n", value);
|
|
return 1;
|
|
}
|
|
continue;
|
|
}
|
|
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-size", &value)) < 0) {
|
|
fprintf(stderr, "kcpping: flag -size requires a value\n");
|
|
return 1;
|
|
} else if (handled) {
|
|
size = atoi(value);
|
|
continue;
|
|
}
|
|
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-timeout", &value)) < 0) {
|
|
fprintf(stderr, "kcpping: flag -timeout requires a value\n");
|
|
return 1;
|
|
} else if (handled) {
|
|
if (omni_parse_duration_ms(value, timeout_ms, &timeout_ms) != 0) {
|
|
fprintf(stderr, "kcpping: invalid -timeout value %s\n", value);
|
|
return 1;
|
|
}
|
|
continue;
|
|
}
|
|
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-bind-ip", &value)) < 0) {
|
|
fprintf(stderr, "kcpping: flag -bind-ip requires a value\n");
|
|
return 1;
|
|
} else if (handled) {
|
|
bind_ip = value;
|
|
continue;
|
|
}
|
|
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-bind-device", &value)) < 0) {
|
|
fprintf(stderr, "kcpping: flag -bind-device requires a value\n");
|
|
return 1;
|
|
} else if (handled) {
|
|
bind_device = value;
|
|
continue;
|
|
}
|
|
if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-latency-log", &value)) < 0) {
|
|
fprintf(stderr, "kcpping: flag -latency-log requires a value\n");
|
|
return 1;
|
|
} else if (handled) {
|
|
latency_log_path = value;
|
|
continue;
|
|
}
|
|
if ((handled = cli_parse_bool_flag(argv[i], "-echo", &echo_mode)) < 0) {
|
|
fprintf(stderr, "kcpping: invalid -echo value\n");
|
|
return 1;
|
|
} else if (handled) {
|
|
continue;
|
|
}
|
|
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
|
|
kcpping_usage(stdout);
|
|
return 0;
|
|
}
|
|
fprintf(stderr, "kcpping: unknown argument %s\n", argv[i]);
|
|
kcpping_usage(stderr);
|
|
return 1;
|
|
}
|
|
|
|
if (peer_id[0] == '\0' || server_addr[0] == '\0') {
|
|
fprintf(stderr, "kcpping: flags -id and -server are required\n");
|
|
return 1;
|
|
}
|
|
if (!echo_mode && target_peer[0] == '\0') {
|
|
fprintf(stderr, "kcpping: flag -to is required unless -echo is set\n");
|
|
return 1;
|
|
}
|
|
if (count < 0 || interval_ms <= 0 || size <= 0 || timeout_ms <= 0) {
|
|
fprintf(stderr, "kcpping: invalid numeric flag value\n");
|
|
return 1;
|
|
}
|
|
|
|
signal(SIGINT, kcpping_on_signal);
|
|
|
|
if (latency_log_path[0] != '\0') {
|
|
latency_logger = latencylog_open_jsonl(latency_log_path);
|
|
if (latency_logger == NULL) {
|
|
fprintf(stderr, "kcpping: open latency logger %s failed\n", latency_log_path);
|
|
goto cleanup;
|
|
}
|
|
}
|
|
client = kcp_client_dial(server_addr, NULL, peer_id, bind_ip, bind_device, latency_logger, NULL, NULL, KCP_DEFAULT_STATS_INTERVAL_MS);
|
|
if (client == NULL) {
|
|
fprintf(stderr, "kcpping: dial kcp server %s failed\n", server_addr);
|
|
goto cleanup;
|
|
}
|
|
|
|
if (echo_mode) {
|
|
while (!g_kcpping_stop) {
|
|
message_t msg;
|
|
|
|
protocol_message_init(&msg);
|
|
if (kcp_client_receive(client, &msg) != 0) {
|
|
protocol_message_clear(&msg);
|
|
if (g_kcpping_stop) {
|
|
break;
|
|
}
|
|
fprintf(stderr, "kcpping: receive failed in echo mode\n");
|
|
goto cleanup;
|
|
}
|
|
if (msg.type == MSG_TYPE_TEXT) {
|
|
char *text = (char *) malloc(msg.body_len + 1U);
|
|
if (text == NULL) {
|
|
protocol_message_clear(&msg);
|
|
goto cleanup;
|
|
}
|
|
memcpy(text, msg.body, msg.body_len);
|
|
text[msg.body_len] = '\0';
|
|
if (kcp_client_send_text(client, msg.from, text) != 0) {
|
|
free(text);
|
|
protocol_message_clear(&msg);
|
|
fprintf(stderr, "kcpping: echo send back to %s failed\n", msg.from);
|
|
goto cleanup;
|
|
}
|
|
free(text);
|
|
} else if (msg.type == MSG_TYPE_ERROR) {
|
|
fprintf(stderr, "server error: %.*s\n", (int) msg.body_len, msg.body == NULL ? "" : (const char *) msg.body);
|
|
} else {
|
|
fprintf(stderr, "unexpected message type %s from %s ignored\n", protocol_message_type_name(msg.type), msg.from);
|
|
}
|
|
protocol_message_clear(&msg);
|
|
}
|
|
rc = 0;
|
|
goto cleanup;
|
|
}
|
|
|
|
fprintf(stdout, "KCP PING %s via %s (payload=%d bytes, KCP)\n", target_peer, server_addr, size);
|
|
kcp_ping_receiver_ctx_init(&receiver_ctx, client);
|
|
receiver_ctx_initialized = 1;
|
|
if (pthread_create(&receiver_thread, NULL, kcpping_receive_thread_main, &receiver_ctx) != 0) {
|
|
fprintf(stderr, "kcpping: create receive thread failed\n");
|
|
goto cleanup;
|
|
}
|
|
receiver_thread_started = 1;
|
|
|
|
{
|
|
uint64_t next_seq = 1;
|
|
int stop_sending = 0;
|
|
int64_t next_send_at_ns = omni_now_unix_nano();
|
|
int poll_ms = kcp_ping_expiry_poll_ms(timeout_ms);
|
|
int64_t timeout_ns = (int64_t) timeout_ms * 1000000LL;
|
|
|
|
while (!g_kcpping_stop || tracker.pending_count > 0 || !stop_sending) {
|
|
int64_t now_ns = omni_now_unix_nano();
|
|
message_t msg;
|
|
int popped;
|
|
int receiver_closed;
|
|
int receiver_status_rc;
|
|
|
|
if (!stop_sending && now_ns >= next_send_at_ns) {
|
|
char *payload = NULL;
|
|
size_t payload_len = 0;
|
|
|
|
if (count > 0 && tracker.sent >= count) {
|
|
stop_sending = 1;
|
|
} else {
|
|
if (kcp_ping_build_payload(next_seq, now_ns, size, &payload, &payload_len) != 0) {
|
|
fprintf(stderr, "kcpping: build payload for seq=%" PRIu64 " failed\n", next_seq);
|
|
free(payload);
|
|
goto cleanup;
|
|
}
|
|
if (kcp_client_send_text(client, target_peer, payload) != 0) {
|
|
fprintf(stderr, "kcpping: send ping seq=%" PRIu64 " failed\n", next_seq);
|
|
free(payload);
|
|
goto cleanup;
|
|
}
|
|
free(payload);
|
|
if (kcp_ping_tracker_mark_sent(&tracker, next_seq, now_ns, timeout_ns) != 0) {
|
|
goto cleanup;
|
|
}
|
|
next_seq++;
|
|
next_send_at_ns = now_ns + (int64_t) interval_ms * 1000000LL;
|
|
if (count > 0 && tracker.sent >= count) {
|
|
stop_sending = 1;
|
|
}
|
|
}
|
|
}
|
|
|
|
kcp_ping_tracker_expire(&tracker, now_ns, stdout);
|
|
|
|
do {
|
|
popped = kcp_ping_receiver_pop(&receiver_ctx, &msg);
|
|
if (popped == 1) {
|
|
if (msg.type == MSG_TYPE_TEXT) {
|
|
uint64_t seq;
|
|
int64_t sent_ts_ns;
|
|
int disposition;
|
|
int64_t rtt_ns;
|
|
|
|
if (kcp_ping_parse_payload(msg.body, msg.body_len, &seq, &sent_ts_ns) != 0) {
|
|
fprintf(stderr, "ignore non-ping text message from %s\n", msg.from);
|
|
} else if (kcp_ping_tracker_observe_reply(&tracker, seq, sent_ts_ns, omni_now_unix_nano(), &disposition, &rtt_ns) != 0) {
|
|
protocol_message_clear(&msg);
|
|
goto cleanup;
|
|
} else if (disposition == 0) {
|
|
fprintf(stdout, "seq=%" PRIu64 " rtt=%.2fms\n", seq, (double) rtt_ns / 1000000.0);
|
|
} else if (disposition == 1) {
|
|
fprintf(stderr, "seq=%" PRIu64 " duplicate or late reply ignored\n", seq);
|
|
} else {
|
|
fprintf(stderr, "seq=%" PRIu64 " unexpected reply ignored\n", seq);
|
|
}
|
|
} else if (msg.type == MSG_TYPE_ERROR) {
|
|
fprintf(stderr, "server error: %.*s\n", (int) msg.body_len, msg.body == NULL ? "" : (const char *) msg.body);
|
|
} else {
|
|
fprintf(stderr, "unexpected message type %s from %s ignored\n", protocol_message_type_name(msg.type), msg.from);
|
|
}
|
|
protocol_message_clear(&msg);
|
|
}
|
|
} while (popped == 1);
|
|
|
|
kcp_ping_receiver_status(&receiver_ctx, &receiver_closed, &receiver_status_rc);
|
|
if (receiver_closed && receiver_status_rc != 0) {
|
|
fprintf(stderr, "kcpping: receive loop failed\n");
|
|
goto cleanup;
|
|
}
|
|
if ((g_kcpping_stop || stop_sending) && tracker.pending_count == 0) {
|
|
break;
|
|
}
|
|
usleep((useconds_t) poll_ms * 1000U);
|
|
}
|
|
}
|
|
|
|
kcp_ping_print_summary(stdout, target_peer, &tracker);
|
|
rc = 0;
|
|
|
|
cleanup:
|
|
receiver_ctx.stop_requested = 1;
|
|
kcp_client_close(client);
|
|
if (receiver_thread_started) {
|
|
pthread_join(receiver_thread, NULL);
|
|
kcp_ping_receiver_ctx_destroy(&receiver_ctx);
|
|
} else if (receiver_ctx_initialized) {
|
|
kcp_ping_receiver_ctx_destroy(&receiver_ctx);
|
|
}
|
|
kcp_client_free(client);
|
|
latencylog_close(latency_logger);
|
|
kcp_ping_tracker_destroy(&tracker);
|
|
return rc;
|
|
}
|