#include "cli_parse.h" #include "peer_udp_client.h" #include "cJSON.h" #include #include typedef struct ping_message_node { struct ping_message_node *next; message_t msg; } ping_message_node_t; typedef struct ping_receiver_ctx { udp_client_t *client; pthread_mutex_t mu; ping_message_node_t *head; ping_message_node_t *tail; volatile int stop_requested; int closed; int rc; } ping_receiver_ctx_t; typedef struct pending_ping { struct pending_ping *next; uint64_t seq; int64_t deadline_ns; } pending_ping_t; typedef struct ping_tracker { pending_ping_t *pending; int pending_count; int sent; int duplicates; uint64_t max_seq_sent; int64_t *samples_ns; size_t sample_count; size_t sample_cap; } ping_tracker_t; static volatile sig_atomic_t g_udpping_stop = 0; static void udpping_on_signal(int signo) { (void) signo; g_udpping_stop = 1; } static void udpping_usage(FILE *out) { fprintf(out, "usage: udpping [-id pinger] [-server 127.0.0.1:9001] [-to peer] [-echo]\n"); fprintf(out, " [-count 100] [-interval 100ms] [-size 64] [-timeout 3s]\n"); fprintf(out, " [-bind-ip ip] [-latency-log path]\n"); } static int ping_compare_i64(const void *left, const void *right) { const int64_t *a = (const int64_t *) left; const int64_t *b = (const int64_t *) right; if (*a < *b) { return -1; } if (*a > *b) { return 1; } return 0; } static double ping_sqrt(double value) { double x = value; int i; if (value <= 0.0) { return 0.0; } if (x < 1.0) { x = 1.0; } for (i = 0; i < 16; ++i) { x = 0.5 * (x + value / x); } return x; } static int ping_build_payload(uint64_t seq, int64_t ts_ns, int size, char **out_body, size_t *out_len) { cJSON *root = NULL; char *json = NULL; char *pad = NULL; size_t base_len; size_t pad_len; *out_body = NULL; *out_len = 0; root = cJSON_CreateObject(); if (root == NULL) { return -1; } cJSON_AddNumberToObject(root, "seq", (double) seq); cJSON_AddNumberToObject(root, "ts_ns", (double) ts_ns); cJSON_AddStringToObject(root, "pad", ""); json = cJSON_PrintUnformatted(root); cJSON_Delete(root); if (json == NULL) { return -1; } base_len = strlen(json); cJSON_free(json); if ((int) base_len > size) { errno = EMSGSIZE; return -1; } pad_len = (size_t) size - base_len; pad = (char *) malloc(pad_len + 1U); if (pad == NULL) { return -1; } memset(pad, 'A', pad_len); pad[pad_len] = '\0'; root = cJSON_CreateObject(); if (root == NULL) { free(pad); return -1; } cJSON_AddNumberToObject(root, "seq", (double) seq); cJSON_AddNumberToObject(root, "ts_ns", (double) ts_ns); cJSON_AddStringToObject(root, "pad", pad); free(pad); json = cJSON_PrintUnformatted(root); cJSON_Delete(root); if (json == NULL) { return -1; } if ((int) strlen(json) != size) { cJSON_free(json); errno = EINVAL; return -1; } *out_body = json; *out_len = (size_t) size; return 0; } static int ping_parse_payload(const uint8_t *body, size_t body_len, uint64_t *seq, int64_t *ts_ns) { char *text; cJSON *root; const cJSON *seq_item; const cJSON *ts_item; if (body == NULL || seq == NULL || ts_ns == NULL) { errno = EINVAL; return -1; } text = (char *) malloc(body_len + 1U); if (text == NULL) { return -1; } memcpy(text, body, body_len); text[body_len] = '\0'; root = cJSON_Parse(text); free(text); if (root == NULL) { errno = EPROTO; return -1; } seq_item = cJSON_GetObjectItemCaseSensitive(root, "seq"); ts_item = cJSON_GetObjectItemCaseSensitive(root, "ts_ns"); if (!cJSON_IsNumber(seq_item) || !cJSON_IsNumber(ts_item) || seq_item->valuedouble <= 0 || ts_item->valuedouble <= 0) { cJSON_Delete(root); errno = EPROTO; return -1; } *seq = (uint64_t) seq_item->valuedouble; *ts_ns = (int64_t) ts_item->valuedouble; cJSON_Delete(root); return 0; } static void ping_receiver_ctx_init(ping_receiver_ctx_t *ctx, udp_client_t *client) { memset(ctx, 0, sizeof(*ctx)); ctx->client = client; pthread_mutex_init(&ctx->mu, NULL); } static void ping_receiver_ctx_destroy(ping_receiver_ctx_t *ctx) { ping_message_node_t *node; ping_message_node_t *next; if (ctx == NULL) { return; } for (node = ctx->head; node != NULL; node = next) { next = node->next; protocol_message_clear(&node->msg); free(node); } pthread_mutex_destroy(&ctx->mu); } static void *udpping_receive_thread_main(void *arg) { ping_receiver_ctx_t *ctx = (ping_receiver_ctx_t *) arg; for (;;) { message_t msg; ping_message_node_t *node; protocol_message_init(&msg); if (udp_client_receive(ctx->client, &msg) != 0) { protocol_message_clear(&msg); pthread_mutex_lock(&ctx->mu); ctx->rc = ctx->stop_requested ? 0 : -1; ctx->closed = 1; pthread_mutex_unlock(&ctx->mu); return NULL; } node = (ping_message_node_t *) calloc(1, sizeof(*node)); if (node == NULL) { protocol_message_clear(&msg); pthread_mutex_lock(&ctx->mu); ctx->rc = -1; ctx->closed = 1; pthread_mutex_unlock(&ctx->mu); return NULL; } node->msg = msg; pthread_mutex_lock(&ctx->mu); if (ctx->tail == NULL) { ctx->head = node; } else { ctx->tail->next = node; } ctx->tail = node; pthread_mutex_unlock(&ctx->mu); } } static int ping_receiver_pop(ping_receiver_ctx_t *ctx, message_t *out_msg) { ping_message_node_t *node; pthread_mutex_lock(&ctx->mu); node = ctx->head; if (node != NULL) { ctx->head = node->next; if (ctx->head == NULL) { ctx->tail = NULL; } } pthread_mutex_unlock(&ctx->mu); if (node == NULL) { return 0; } *out_msg = node->msg; free(node); return 1; } static int ping_receiver_status(ping_receiver_ctx_t *ctx, int *closed, int *rc) { pthread_mutex_lock(&ctx->mu); *closed = ctx->closed; *rc = ctx->rc; pthread_mutex_unlock(&ctx->mu); return 0; } static void ping_tracker_init(ping_tracker_t *tracker) { memset(tracker, 0, sizeof(*tracker)); } static void ping_tracker_destroy(ping_tracker_t *tracker) { pending_ping_t *pending; pending_ping_t *next; for (pending = tracker->pending; pending != NULL; pending = next) { next = pending->next; free(pending); } free(tracker->samples_ns); } static int ping_tracker_mark_sent(ping_tracker_t *tracker, uint64_t seq, int64_t sent_at_ns, int64_t timeout_ns) { pending_ping_t *pending = (pending_ping_t *) calloc(1, sizeof(*pending)); if (pending == NULL) { return -1; } pending->seq = seq; pending->deadline_ns = sent_at_ns + timeout_ns; pending->next = tracker->pending; tracker->pending = pending; tracker->pending_count++; tracker->sent++; tracker->max_seq_sent = seq; return 0; } static pending_ping_t *ping_tracker_find_pending(ping_tracker_t *tracker, uint64_t seq, pending_ping_t **out_prev) { pending_ping_t *prev = NULL; pending_ping_t *cur; for (cur = tracker->pending; cur != NULL; cur = cur->next) { if (cur->seq == seq) { if (out_prev != NULL) { *out_prev = prev; } return cur; } prev = cur; } if (out_prev != NULL) { *out_prev = NULL; } return NULL; } static int ping_tracker_add_sample(ping_tracker_t *tracker, int64_t rtt_ns) { int64_t *next_samples; size_t next_cap; if (tracker->sample_count == tracker->sample_cap) { next_cap = tracker->sample_cap == 0 ? 16U : tracker->sample_cap * 2U; next_samples = (int64_t *) realloc(tracker->samples_ns, next_cap * sizeof(*next_samples)); if (next_samples == NULL) { return -1; } tracker->samples_ns = next_samples; tracker->sample_cap = next_cap; } tracker->samples_ns[tracker->sample_count++] = rtt_ns; return 0; } static int ping_tracker_observe_reply(ping_tracker_t *tracker, uint64_t seq, int64_t sent_ts_ns, int64_t received_ts_ns, int *disposition, int64_t *rtt_ns) { pending_ping_t *prev = NULL; pending_ping_t *pending; if (seq == 0 || seq > tracker->max_seq_sent) { *disposition = 2; *rtt_ns = 0; return 0; } pending = ping_tracker_find_pending(tracker, seq, &prev); if (pending == NULL) { tracker->duplicates++; *disposition = 1; *rtt_ns = 0; return 0; } if (prev == NULL) { tracker->pending = pending->next; } else { prev->next = pending->next; } tracker->pending_count--; free(pending); *rtt_ns = received_ts_ns - sent_ts_ns; if (*rtt_ns < 0) { *rtt_ns = 0; } if (ping_tracker_add_sample(tracker, *rtt_ns) != 0) { return -1; } *disposition = 0; return 0; } static void ping_tracker_expire(ping_tracker_t *tracker, int64_t now_ns, FILE *out) { pending_ping_t *prev = NULL; pending_ping_t *cur = tracker->pending; while (cur != NULL) { if (cur->deadline_ns <= now_ns) { pending_ping_t *next = cur->next; fprintf(out, "seq=%" PRIu64 " timeout\n", cur->seq); if (prev == NULL) { tracker->pending = next; } else { prev->next = next; } free(cur); tracker->pending_count--; cur = next; continue; } prev = cur; cur = cur->next; } } static int64_t ping_percentile_ns(const int64_t *sorted, size_t count, double percentile) { size_t index; double raw_index; if (count == 0) { return 0; } if (percentile <= 0.0) { return sorted[0]; } if (percentile >= 1.0) { return sorted[count - 1]; } raw_index = percentile * (double) count; index = (size_t) raw_index; if ((double) index < raw_index) { index++; } if (index > 0) { index--; } if (index >= count) { index = count - 1; } return sorted[index]; } static void ping_print_summary(FILE *out, const char *target, const ping_tracker_t *tracker) { int received = (int) tracker->sample_count; double loss_pct = tracker->sent == 0 ? 0.0 : ((double) (tracker->sent - received) * 100.0 / (double) tracker->sent); fprintf(out, "--- %s udp ping statistics ---\n", target); fprintf(out, "%d packets transmitted, %d received, %d duplicates, %.2f%% packet loss\n", tracker->sent, received, tracker->duplicates, loss_pct); if (tracker->sample_count == 0) { fprintf(out, "rtt min/avg/max/p50/p95/p99 = n/a/n/a/n/a/n/a/n/a/n/a, stddev=n/a\n"); return; } { int64_t *sorted = (int64_t *) malloc(tracker->sample_count * sizeof(*sorted)); size_t i; double sum = 0.0; double variance = 0.0; double avg; int64_t min_ns; int64_t max_ns; int64_t p50_ns; int64_t p95_ns; int64_t p99_ns; if (sorted == NULL) { fprintf(out, "rtt summary unavailable: memory allocation failed\n"); return; } memcpy(sorted, tracker->samples_ns, tracker->sample_count * sizeof(*sorted)); qsort(sorted, tracker->sample_count, sizeof(*sorted), ping_compare_i64); for (i = 0; i < tracker->sample_count; ++i) { sum += (double) sorted[i]; } avg = sum / (double) tracker->sample_count; for (i = 0; i < tracker->sample_count; ++i) { double delta = (double) sorted[i] - avg; variance += delta * delta; } variance /= (double) tracker->sample_count; min_ns = sorted[0]; max_ns = sorted[tracker->sample_count - 1]; p50_ns = ping_percentile_ns(sorted, tracker->sample_count, 0.50); p95_ns = ping_percentile_ns(sorted, tracker->sample_count, 0.95); p99_ns = ping_percentile_ns(sorted, tracker->sample_count, 0.99); fprintf( out, "rtt min/avg/max/p50/p95/p99 = %.2fms/%.2fms/%.2fms/%.2fms/%.2fms/%.2fms, stddev=%.2fms\n", (double) min_ns / 1000000.0, avg / 1000000.0, (double) max_ns / 1000000.0, (double) p50_ns / 1000000.0, (double) p95_ns / 1000000.0, (double) p99_ns / 1000000.0, ping_sqrt(variance) / 1000000.0 ); free(sorted); } } static int ping_expiry_poll_ms(int timeout_ms) { int interval = timeout_ms / 4; if (interval < 10) { return 10; } if (interval > 100) { return 100; } return interval; } int main(int argc, char **argv) { const char *peer_id = "pinger"; const char *server_addr = "127.0.0.1:9001"; const char *target_peer = ""; const char *bind_ip = ""; const char *latency_log_path = ""; int echo_mode = 0; int count = 100; int interval_ms = 100; int size = 64; int timeout_ms = 3000; latency_logger_t *latency_logger = NULL; udp_client_t *client = NULL; ping_receiver_ctx_t receiver_ctx; pthread_t receiver_thread; int receiver_ctx_initialized = 0; int receiver_thread_started = 0; ping_tracker_t tracker; int i; int rc = 1; ping_tracker_init(&tracker); memset(&receiver_ctx, 0, sizeof(receiver_ctx)); for (i = 1; i < argc; ++i) { const char *value = NULL; int handled; if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-id", &value)) < 0) { fprintf(stderr, "udpping: flag -id requires a value\n"); return 1; } else if (handled) { peer_id = value; continue; } if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-server", &value)) < 0) { fprintf(stderr, "udpping: flag -server requires a value\n"); return 1; } else if (handled) { server_addr = value; continue; } if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-to", &value)) < 0) { fprintf(stderr, "udpping: flag -to requires a value\n"); return 1; } else if (handled) { target_peer = value; continue; } if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-count", &value)) < 0) { fprintf(stderr, "udpping: flag -count requires a value\n"); return 1; } else if (handled) { count = atoi(value); continue; } if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-interval", &value)) < 0) { fprintf(stderr, "udpping: flag -interval requires a value\n"); return 1; } else if (handled) { if (omni_parse_duration_ms(value, interval_ms, &interval_ms) != 0) { fprintf(stderr, "udpping: invalid -interval value %s\n", value); return 1; } continue; } if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-size", &value)) < 0) { fprintf(stderr, "udpping: flag -size requires a value\n"); return 1; } else if (handled) { size = atoi(value); continue; } if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-timeout", &value)) < 0) { fprintf(stderr, "udpping: flag -timeout requires a value\n"); return 1; } else if (handled) { if (omni_parse_duration_ms(value, timeout_ms, &timeout_ms) != 0) { fprintf(stderr, "udpping: invalid -timeout value %s\n", value); return 1; } continue; } if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-bind-ip", &value)) < 0) { fprintf(stderr, "udpping: flag -bind-ip requires a value\n"); return 1; } else if (handled) { bind_ip = value; continue; } if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-latency-log", &value)) < 0) { fprintf(stderr, "udpping: flag -latency-log requires a value\n"); return 1; } else if (handled) { latency_log_path = value; continue; } if ((handled = cli_parse_bool_flag(argv[i], "-echo", &echo_mode)) < 0) { fprintf(stderr, "udpping: invalid -echo value\n"); return 1; } else if (handled) { continue; } if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { udpping_usage(stdout); return 0; } fprintf(stderr, "udpping: unknown argument %s\n", argv[i]); udpping_usage(stderr); return 1; } if (peer_id[0] == '\0' || server_addr[0] == '\0') { fprintf(stderr, "udpping: flags -id and -server are required\n"); return 1; } if (!echo_mode && target_peer[0] == '\0') { fprintf(stderr, "udpping: flag -to is required unless -echo is set\n"); return 1; } if (count < 0 || interval_ms <= 0 || size <= 0 || timeout_ms <= 0) { fprintf(stderr, "udpping: invalid numeric flag value\n"); return 1; } signal(SIGINT, udpping_on_signal); if (latency_log_path[0] != '\0') { latency_logger = latencylog_open_jsonl(latency_log_path); if (latency_logger == NULL) { fprintf(stderr, "udpping: open latency logger %s failed\n", latency_log_path); goto cleanup; } } client = udp_client_dial(server_addr, peer_id, bind_ip, latency_logger, NULL, 0); if (client == NULL) { fprintf(stderr, "udpping: dial udp server %s failed\n", server_addr); goto cleanup; } if (echo_mode) { while (!g_udpping_stop) { message_t msg; protocol_message_init(&msg); if (udp_client_receive(client, &msg) != 0) { protocol_message_clear(&msg); if (g_udpping_stop) { break; } fprintf(stderr, "udpping: receive failed in echo mode\n"); goto cleanup; } if (msg.type == MSG_TYPE_TEXT) { char *text = (char *) malloc(msg.body_len + 1U); if (text == NULL) { protocol_message_clear(&msg); goto cleanup; } memcpy(text, msg.body, msg.body_len); text[msg.body_len] = '\0'; if (udp_client_send_text(client, msg.from, text) != 0) { free(text); protocol_message_clear(&msg); fprintf(stderr, "udpping: echo send back to %s failed\n", msg.from); goto cleanup; } free(text); } else if (msg.type == MSG_TYPE_ERROR) { fprintf(stderr, "server error: %.*s\n", (int) msg.body_len, msg.body == NULL ? "" : (const char *) msg.body); } else { fprintf(stderr, "unexpected message type %s from %s ignored\n", protocol_message_type_name(msg.type), msg.from); } protocol_message_clear(&msg); } rc = 0; goto cleanup; } fprintf(stdout, "UDP PING %s via %s (payload=%d bytes, UDP)\n", target_peer, server_addr, size); ping_receiver_ctx_init(&receiver_ctx, client); receiver_ctx_initialized = 1; if (pthread_create(&receiver_thread, NULL, udpping_receive_thread_main, &receiver_ctx) != 0) { fprintf(stderr, "udpping: create receive thread failed\n"); goto cleanup; } receiver_thread_started = 1; { uint64_t next_seq = 1; int stop_sending = 0; int64_t next_send_at_ns = omni_now_unix_nano(); int poll_ms = ping_expiry_poll_ms(timeout_ms); int64_t timeout_ns = (int64_t) timeout_ms * 1000000LL; while (!g_udpping_stop || tracker.pending_count > 0 || !stop_sending) { int64_t now_ns = omni_now_unix_nano(); message_t msg; int popped; int receiver_closed; int receiver_status_rc; if (!stop_sending && now_ns >= next_send_at_ns) { char *payload = NULL; size_t payload_len = 0; if (count > 0 && tracker.sent >= count) { stop_sending = 1; } else { if (ping_build_payload(next_seq, now_ns, size, &payload, &payload_len) != 0) { fprintf(stderr, "udpping: build payload for seq=%" PRIu64 " failed\n", next_seq); free(payload); goto cleanup; } if (udp_client_send_text(client, target_peer, payload) != 0) { fprintf(stderr, "udpping: send ping seq=%" PRIu64 " failed\n", next_seq); free(payload); goto cleanup; } free(payload); if (ping_tracker_mark_sent(&tracker, next_seq, now_ns, timeout_ns) != 0) { goto cleanup; } next_seq++; next_send_at_ns = now_ns + (int64_t) interval_ms * 1000000LL; if (count > 0 && tracker.sent >= count) { stop_sending = 1; } } } ping_tracker_expire(&tracker, now_ns, stdout); do { popped = ping_receiver_pop(&receiver_ctx, &msg); if (popped == 1) { if (msg.type == MSG_TYPE_TEXT) { uint64_t seq; int64_t sent_ts_ns; int disposition; int64_t rtt_ns; if (ping_parse_payload(msg.body, msg.body_len, &seq, &sent_ts_ns) != 0) { fprintf(stderr, "ignore non-ping text message from %s\n", msg.from); } else if (ping_tracker_observe_reply(&tracker, seq, sent_ts_ns, omni_now_unix_nano(), &disposition, &rtt_ns) != 0) { protocol_message_clear(&msg); goto cleanup; } else if (disposition == 0) { fprintf(stdout, "seq=%" PRIu64 " rtt=%.2fms\n", seq, (double) rtt_ns / 1000000.0); } else if (disposition == 1) { fprintf(stderr, "seq=%" PRIu64 " duplicate or late reply ignored\n", seq); } else { fprintf(stderr, "seq=%" PRIu64 " unexpected reply ignored\n", seq); } } else if (msg.type == MSG_TYPE_ERROR) { fprintf(stderr, "server error: %.*s\n", (int) msg.body_len, msg.body == NULL ? "" : (const char *) msg.body); } else { fprintf(stderr, "unexpected message type %s from %s ignored\n", protocol_message_type_name(msg.type), msg.from); } protocol_message_clear(&msg); } } while (popped == 1); ping_receiver_status(&receiver_ctx, &receiver_closed, &receiver_status_rc); if (receiver_closed && receiver_status_rc != 0) { fprintf(stderr, "udpping: receive loop failed\n"); goto cleanup; } if ((g_udpping_stop || stop_sending) && tracker.pending_count == 0) { break; } usleep((useconds_t) poll_ms * 1000U); } } ping_print_summary(stdout, target_peer, &tracker); rc = 0; cleanup: receiver_ctx.stop_requested = 1; udp_client_close(client); if (receiver_thread_started) { pthread_join(receiver_thread, NULL); ping_receiver_ctx_destroy(&receiver_ctx); } else if (receiver_ctx_initialized) { ping_receiver_ctx_destroy(&receiver_ctx); } udp_client_free(client); latencylog_close(latency_logger); ping_tracker_destroy(&tracker); return rc; }