Files
OmniSocketGo/ros-control-c/common/teleop_transport.c
2026-04-03 12:04:39 +08:00

301 lines
9.1 KiB
C

#include "teleop_transport.h"
#include <arpa/inet.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
static void teleop_transport_clear(teleop_transport_t *transport)
{
if (transport == NULL) {
return;
}
memset(transport, 0, sizeof(*transport));
transport->mode = TELEOP_TRANSPORT_MODE_UDP;
transport->udp_fd = -1;
}
int teleop_transport_parse_mode(const char *raw, teleop_transport_mode_t *out_mode)
{
if (raw == NULL || out_mode == NULL) {
errno = EINVAL;
return -1;
}
if (strcmp(raw, "udp") == 0) {
*out_mode = TELEOP_TRANSPORT_MODE_UDP;
return 0;
}
if (strcmp(raw, "kcp") == 0) {
*out_mode = TELEOP_TRANSPORT_MODE_KCP;
return 0;
}
errno = EINVAL;
return -1;
}
const char *teleop_transport_mode_name(teleop_transport_mode_t mode)
{
return mode == TELEOP_TRANSPORT_MODE_KCP ? "kcp" : "udp";
}
static void teleop_transport_log_incoming(const message_t *msg)
{
if (msg == NULL) {
return;
}
switch (msg->type) {
case MSG_TYPE_ERROR:
fprintf(stderr,
"teleop transport: server error from %s to %s: %.*s\n",
msg->from,
msg->to,
(int)msg->body_len,
msg->body == NULL ? "" : (const char *)msg->body);
break;
case MSG_TYPE_TEXT:
fprintf(stderr,
"teleop transport: dropped unexpected text from %s to %s: %.*s\n",
msg->from,
msg->to,
(int)msg->body_len,
msg->body == NULL ? "" : (const char *)msg->body);
break;
case MSG_TYPE_BINARY:
fprintf(stderr,
"teleop transport: dropped unexpected binary payload from %s to %s (%lu bytes)\n",
msg->from,
msg->to,
(unsigned long)msg->body_len);
break;
case MSG_TYPE_FILE:
fprintf(stderr,
"teleop transport: dropped unexpected file from %s to %s: %s (%lu bytes)\n",
msg->from,
msg->to,
msg->file_name,
(unsigned long)msg->body_len);
break;
case MSG_TYPE_REGISTER:
fprintf(stderr,
"teleop transport: dropped unexpected register message from %s to %s\n",
msg->from,
msg->to);
break;
default:
fprintf(stderr,
"teleop transport: dropped unexpected message type %s from %s\n",
protocol_message_type_name(msg->type),
msg->from);
break;
}
}
static void *teleop_transport_kcp_recv_thread_main(void *arg)
{
teleop_transport_t *transport = (teleop_transport_t *)arg;
for (;;) {
message_t msg;
int rc;
if (transport->stop_requested) {
return NULL;
}
protocol_message_init(&msg);
rc = kcp_client_receive_timed(transport->kcp_client, &msg, 100);
if (rc == 1) {
protocol_message_clear(&msg);
continue;
}
if (rc != 0) {
protocol_message_clear(&msg);
if (!transport->stop_requested) {
int saved_errno = errno;
fprintf(stderr,
"teleop transport: KCP receive loop stopped: %s (errno=%d)\n",
saved_errno != 0 ? strerror(saved_errno) : "unknown error",
saved_errno);
}
return NULL;
}
teleop_transport_log_incoming(&msg);
protocol_message_clear(&msg);
}
}
static int teleop_transport_open_udp(teleop_transport_t *transport, const teleop_transport_config_t *config)
{
int sockfd;
if (transport == NULL || config == NULL || config->udp_ip == NULL) {
errno = EINVAL;
return -1;
}
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0) {
perror("socket");
return -1;
}
memset(&transport->udp_dest, 0, sizeof(transport->udp_dest));
transport->udp_dest.sin_family = AF_INET;
transport->udp_dest.sin_port = htons(config->udp_port);
if (inet_pton(AF_INET, config->udp_ip, &transport->udp_dest.sin_addr) <= 0) {
fprintf(stderr, "Invalid IP: %s\n", config->udp_ip);
close(sockfd);
errno = EINVAL;
return -1;
}
transport->udp_fd = sockfd;
return 0;
}
static int teleop_transport_open_kcp(teleop_transport_t *transport, const teleop_transport_config_t *config)
{
kcp_conn_options_t options;
const char *relay_via;
if (transport == NULL || config == NULL ||
config->server_addr == NULL || config->peer_id == NULL || config->target_peer == NULL) {
errno = EINVAL;
return -1;
}
kcp_conn_options_set_control_defaults(&options);
relay_via = (config->relay_via != NULL && config->relay_via[0] != '\0') ? config->relay_via : NULL;
transport->kcp_client = kcp_client_dial_with_options(
config->server_addr,
relay_via,
config->peer_id,
"",
"",
&options,
NULL,
NULL,
NULL,
KCP_DEFAULT_STATS_INTERVAL_MS
);
if (transport->kcp_client == NULL) {
int saved_errno = errno;
fprintf(stderr,
"teleop transport: failed to open KCP session as %s via %s%s%s: %s (errno=%d)\n",
config->peer_id,
config->server_addr,
relay_via != NULL ? ", relay=" : "",
relay_via != NULL ? relay_via : "",
saved_errno != 0 ? strerror(saved_errno) : "unknown error",
saved_errno);
errno = saved_errno;
return -1;
}
{
int thread_rc = pthread_create(&transport->recv_thread, NULL, teleop_transport_kcp_recv_thread_main, transport);
if (thread_rc != 0) {
fprintf(stderr,
"teleop transport: failed to start KCP receive thread: %s (errno=%d)\n",
strerror(thread_rc),
thread_rc);
kcp_client_close(transport->kcp_client);
kcp_client_free(transport->kcp_client);
transport->kcp_client = NULL;
errno = thread_rc;
return -1;
}
}
transport->recv_thread_started = 1;
return 0;
}
int teleop_transport_open(teleop_transport_t *transport, const teleop_transport_config_t *config)
{
if (transport == NULL || config == NULL) {
errno = EINVAL;
return -1;
}
teleop_transport_clear(transport);
transport->mode = config->mode;
snprintf(transport->server_addr, sizeof(transport->server_addr), "%s",
config->server_addr == NULL ? "" : config->server_addr);
snprintf(transport->relay_via, sizeof(transport->relay_via), "%s",
config->relay_via == NULL ? "" : config->relay_via);
snprintf(transport->peer_id, sizeof(transport->peer_id), "%s",
config->peer_id == NULL ? "" : config->peer_id);
snprintf(transport->target_peer, sizeof(transport->target_peer), "%s",
config->target_peer == NULL ? "" : config->target_peer);
if (config->mode == TELEOP_TRANSPORT_MODE_KCP) {
return teleop_transport_open_kcp(transport, config);
}
return teleop_transport_open_udp(transport, config);
}
int teleop_transport_send_twist(teleop_transport_t *transport, const twist_cmd_t *cmd)
{
if (transport == NULL || cmd == NULL) {
errno = EINVAL;
return -1;
}
if (transport->mode == TELEOP_TRANSPORT_MODE_KCP) {
if (kcp_client_send_binary(transport->kcp_client, transport->target_peer, cmd, TWIST_CMD_SIZE) != 0) {
int saved_errno = errno;
fprintf(stderr,
"teleop transport: failed to send KCP payload to %s: %s (errno=%d)\n",
transport->target_peer,
saved_errno != 0 ? strerror(saved_errno) : "unknown error",
saved_errno);
errno = saved_errno;
return -1;
}
return 0;
}
{
ssize_t sent = sendto(transport->udp_fd, cmd, TWIST_CMD_SIZE, 0,
(const struct sockaddr *)&transport->udp_dest, sizeof(transport->udp_dest));
if (sent < 0) {
perror("sendto");
return -1;
}
if ((size_t)sent != TWIST_CMD_SIZE) {
fprintf(stderr, "sendto: short send (%zd/%zu)\n", sent, (size_t)TWIST_CMD_SIZE);
errno = EIO;
return -1;
}
}
return 0;
}
void teleop_transport_close(teleop_transport_t *transport)
{
if (transport == NULL) {
return;
}
transport->stop_requested = 1;
if (transport->kcp_client != NULL) {
kcp_client_close(transport->kcp_client);
}
if (transport->recv_thread_started) {
pthread_join(transport->recv_thread, NULL);
transport->recv_thread_started = 0;
}
if (transport->kcp_client != NULL) {
kcp_client_free(transport->kcp_client);
transport->kcp_client = NULL;
}
if (transport->udp_fd >= 0) {
close(transport->udp_fd);
transport->udp_fd = -1;
}
}