#include "cli_parse.h" #include "interactive.h" #include "peer_udp_client.h" #include typedef struct udppeer_receive_ctx { udp_client_t *client; const char *inbox_dir; volatile int stop_requested; int rc; } udppeer_receive_ctx_t; static void udppeer_usage(FILE *out) { fprintf(out, "usage: udppeer [-id peer-a] [-server 127.0.0.1:9001] [-to peer] [-text msg | -file path]\n"); fprintf(out, " [-bind-ip ip] [-inbox-dir dir] [-latency-log path] [-tx-ts-debug-log path]\n"); fprintf(out, " [-interactive[=true|false]]\n"); } static void *udppeer_receive_thread_main(void *arg) { udppeer_receive_ctx_t *ctx = (udppeer_receive_ctx_t *) arg; for (;;) { message_t msg; char persisted_path[512]; protocol_message_init(&msg); if (udp_client_receive(ctx->client, &msg) != 0) { protocol_message_clear(&msg); ctx->rc = ctx->stop_requested ? 0 : -1; return NULL; } switch (msg.type) { case MSG_TYPE_TEXT: if (udp_client_persist_message(ctx->client, &msg, ctx->inbox_dir, persisted_path, sizeof(persisted_path)) != 0) { fprintf(stderr, "udppeer: persist text from %s to %s failed\n", msg.from, msg.to); protocol_message_clear(&msg); ctx->rc = -1; return NULL; } fprintf(stderr, "received text from %s to %s and persisted to %s\n", msg.from, msg.to, persisted_path); break; case MSG_TYPE_FILE: if (udp_client_persist_message(ctx->client, &msg, ctx->inbox_dir, persisted_path, sizeof(persisted_path)) != 0) { fprintf(stderr, "udppeer: persist file from %s to %s failed\n", msg.from, msg.to); protocol_message_clear(&msg); ctx->rc = -1; return NULL; } fprintf(stderr, "received file from %s to %s: %s (%lu bytes) -> %s\n", msg.from, msg.to, msg.file_name, (unsigned long) msg.body_len, persisted_path); break; case MSG_TYPE_BINARY: if (udp_client_persist_message(ctx->client, &msg, ctx->inbox_dir, persisted_path, sizeof(persisted_path)) != 0) { fprintf(stderr, "udppeer: persist binary payload from %s to %s failed\n", msg.from, msg.to); protocol_message_clear(&msg); ctx->rc = -1; return NULL; } fprintf(stderr, "received binary payload from %s to %s (%lu bytes) -> %s\n", msg.from, msg.to, (unsigned long) msg.body_len, persisted_path); break; case MSG_TYPE_ERROR: fprintf(stderr, "received error from %s to %s: %.*s\n", msg.from, msg.to, (int) msg.body_len, msg.body == NULL ? "" : (const char *) msg.body); break; default: fprintf(stderr, "received unexpected message type %s from %s\n", protocol_message_type_name(msg.type), msg.from); protocol_message_clear(&msg); ctx->rc = -1; return NULL; } protocol_message_clear(&msg); } } int main(int argc, char **argv) { const char *peer_id = "peer-a"; const char *server_addr = "127.0.0.1:9001"; const char *target_peer = ""; const char *text = ""; const char *file_path = ""; const char *bind_ip = ""; const char *inbox_dir = "inbox"; const char *latency_log_path = ""; const char *tx_debug_log_path = ""; int interactive = 1; latency_logger_t *latency_logger = NULL; tx_timestamp_debug_logger_t *debug_logger = NULL; udp_client_t *client = NULL; udppeer_receive_ctx_t receive_ctx; pthread_t receive_thread; int receive_thread_started = 0; int i; int rc = 1; memset(&receive_ctx, 0, sizeof(receive_ctx)); for (i = 1; i < argc; ++i) { const char *value = NULL; int handled; if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-id", &value)) < 0) { fprintf(stderr, "udppeer: flag -id requires a value\n"); return 1; } else if (handled) { peer_id = value; continue; } if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-server", &value)) < 0) { fprintf(stderr, "udppeer: flag -server requires a value\n"); return 1; } else if (handled) { server_addr = value; continue; } if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-to", &value)) < 0) { fprintf(stderr, "udppeer: flag -to requires a value\n"); return 1; } else if (handled) { target_peer = value; continue; } if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-text", &value)) < 0) { fprintf(stderr, "udppeer: flag -text requires a value\n"); return 1; } else if (handled) { text = value; continue; } if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-file", &value)) < 0) { fprintf(stderr, "udppeer: flag -file requires a value\n"); return 1; } else if (handled) { file_path = value; continue; } if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-bind-ip", &value)) < 0) { fprintf(stderr, "udppeer: flag -bind-ip requires a value\n"); return 1; } else if (handled) { bind_ip = value; continue; } if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-inbox-dir", &value)) < 0) { fprintf(stderr, "udppeer: flag -inbox-dir requires a value\n"); return 1; } else if (handled) { inbox_dir = value; continue; } if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-latency-log", &value)) < 0) { fprintf(stderr, "udppeer: flag -latency-log requires a value\n"); return 1; } else if (handled) { latency_log_path = value; continue; } if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-tx-ts-debug-log", &value)) < 0) { fprintf(stderr, "udppeer: flag -tx-ts-debug-log requires a value\n"); return 1; } else if (handled) { tx_debug_log_path = value; continue; } if ((handled = cli_parse_bool_flag(argv[i], "-interactive", &interactive)) < 0) { fprintf(stderr, "udppeer: invalid -interactive value\n"); return 1; } else if (handled) { continue; } if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { udppeer_usage(stdout); return 0; } fprintf(stderr, "udppeer: unknown argument %s\n", argv[i]); udppeer_usage(stderr); return 1; } if (text[0] != '\0' && file_path[0] != '\0') { fprintf(stderr, "udppeer: only one of -text or -file may be specified\n"); return 1; } if ((text[0] != '\0' || file_path[0] != '\0') && target_peer[0] == '\0') { fprintf(stderr, "udppeer: flag -to is required when sending text or file\n"); return 1; } if (latency_log_path[0] != '\0') { latency_logger = latencylog_open_jsonl(latency_log_path); if (latency_logger == NULL) { fprintf(stderr, "udppeer: open latency logger %s failed\n", latency_log_path); goto cleanup; } } if (tx_debug_log_path[0] != '\0') { debug_logger = tx_timestamp_debug_open_jsonl(tx_debug_log_path); if (debug_logger == NULL) { fprintf(stderr, "udppeer: open tx timestamp debug logger %s failed\n", tx_debug_log_path); goto cleanup; } } client = udp_client_dial(server_addr, peer_id, bind_ip, latency_logger, debug_logger, tx_debug_log_path[0] != '\0'); if (client == NULL) { fprintf(stderr, "udppeer: dial udp server %s failed\n", server_addr); goto cleanup; } fprintf(stderr, "connected to %s as %s (UDP)\n", server_addr, udp_client_id(client)); receive_ctx.client = client; receive_ctx.inbox_dir = inbox_dir; if (pthread_create(&receive_thread, NULL, udppeer_receive_thread_main, &receive_ctx) != 0) { fprintf(stderr, "udppeer: create receive thread failed\n"); goto cleanup; } receive_thread_started = 1; if (target_peer[0] != '\0' && text[0] != '\0') { if (udp_client_send_text(client, target_peer, text) != 0) { fprintf(stderr, "udppeer: send text to %s failed\n", target_peer); goto cleanup; } fprintf(stderr, "sent text to %s\n", target_peer); } if (target_peer[0] != '\0' && file_path[0] != '\0') { if (udp_client_send_file_path(client, target_peer, file_path) != 0) { fprintf(stderr, "udppeer: send file %s to %s failed\n", file_path, target_peer); goto cleanup; } fprintf(stderr, "sent file %s to %s\n", file_path, target_peer); } if (interactive) { char line[2048]; char prompt[128]; snprintf(prompt, sizeof(prompt), "%s> ", udp_client_id(client)); interactive_print_help(stdout, "UDP"); while (fputs(prompt, stdout) >= 0 && fflush(stdout) == 0 && fgets(line, sizeof(line), stdin) != NULL) { interactive_command_t command; char err[128]; omni_trim_newline(line); if (interactive_parse_command(line, &command, err, sizeof(err)) != 0) { if (strstr(err, "empty command") == NULL) { fprintf(stderr, "%s\n", err); } continue; } if (command.type == INTERACTIVE_CMD_HELP) { interactive_print_help(stdout, "UDP"); continue; } if (command.type == INTERACTIVE_CMD_QUIT) { break; } if (command.type == INTERACTIVE_CMD_TEXT) { if (udp_client_send_text(client, command.to, command.value) != 0) { fprintf(stderr, "udppeer: send text to %s failed\n", command.to); continue; } fprintf(stderr, "sent text to %s\n", command.to); continue; } if (command.type == INTERACTIVE_CMD_FILE) { if (udp_client_send_file_path(client, command.to, command.value) != 0) { fprintf(stderr, "udppeer: send file %s to %s failed\n", command.value, command.to); continue; } fprintf(stderr, "sent file %s to %s\n", command.value, command.to); continue; } } } rc = 0; cleanup: receive_ctx.stop_requested = 1; udp_client_close(client); if (receive_thread_started) { pthread_join(receive_thread, NULL); if (rc == 0 && receive_ctx.rc != 0) { rc = 1; } } udp_client_free(client); tx_timestamp_debug_close(debug_logger); latencylog_close(latency_logger); return rc; }