#include "teleop_transport.h" #include #include #include #include #include #include static void teleop_transport_clear(teleop_transport_t *transport) { if (transport == NULL) { return; } memset(transport, 0, sizeof(*transport)); transport->mode = TELEOP_TRANSPORT_MODE_UDP; transport->udp_fd = -1; } int teleop_transport_parse_mode(const char *raw, teleop_transport_mode_t *out_mode) { if (raw == NULL || out_mode == NULL) { errno = EINVAL; return -1; } if (strcmp(raw, "udp") == 0) { *out_mode = TELEOP_TRANSPORT_MODE_UDP; return 0; } if (strcmp(raw, "kcp") == 0) { *out_mode = TELEOP_TRANSPORT_MODE_KCP; return 0; } errno = EINVAL; return -1; } const char *teleop_transport_mode_name(teleop_transport_mode_t mode) { return mode == TELEOP_TRANSPORT_MODE_KCP ? "kcp" : "udp"; } static void teleop_transport_log_incoming(const message_t *msg) { if (msg == NULL) { return; } switch (msg->type) { case MSG_TYPE_ERROR: fprintf(stderr, "teleop transport: server error from %s to %s: %.*s\n", msg->from, msg->to, (int)msg->body_len, msg->body == NULL ? "" : (const char *)msg->body); break; case MSG_TYPE_TEXT: fprintf(stderr, "teleop transport: dropped unexpected text from %s to %s: %.*s\n", msg->from, msg->to, (int)msg->body_len, msg->body == NULL ? "" : (const char *)msg->body); break; case MSG_TYPE_BINARY: fprintf(stderr, "teleop transport: dropped unexpected binary payload from %s to %s (%lu bytes)\n", msg->from, msg->to, (unsigned long)msg->body_len); break; case MSG_TYPE_FILE: fprintf(stderr, "teleop transport: dropped unexpected file from %s to %s: %s (%lu bytes)\n", msg->from, msg->to, msg->file_name, (unsigned long)msg->body_len); break; case MSG_TYPE_REGISTER: fprintf(stderr, "teleop transport: dropped unexpected register message from %s to %s\n", msg->from, msg->to); break; default: fprintf(stderr, "teleop transport: dropped unexpected message type %s from %s\n", protocol_message_type_name(msg->type), msg->from); break; } } static void *teleop_transport_kcp_recv_thread_main(void *arg) { teleop_transport_t *transport = (teleop_transport_t *)arg; for (;;) { message_t msg; int rc; if (transport->stop_requested) { return NULL; } protocol_message_init(&msg); rc = kcp_client_receive_timed(transport->kcp_client, &msg, 100); if (rc == 1) { protocol_message_clear(&msg); continue; } if (rc != 0) { protocol_message_clear(&msg); if (!transport->stop_requested) { int saved_errno = errno; fprintf(stderr, "teleop transport: KCP receive loop stopped: %s (errno=%d)\n", saved_errno != 0 ? strerror(saved_errno) : "unknown error", saved_errno); } return NULL; } teleop_transport_log_incoming(&msg); protocol_message_clear(&msg); } } static int teleop_transport_open_udp(teleop_transport_t *transport, const teleop_transport_config_t *config) { int sockfd; if (transport == NULL || config == NULL || config->udp_ip == NULL) { errno = EINVAL; return -1; } sockfd = socket(AF_INET, SOCK_DGRAM, 0); if (sockfd < 0) { perror("socket"); return -1; } memset(&transport->udp_dest, 0, sizeof(transport->udp_dest)); transport->udp_dest.sin_family = AF_INET; transport->udp_dest.sin_port = htons(config->udp_port); if (inet_pton(AF_INET, config->udp_ip, &transport->udp_dest.sin_addr) <= 0) { fprintf(stderr, "Invalid IP: %s\n", config->udp_ip); close(sockfd); errno = EINVAL; return -1; } transport->udp_fd = sockfd; return 0; } static int teleop_transport_open_kcp(teleop_transport_t *transport, const teleop_transport_config_t *config) { kcp_conn_options_t options; const char *relay_via; if (transport == NULL || config == NULL || config->server_addr == NULL || config->peer_id == NULL || config->target_peer == NULL) { errno = EINVAL; return -1; } kcp_conn_options_set_control_defaults(&options); relay_via = (config->relay_via != NULL && config->relay_via[0] != '\0') ? config->relay_via : NULL; transport->kcp_client = kcp_client_dial_with_options( config->server_addr, relay_via, config->peer_id, "", "", &options, NULL, NULL, NULL, KCP_DEFAULT_STATS_INTERVAL_MS ); if (transport->kcp_client == NULL) { int saved_errno = errno; fprintf(stderr, "teleop transport: failed to open KCP session as %s via %s%s%s: %s (errno=%d)\n", config->peer_id, config->server_addr, relay_via != NULL ? ", relay=" : "", relay_via != NULL ? relay_via : "", saved_errno != 0 ? strerror(saved_errno) : "unknown error", saved_errno); errno = saved_errno; return -1; } { int thread_rc = pthread_create(&transport->recv_thread, NULL, teleop_transport_kcp_recv_thread_main, transport); if (thread_rc != 0) { fprintf(stderr, "teleop transport: failed to start KCP receive thread: %s (errno=%d)\n", strerror(thread_rc), thread_rc); kcp_client_close(transport->kcp_client); kcp_client_free(transport->kcp_client); transport->kcp_client = NULL; errno = thread_rc; return -1; } } transport->recv_thread_started = 1; return 0; } int teleop_transport_open(teleop_transport_t *transport, const teleop_transport_config_t *config) { if (transport == NULL || config == NULL) { errno = EINVAL; return -1; } teleop_transport_clear(transport); transport->mode = config->mode; snprintf(transport->server_addr, sizeof(transport->server_addr), "%s", config->server_addr == NULL ? "" : config->server_addr); snprintf(transport->relay_via, sizeof(transport->relay_via), "%s", config->relay_via == NULL ? "" : config->relay_via); snprintf(transport->peer_id, sizeof(transport->peer_id), "%s", config->peer_id == NULL ? "" : config->peer_id); snprintf(transport->target_peer, sizeof(transport->target_peer), "%s", config->target_peer == NULL ? "" : config->target_peer); if (config->mode == TELEOP_TRANSPORT_MODE_KCP) { return teleop_transport_open_kcp(transport, config); } return teleop_transport_open_udp(transport, config); } int teleop_transport_send_twist(teleop_transport_t *transport, const twist_cmd_t *cmd) { if (transport == NULL || cmd == NULL) { errno = EINVAL; return -1; } if (transport->mode == TELEOP_TRANSPORT_MODE_KCP) { if (kcp_client_send_binary(transport->kcp_client, transport->target_peer, cmd, TWIST_CMD_SIZE) != 0) { int saved_errno = errno; fprintf(stderr, "teleop transport: failed to send KCP payload to %s: %s (errno=%d)\n", transport->target_peer, saved_errno != 0 ? strerror(saved_errno) : "unknown error", saved_errno); errno = saved_errno; return -1; } return 0; } { ssize_t sent = sendto(transport->udp_fd, cmd, TWIST_CMD_SIZE, 0, (const struct sockaddr *)&transport->udp_dest, sizeof(transport->udp_dest)); if (sent < 0) { perror("sendto"); return -1; } if ((size_t)sent != TWIST_CMD_SIZE) { fprintf(stderr, "sendto: short send (%zd/%zu)\n", sent, (size_t)TWIST_CMD_SIZE); errno = EIO; return -1; } } return 0; } void teleop_transport_close(teleop_transport_t *transport) { if (transport == NULL) { return; } transport->stop_requested = 1; if (transport->kcp_client != NULL) { kcp_client_close(transport->kcp_client); } if (transport->recv_thread_started) { pthread_join(transport->recv_thread, NULL); transport->recv_thread_started = 0; } if (transport->kcp_client != NULL) { kcp_client_free(transport->kcp_client); transport->kcp_client = NULL; } if (transport->udp_fd >= 0) { close(transport->udp_fd); transport->udp_fd = -1; } }