From 70e835ed49cbe67dc851e162d7e052e9752568cf Mon Sep 17 00:00:00 2001 From: Mock Date: Sat, 4 Apr 2026 23:25:43 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=A7=86=E9=A2=91=E4=B8=8E=E6=8E=A7?= =?UTF-8?q?=E5=88=B6=E7=A8=8B=E5=BA=8F=E5=90=88=E5=B9=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .claude/settings.local.json | 3 +- Makefile | 20 +- cmd/b_side_omnid.c | 428 +++++++++ cmd/v1_camera_pipeline_ifdef.c | 697 +------------- include/control_protocol.h | 7 + include/peer_kcp_client.h | 1 + include/transport_kcp.h | 12 + include/video_pipeline.h | 52 ++ python/omnisocket/_omnisocket.c | 33 + python/omnisocket/omnisocket_client.c | 26 + python/omnisocket/omnisocket_client.h | 13 + python/tests/test_sessions.py | 7 + ros-control-py/README.md | 11 + ros-control-py/ROS2 Teleop over UDP.md | 6 + .../launch/robot_udp_receiver.launch.py | 2 + .../udp_teleop_bridge/udp_cmd_vel_receiver.py | 140 ++- src/peer_kcp_client.c | 12 + src/transport_kcp.c | 27 + src/video_pipeline.c | 883 ++++++++++++++++++ 19 files changed, 1674 insertions(+), 706 deletions(-) create mode 100644 cmd/b_side_omnid.c create mode 100644 include/control_protocol.h create mode 100644 include/video_pipeline.h create mode 100644 src/video_pipeline.c diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 48ce4f6..04445de 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -11,7 +11,8 @@ "Bash(git pull:*)", "Bash(wc:*)", "Bash(python3 -c \"import dis, marshal, types; f = open\\(''''C:/Users/64187/Desktop/Workspace/OmniSocketGo/__pycache__/omnisocket_video_sender.cpython-312.pyc'''',''''rb''''\\); f.read\\(16\\); code=marshal.load\\(f\\); dis.dis\\(code\\)\")", - "Bash(gh pr:*)" + "Bash(gh pr:*)", + "Bash(python3 -c ':*)" ] } } diff --git a/Makefile b/Makefile index c2ea417..8c9d6e1 100644 --- a/Makefile +++ b/Makefile @@ -41,8 +41,8 @@ TARGETS := \ $(BIN_DIR)/kcpping CAMERA_VIDEO_SENDER := $(BIN_DIR)/camera_video_sender -CAMERA_VIDEO_SENDER_SRCS := \ - $(CMD_DIR)/v1_camera_pipeline_ifdef.c \ +FFMPEG_PIPELINE_COMMON_SRCS := \ + $(SRC_DIR)/video_pipeline.c \ $(SRC_DIR)/omni_common.c \ $(SRC_DIR)/protocol.c \ $(SRC_DIR)/latencylog.c \ @@ -54,6 +54,15 @@ CAMERA_VIDEO_SENDER_SRCS := \ third_party/cjson/cJSON.c \ third_party/kcp/ikcp.c +CAMERA_VIDEO_SENDER_SRCS := \ + $(CMD_DIR)/v1_camera_pipeline_ifdef.c \ + $(FFMPEG_PIPELINE_COMMON_SRCS) + +B_SIDE_OMNID := $(BIN_DIR)/b_side_omnid +B_SIDE_OMNID_SRCS := \ + $(CMD_DIR)/b_side_omnid.c \ + $(FFMPEG_PIPELINE_COMMON_SRCS) + all: $(TARGETS) $(BIN_DIR): @@ -85,6 +94,11 @@ $(CAMERA_VIDEO_SENDER): $(CAMERA_VIDEO_SENDER_SRCS) | $(BIN_DIR) camera_video_sender: $(CAMERA_VIDEO_SENDER) +$(B_SIDE_OMNID): $(B_SIDE_OMNID_SRCS) | $(BIN_DIR) + $(CC) $(CFLAGS) $(CPPFLAGS) $$(pkg-config --cflags libavformat libavcodec libavutil libswscale) -o $@ $^ $(LDFLAGS) $$(pkg-config --libs libavformat libavcodec libavutil libswscale) + +b_side_omnid: $(B_SIDE_OMNID) + clean: rm -rf $(BIN_DIR) @@ -94,4 +108,4 @@ python-ext: python-install: cd python && $(PYTHON) -m pip install -e . -.PHONY: all clean python-ext python-install camera_video_sender +.PHONY: all clean python-ext python-install camera_video_sender b_side_omnid diff --git a/cmd/b_side_omnid.c b/cmd/b_side_omnid.c new file mode 100644 index 0000000..f47c4e0 --- /dev/null +++ b/cmd/b_side_omnid.c @@ -0,0 +1,428 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "control_protocol.h" +#include "video_pipeline.h" + +#define CONTROL_DEFAULT_PEER_ID "peer-b-ctrl" +#define CONTROL_DEFAULT_EXPECTED_SENDER "peer-a-ctrl" +#define CONTROL_DEFAULT_UNIX_SOCKET "/tmp/omnisocket-b-side-cmd.sock" + +typedef struct unix_dgram_client { + int fd; + char bind_path[108]; + struct sockaddr_un dest_addr; + socklen_t dest_len; +} unix_dgram_client_t; + +typedef struct control_bridge_stats { + pthread_mutex_t mutex; + uint64_t packets_forwarded; + uint64_t invalid_packets; + uint64_t unix_send_errors; + int connected; + char last_error[256]; + kcp_runtime_stats_t transport; +} control_bridge_stats_t; + +typedef struct daemon_state { + volatile sig_atomic_t *stop_requested; + video_pipeline_config_t video_config; + video_pipeline_stats_t video_stats; + const char *control_server_addr; + const char *control_relay_via; + const char *control_bind_ip; + const char *control_bind_device; + const char *control_peer_id; + const char *control_expected_sender; + const char *control_unix_socket; + unix_dgram_client_t unix_client; + control_bridge_stats_t control_stats; +} daemon_state_t; + +static volatile sig_atomic_t g_stop_requested = 0; + +static void handle_signal(int signum) { + (void) signum; + g_stop_requested = 1; +} + +static int install_signal_handler(int signum) { + struct sigaction action; + + memset(&action, 0, sizeof(action)); + action.sa_handler = handle_signal; + action.sa_flags = SA_RESTART; + if (sigemptyset(&action.sa_mask) != 0) { + return -1; + } + return sigaction(signum, &action, NULL); +} + +static const char *env_or_default(const char *name, const char *fallback) { + const char *value = getenv(name); + if (value != NULL && value[0] != '\0') { + return value; + } + return fallback; +} + +static const char *env_first_nonempty(const char *first, const char *second, const char *fallback) { + const char *value = getenv(first); + if (value != NULL && value[0] != '\0') { + return value; + } + value = getenv(second); + if (value != NULL && value[0] != '\0') { + return value; + } + return fallback; +} + +static int control_bridge_stats_init(control_bridge_stats_t *stats) { + int rc; + if (stats == NULL) { + errno = EINVAL; + return -1; + } + memset(stats, 0, sizeof(*stats)); + rc = pthread_mutex_init(&stats->mutex, NULL); + if (rc != 0) { + errno = rc; + return -1; + } + return 0; +} + +static void control_bridge_stats_destroy(control_bridge_stats_t *stats) { + if (stats == NULL) { + return; + } + pthread_mutex_destroy(&stats->mutex); +} + +static void control_bridge_set_error(control_bridge_stats_t *stats, const char *message) { + if (stats == NULL) { + return; + } + pthread_mutex_lock(&stats->mutex); + snprintf(stats->last_error, sizeof(stats->last_error), "%s", message == NULL ? "" : message); + pthread_mutex_unlock(&stats->mutex); +} + +static void control_bridge_set_errno_error(control_bridge_stats_t *stats, const char *prefix) { + char buffer[256]; + int saved_errno = errno; + + snprintf( + buffer, + sizeof(buffer), + "%s: %s (errno=%d)", + prefix == NULL ? "control bridge error" : prefix, + saved_errno != 0 ? strerror(saved_errno) : "unknown error", + saved_errno + ); + control_bridge_set_error(stats, buffer); +} + +static void control_bridge_stats_snapshot(control_bridge_stats_t *stats, control_bridge_stats_t *out_stats) { + if (stats == NULL || out_stats == NULL) { + return; + } + memset(out_stats, 0, sizeof(*out_stats)); + pthread_mutex_lock(&stats->mutex); + out_stats->packets_forwarded = stats->packets_forwarded; + out_stats->invalid_packets = stats->invalid_packets; + out_stats->unix_send_errors = stats->unix_send_errors; + out_stats->connected = stats->connected; + snprintf(out_stats->last_error, sizeof(out_stats->last_error), "%s", stats->last_error); + out_stats->transport = stats->transport; + pthread_mutex_unlock(&stats->mutex); +} + +static int unix_dgram_client_init(unix_dgram_client_t *client, const char *dest_path) { + struct sockaddr_un bind_addr; + pid_t pid; + + if (client == NULL || dest_path == NULL || dest_path[0] == '\0') { + errno = EINVAL; + return -1; + } + + memset(client, 0, sizeof(*client)); + client->fd = socket(AF_UNIX, SOCK_DGRAM, 0); + if (client->fd < 0) { + return -1; + } + + memset(&bind_addr, 0, sizeof(bind_addr)); + bind_addr.sun_family = AF_UNIX; + pid = getpid(); + snprintf(client->bind_path, sizeof(client->bind_path), "/tmp/omnisocket-b-side-cmd-client-%ld.sock", (long) pid); + unlink(client->bind_path); + snprintf(bind_addr.sun_path, sizeof(bind_addr.sun_path), "%s", client->bind_path); + if (bind(client->fd, (const struct sockaddr *) &bind_addr, sizeof(bind_addr)) != 0) { + close(client->fd); + unlink(client->bind_path); + client->fd = -1; + return -1; + } + + memset(&client->dest_addr, 0, sizeof(client->dest_addr)); + client->dest_addr.sun_family = AF_UNIX; + snprintf(client->dest_addr.sun_path, sizeof(client->dest_addr.sun_path), "%s", dest_path); + client->dest_len = (socklen_t) sizeof(client->dest_addr); + return 0; +} + +static int unix_dgram_client_send(unix_dgram_client_t *client, const void *data, size_t len) { + ssize_t written; + if (client == NULL || client->fd < 0 || (data == NULL && len > 0)) { + errno = EINVAL; + return -1; + } + written = sendto(client->fd, data, len, 0, (const struct sockaddr *) &client->dest_addr, client->dest_len); + if (written < 0 || (size_t) written != len) { + if (written >= 0) { + errno = EIO; + } + return -1; + } + return 0; +} + +static void unix_dgram_client_close(unix_dgram_client_t *client) { + if (client == NULL) { + return; + } + if (client->fd >= 0) { + close(client->fd); + client->fd = -1; + } + if (client->bind_path[0] != '\0') { + unlink(client->bind_path); + client->bind_path[0] = '\0'; + } +} + +static void *video_thread_main(void *arg) { + daemon_state_t *state = (daemon_state_t *) arg; + + while (!*state->stop_requested) { + if (video_pipeline_run(&state->video_config, &state->video_stats, state->stop_requested) == 0) { + break; + } + if (!*state->stop_requested) { + sleep(1); + } + } + return NULL; +} + +static void *control_thread_main(void *arg) { + daemon_state_t *state = (daemon_state_t *) arg; + + while (!*state->stop_requested) { + kcp_conn_options_t options; + kcp_client_t *client = NULL; + + kcp_conn_options_set_control_defaults(&options); + client = kcp_client_dial_with_options( + state->control_server_addr, + state->control_relay_via, + state->control_peer_id, + state->control_bind_ip, + state->control_bind_device, + &options, + NULL, + NULL, + NULL, + KCP_DEFAULT_STATS_INTERVAL_MS + ); + if (client == NULL) { + control_bridge_set_errno_error(&state->control_stats, "failed to connect control session"); + sleep(1); + continue; + } + + pthread_mutex_lock(&state->control_stats.mutex); + state->control_stats.connected = 1; + state->control_stats.last_error[0] = '\0'; + pthread_mutex_unlock(&state->control_stats.mutex); + + while (!*state->stop_requested) { + message_t msg; + int rc; + + protocol_message_init(&msg); + rc = kcp_client_receive_timed(client, &msg, 100); + if (rc == 1) { + protocol_message_clear(&msg); + continue; + } + if (rc != 0) { + control_bridge_set_errno_error(&state->control_stats, "control receive loop stopped"); + protocol_message_clear(&msg); + break; + } + + if (state->control_expected_sender[0] != '\0' && strcmp(msg.from, state->control_expected_sender) != 0) { + pthread_mutex_lock(&state->control_stats.mutex); + state->control_stats.invalid_packets += 1; + pthread_mutex_unlock(&state->control_stats.mutex); + protocol_message_clear(&msg); + continue; + } + + if (msg.type != MSG_TYPE_BINARY || msg.body_len != OMNI_CONTROL_PACKET_SIZE) { + pthread_mutex_lock(&state->control_stats.mutex); + state->control_stats.invalid_packets += 1; + pthread_mutex_unlock(&state->control_stats.mutex); + protocol_message_clear(&msg); + continue; + } + + if (unix_dgram_client_send(&state->unix_client, msg.body, msg.body_len) != 0) { + pthread_mutex_lock(&state->control_stats.mutex); + state->control_stats.unix_send_errors += 1; + pthread_mutex_unlock(&state->control_stats.mutex); + control_bridge_set_errno_error(&state->control_stats, "failed to forward command to unix socket"); + protocol_message_clear(&msg); + continue; + } + + pthread_mutex_lock(&state->control_stats.mutex); + state->control_stats.packets_forwarded += 1; + kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport); + pthread_mutex_unlock(&state->control_stats.mutex); + protocol_message_clear(&msg); + } + + pthread_mutex_lock(&state->control_stats.mutex); + state->control_stats.connected = 0; + pthread_mutex_unlock(&state->control_stats.mutex); + kcp_client_close(client); + kcp_client_free(client); + if (!*state->stop_requested) { + sleep(1); + } + } + + return NULL; +} + +static void print_stats(daemon_state_t *state) { + video_pipeline_stats_t video_stats; + control_bridge_stats_t control_stats; + + memset(&video_stats, 0, sizeof(video_stats)); + memset(&control_stats, 0, sizeof(control_stats)); + video_pipeline_stats_snapshot(&state->video_stats, &video_stats); + control_bridge_stats_snapshot(&state->control_stats, &control_stats); + + fprintf( + stderr, + "[b_side_omnid] video connected=%d frames=%llu bytes=%llu srtt=%dms | control connected=%d forwarded=%llu invalid=%llu unix_err=%llu srtt=%dms\n", + video_stats.connected, + (unsigned long long) video_stats.frames_sent, + (unsigned long long) video_stats.bytes_sent, + video_stats.transport.srtt_ms, + control_stats.connected, + (unsigned long long) control_stats.packets_forwarded, + (unsigned long long) control_stats.invalid_packets, + (unsigned long long) control_stats.unix_send_errors, + control_stats.transport.srtt_ms + ); +} + +int main(void) { + daemon_state_t state; + pthread_t video_thread; + pthread_t control_thread; + + memset(&state, 0, sizeof(state)); + state.stop_requested = &g_stop_requested; + + video_pipeline_config_init(&state.video_config); + video_pipeline_config_load_env(&state.video_config); + state.control_server_addr = env_first_nonempty("OMNI_CONTROL_SERVER_ADDR", "OMNISOCKET_SERVER_ADDR", ""); + state.control_relay_via = env_first_nonempty("OMNI_CONTROL_RELAY_VIA", "OMNISOCKET_RELAY_VIA", ""); + state.control_bind_ip = env_first_nonempty("OMNI_CONTROL_BIND_IP", "OMNISOCKET_BIND_IP", ""); + state.control_bind_device = env_first_nonempty("OMNI_CONTROL_BIND_DEVICE", "OMNISOCKET_BIND_DEVICE", ""); + state.control_peer_id = env_or_default("OMNI_CONTROL_PEER_ID", CONTROL_DEFAULT_PEER_ID); + state.control_expected_sender = env_or_default("OMNI_CONTROL_EXPECTED_SENDER", CONTROL_DEFAULT_EXPECTED_SENDER); + state.control_unix_socket = env_or_default("OMNI_CONTROL_UNIX_SOCKET_PATH", CONTROL_DEFAULT_UNIX_SOCKET); + + if (state.video_config.server_addr == NULL || state.video_config.server_addr[0] == '\0' || + state.control_server_addr == NULL || state.control_server_addr[0] == '\0') { + fprintf(stderr, "OMNISOCKET_SERVER_ADDR (or session-specific overrides) is required\n"); + return 1; + } + + if (video_pipeline_stats_init(&state.video_stats) != 0) { + perror("video_pipeline_stats_init"); + return 1; + } + if (control_bridge_stats_init(&state.control_stats) != 0) { + perror("control_bridge_stats_init"); + video_pipeline_stats_destroy(&state.video_stats); + return 1; + } + if (unix_dgram_client_init(&state.unix_client, state.control_unix_socket) != 0) { + perror("unix_dgram_client_init"); + control_bridge_stats_destroy(&state.control_stats); + video_pipeline_stats_destroy(&state.video_stats); + return 1; + } + + fprintf( + stderr, + "[b_side_omnid] control forwarding target is unix_dgram://%s\n", + state.control_unix_socket + ); + + if (install_signal_handler(SIGINT) != 0 || install_signal_handler(SIGTERM) != 0) { + perror("install_signal_handler"); + unix_dgram_client_close(&state.unix_client); + control_bridge_stats_destroy(&state.control_stats); + video_pipeline_stats_destroy(&state.video_stats); + return 1; + } + + if (pthread_create(&video_thread, NULL, video_thread_main, &state) != 0) { + perror("pthread_create(video_thread)"); + unix_dgram_client_close(&state.unix_client); + control_bridge_stats_destroy(&state.control_stats); + video_pipeline_stats_destroy(&state.video_stats); + return 1; + } + if (pthread_create(&control_thread, NULL, control_thread_main, &state) != 0) { + perror("pthread_create(control_thread)"); + g_stop_requested = 1; + pthread_join(video_thread, NULL); + unix_dgram_client_close(&state.unix_client); + control_bridge_stats_destroy(&state.control_stats); + video_pipeline_stats_destroy(&state.video_stats); + return 1; + } + + while (!g_stop_requested) { + sleep(1); + print_stats(&state); + } + + pthread_join(video_thread, NULL); + pthread_join(control_thread, NULL); + unix_dgram_client_close(&state.unix_client); + control_bridge_stats_destroy(&state.control_stats); + video_pipeline_stats_destroy(&state.video_stats); + return 0; +} diff --git a/cmd/v1_camera_pipeline_ifdef.c b/cmd/v1_camera_pipeline_ifdef.c index a151dfc..bd32de6 100644 --- a/cmd/v1_camera_pipeline_ifdef.c +++ b/cmd/v1_camera_pipeline_ifdef.c @@ -1,691 +1,28 @@ -// camera_pipeline_ifdef_fixed.c #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -// FFmpeg头文件 - 使用纯C包含 -#include -#include -#include -#include -#include -#include +#include "video_pipeline.h" -#include "peer_kcp_client.h" +int main(void) { + video_pipeline_config_t config; + video_pipeline_stats_t stats; -// ========================================== -// 1. 配置区域:在这里开启或关闭时间打印 -// ========================================== -#define DEBUG_TIMING // 注释掉这一行,所有时间打印和计算都会消失 - -// 定义打印宏 -#ifdef DEBUG_TIMING -#define PRINT_TIME(fmt, ...) printf(fmt, ##__VA_ARGS__) -#else -#define PRINT_TIME(fmt, ...) // 什么都不做,编译器会优化掉 -#endif - -#define WIDTH 1280 -#define HEIGHT 720 -#define OUTPUT_WIDTH 640 -#define OUTPUT_HEIGHT 360 -#define NUM_BUFFERS 4 -#define CLEAR(x) memset(&(x), 0, sizeof(x)) - -#define VIDEO_SERVER_ADDR_ENV "OMNI_VIDEO_SERVER_ADDR" -#define VIDEO_RELAY_ADDR_ENV "OMNI_VIDEO_RELAY_VIA" -#define VIDEO_BIND_IP_ENV "OMNI_VIDEO_BIND_IP" -#define VIDEO_BIND_DEVICE_ENV "OMNI_VIDEO_BIND_DEVICE" -#define VIDEO_PEER_ID_ENV "OMNI_VIDEO_PEER_ID" -#define VIDEO_TARGET_PEER_ENV "OMNI_VIDEO_TARGET_PEER" -#define VIDEO_DEFAULT_PEER_ID "peer-b-video" -#define VIDEO_DEFAULT_TARGET_PEER "peer-a-video" - -typedef struct -{ - kcp_client_t *client; - char target_peer[OMNI_MAX_PEER_ID]; -} VideoSender; - -static int video_sender_init(VideoSender *sender); -static int video_sender_send_packet(VideoSender *sender, const AVPacket *encoded_pkt, uint64_t timestamp); -static void video_sender_close(VideoSender *sender); - -typedef struct -{ - void *start; - size_t length; -} Buffer; - -double get_time_ms() -{ - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - return ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0; -} - -int64_t get_realtime_ms() -{ - struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); - return (int64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000; -} - -static const char *env_or_default(const char *name, const char *fallback) -{ - const char *value = getenv(name); - - if (value != NULL && value[0] != '\0') - return value; - return fallback; -} - -static int video_sender_init(VideoSender *sender) -{ - const char *server_addr = getenv(VIDEO_SERVER_ADDR_ENV); - const char *relay_addr = env_or_default(VIDEO_RELAY_ADDR_ENV, ""); - const char *bind_ip = env_or_default(VIDEO_BIND_IP_ENV, ""); - const char *bind_device = env_or_default(VIDEO_BIND_DEVICE_ENV, ""); - const char *peer_id = env_or_default(VIDEO_PEER_ID_ENV, VIDEO_DEFAULT_PEER_ID); - const char *target_peer = env_or_default(VIDEO_TARGET_PEER_ENV, VIDEO_DEFAULT_TARGET_PEER); - kcp_conn_options_t options; - - if (sender == NULL) - { - errno = EINVAL; - return -1; + video_pipeline_config_init(&config); + video_pipeline_config_load_env(&config); + if (getenv("OMNI_VIDEO_DEBUG_TIMING") == NULL) { + config.enable_timing_logs = 1; } - if (server_addr == NULL || server_addr[0] == '\0') - { - errno = EINVAL; - fprintf(stderr, "%s is required\n", VIDEO_SERVER_ADDR_ENV); - return -1; + if (video_pipeline_stats_init(&stats) != 0) { + perror("video_pipeline_stats_init"); + return 1; } - memset(sender, 0, sizeof(*sender)); - snprintf(sender->target_peer, sizeof(sender->target_peer), "%s", target_peer); - - kcp_conn_options_set_video_defaults(&options); - sender->client = kcp_client_dial_with_options( - server_addr, - relay_addr, - peer_id, - bind_ip, - bind_device, - &options, - NULL, - NULL, - NULL, - KCP_DEFAULT_STATS_INTERVAL_MS); - if (sender->client == NULL) - return -1; - - fprintf(stderr, "Video sender connected as %s -> %s\n", - kcp_client_id(sender->client), sender->target_peer); + if (video_pipeline_run(&config, &stats, NULL) != 0) { + perror("video_pipeline_run"); + video_pipeline_stats_destroy(&stats); + return 1; + } + video_pipeline_stats_destroy(&stats); return 0; } - -static int video_sender_send_packet(VideoSender *sender, const AVPacket *encoded_pkt, uint64_t timestamp) -{ - uint8_t *senddata; - size_t payload_len; - int rc; - - if (sender == NULL || sender->client == NULL || encoded_pkt == NULL) - { - errno = EINVAL; - return -1; - } - if (encoded_pkt->size < 0) - { - errno = EINVAL; - return -1; - } - - payload_len = (size_t)encoded_pkt->size + sizeof(timestamp); - senddata = (uint8_t *)malloc(payload_len); - if (senddata == NULL) - { - errno = ENOMEM; - return -1; - } - - memcpy(senddata, encoded_pkt->data, (size_t)encoded_pkt->size); - memcpy(senddata + encoded_pkt->size, ×tamp, sizeof(timestamp)); - - rc = kcp_client_send_binary( - sender->client, - sender->target_peer, - senddata, - payload_len); - free(senddata); - return rc; -} - -static void video_sender_close(VideoSender *sender) -{ - if (sender == NULL || sender->client == NULL) - return; - - kcp_client_close(sender->client); - kcp_client_free(sender->client); - sender->client = NULL; -} - -int open_v4l2_device(const char *device) -{ - int fd = open(device, O_RDWR | O_NONBLOCK); - if (fd < 0) - { - perror("open device"); - return -1; - } - return fd; -} - -int init_v4l2_device(int fd) -{ - struct v4l2_format fmt = {0}; - fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; - fmt.fmt.pix.width = WIDTH; - fmt.fmt.pix.height = HEIGHT; - fmt.fmt.pix.pixelformat = V4L2_PIX_FMT_MJPEG; - fmt.fmt.pix.field = V4L2_FIELD_NONE; - - if (ioctl(fd, VIDIOC_S_FMT, &fmt) < 0) - { - perror("VIDIOC_S_FMT"); - return -1; - } - - PRINT_TIME("Set format: %dx%d MJPEG\n", WIDTH, HEIGHT); - return 0; -} - -int init_mmap(int fd, Buffer **buffers, int *num_buffers) -{ - struct v4l2_requestbuffers req = {0}; - req.count = NUM_BUFFERS; - req.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; - req.memory = V4L2_MEMORY_MMAP; - - if (ioctl(fd, VIDIOC_REQBUFS, &req) < 0) - { - perror("VIDIOC_REQBUFS"); - return -1; - } - - *num_buffers = req.count; - *buffers = (Buffer *)calloc(req.count, sizeof(Buffer)); - - for (int i = 0; i < req.count; i++) - { - struct v4l2_buffer buf = {0}; - buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; - buf.memory = V4L2_MEMORY_MMAP; - buf.index = i; - - if (ioctl(fd, VIDIOC_QUERYBUF, &buf) < 0) - { - perror("VIDIOC_QUERYBUF"); - return -1; - } - - (*buffers)[i].length = buf.length; - (*buffers)[i].start = mmap(NULL, buf.length, - PROT_READ | PROT_WRITE, - MAP_SHARED, fd, buf.m.offset); - - if ((*buffers)[i].start == MAP_FAILED) - { - perror("mmap"); - return -1; - } - } - - return 0; -} - -AVCodecContext *create_mjpeg_decoder() -{ - const AVCodec *decoder = avcodec_find_decoder(AV_CODEC_ID_MJPEG); - if (!decoder) - { - fprintf(stderr, "MJPEG decoder not found\n"); - return NULL; - } - - AVCodecContext *ctx = avcodec_alloc_context3(decoder); - ctx->width = WIDTH; - ctx->height = HEIGHT; - ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; // Use YUVJ420P for MJPEG compatibility. - ctx->color_range = AVCOL_RANGE_JPEG; // JPEG范围 - ctx->thread_count = 1; - - AVDictionary *opts = NULL; - av_dict_set(&opts, "flags2", "+fast", 0); - - if (avcodec_open2(ctx, decoder, &opts) < 0) - { - avcodec_free_context(&ctx); - av_dict_free(&opts); - return NULL; - } - - av_dict_free(&opts); - printf("Decoder created with format: YUVJ420P\n"); - return ctx; -} - -AVCodecContext *create_mjpeg_encoder() -{ - const AVCodec *encoder = avcodec_find_encoder(AV_CODEC_ID_MJPEG); - if (!encoder) - { - fprintf(stderr, "MJPEG encoder not found\n"); - return NULL; - } - - AVCodecContext *ctx = avcodec_alloc_context3(encoder); - ctx->width = OUTPUT_WIDTH; - ctx->height = OUTPUT_HEIGHT; - ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; // Use YUVJ420P for MJPEG compatibility. - ctx->time_base = (AVRational){1, 30}; - ctx->qmin = 8; - ctx->qmax = 31; - ctx->flags |= AV_CODEC_FLAG_QSCALE; - ctx->global_quality = FF_QP2LAMBDA * 5; - - AVDictionary *opts = NULL; - av_dict_set(&opts, "huffman", "default", 0); - - if (avcodec_open2(ctx, encoder, &opts) < 0) - { - avcodec_free_context(&ctx); - av_dict_free(&opts); - return NULL; - } - - av_dict_free(&opts); - printf("Encoder created with format: YUVJ420P\n"); - return ctx; -} - -int decode_mjpeg_frame(AVCodecContext *decoder, const uint8_t *data, int size, AVFrame **frame) -{ - if (frame == NULL) - return -1; - - *frame = NULL; - AVPacket *pkt = av_packet_alloc(); - if (!pkt) - return -1; - - pkt->data = (uint8_t *)data; - pkt->size = size; - - int ret = avcodec_send_packet(decoder, pkt); - if (ret < 0) - { - av_packet_free(&pkt); - return -1; - } - - *frame = av_frame_alloc(); - if (!*frame) - { - av_packet_free(&pkt); - return -1; - } - ret = avcodec_receive_frame(decoder, *frame); - - av_packet_free(&pkt); - if (ret < 0) - av_frame_free(frame); - return ret; -} - -int scale_frame(AVFrame *src, AVFrame **dst) -{ - // 简单缩放,不设置复杂的色彩空间参数 - struct SwsContext *sws_ctx = sws_getContext( - src->width, src->height, src->format, - OUTPUT_WIDTH, OUTPUT_HEIGHT, AV_PIX_FMT_YUVJ420P, - SWS_BILINEAR, NULL, NULL, NULL); - - if (!sws_ctx) - { - fprintf(stderr, "Failed to create sws context\n"); - return -1; - } - - *dst = av_frame_alloc(); - if (!*dst) - { - sws_freeContext(sws_ctx); - return -1; - } - - (*dst)->width = OUTPUT_WIDTH; - (*dst)->height = OUTPUT_HEIGHT; - (*dst)->format = AV_PIX_FMT_YUVJ420P; - - if (av_frame_get_buffer(*dst, 0) < 0) - { - fprintf(stderr, "Failed to allocate frame buffer\n"); - av_frame_free(dst); - sws_freeContext(sws_ctx); - return -1; - } - - int ret = sws_scale(sws_ctx, (const uint8_t *const *)src->data, src->linesize, - 0, src->height, (*dst)->data, (*dst)->linesize); - - sws_freeContext(sws_ctx); - - if (ret < 0) - { - fprintf(stderr, "sws_scale failed\n"); - av_frame_free(dst); - return -1; - } - - return 0; -} - -int encode_frame(AVCodecContext *encoder, AVFrame *frame, AVPacket **pkt) -{ - if (pkt == NULL) - return -1; - - *pkt = NULL; - *pkt = av_packet_alloc(); - if (!*pkt) - return -1; - - int ret = avcodec_send_frame(encoder, frame); - if (ret < 0) - { - av_packet_free(pkt); - return -1; - } - - ret = avcodec_receive_packet(encoder, *pkt); - if (ret < 0) - av_packet_free(pkt); - return ret; -} - -int main() -{ - VideoSender sender; - - memset(&sender, 0, sizeof(sender)); - -#ifdef QUIET_FFMPEG_LOGS - av_log_set_level(AV_LOG_ERROR); -#endif - - PRINT_TIME("=== V4L2 Direct Capture + FFmpeg Processing ===\n"); - - // 1. Open V4L2 device - int fd = open_v4l2_device("/dev/video0"); - if (fd < 0) - { - fprintf(stderr, "Failed to open camera device\n"); - return 1; - } - - // 2. Initialize V4L2 - if (init_v4l2_device(fd) < 0) - { - close(fd); - return 1; - } - - // 3. Setup MMAP buffers - Buffer *buffers = NULL; - int num_buffers = 0; - if (init_mmap(fd, &buffers, &num_buffers) < 0) - { - close(fd); - return 1; - } - - // 4. Create FFmpeg codecs - AVCodecContext *decoder = create_mjpeg_decoder(); - AVCodecContext *encoder = create_mjpeg_encoder(); - if (!decoder || !encoder) - { - fprintf(stderr, "Failed to create codecs\n"); - close(fd); - return 1; - } - - if (video_sender_init(&sender) < 0) - { - perror("video_sender_init"); - video_sender_close(&sender); - avcodec_free_context(&encoder); - avcodec_free_context(&decoder); - close(fd); - return 1; - } - - // 5. Queue buffers - for (int i = 0; i < num_buffers; i++) - { - struct v4l2_buffer buf = {0}; - buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; - buf.memory = V4L2_MEMORY_MMAP; - buf.index = i; - - if (ioctl(fd, VIDIOC_QBUF, &buf) < 0) - { - perror("VIDIOC_QBUF"); - close(fd); - return 1; - } - } - - // 6. Start streaming - enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE; - if (ioctl(fd, VIDIOC_STREAMON, &type) < 0) - { - perror("VIDIOC_STREAMON"); - close(fd); - return 1; - } - - // 7. Benchmark - // 使用宏控制打印表头 - PRINT_TIME("\nRunning benchmark (100 frames)...\n"); - PRINT_TIME("Frame | Capture | Decode | Scale | Encode | Total | Size | Marker\n"); - PRINT_TIME("------|---------|--------|-------|--------|-------|------|--------\n"); - - for (int i = 0; i < 10000; i++) - { -// 只有在开启 DEBUG_TIMING 时才声明这些时间变量 -#ifdef DEBUG_TIMING - double total_start = get_time_ms(); - double capture_start, capture_end; - double decode_start, decode_end; - double scale_start, scale_end; - double encode_start, encode_end; -#endif - - // Wait for frame - fd_set fds; - FD_ZERO(&fds); - FD_SET(fd, &fds); - - struct timeval tv = {2, 0}; - int r = select(fd + 1, &fds, NULL, NULL, &tv); - if (r <= 0) - { - PRINT_TIME("Timeout waiting for frame\n"); - break; - } - -#ifdef DEBUG_TIMING - capture_start = get_time_ms(); -#endif - - // Dequeue buffer - struct v4l2_buffer buf = {0}; - buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; - buf.memory = V4L2_MEMORY_MMAP; - - if (ioctl(fd, VIDIOC_DQBUF, &buf) < 0) - { - perror("VIDIOC_DQBUF"); - break; - } - -#ifdef DEBUG_TIMING - capture_end = get_time_ms(); -#endif - -// Decode -#ifdef DEBUG_TIMING - decode_start = get_time_ms(); -#endif - - AVFrame *decoded_frame = NULL; - int ret = decode_mjpeg_frame(decoder, - (uint8_t *)buffers[buf.index].start, buf.bytesused, &decoded_frame); - -#ifdef DEBUG_TIMING - decode_end = get_time_ms(); -#endif - - if (ret < 0 || !decoded_frame) - { - PRINT_TIME("Frame %d: Decode failed\n", i + 1); - ioctl(fd, VIDIOC_QBUF, &buf); - continue; - } - -// Scale -#ifdef DEBUG_TIMING - scale_start = get_time_ms(); -#endif - - AVFrame *scaled_frame = NULL; - if (scale_frame(decoded_frame, &scaled_frame) < 0) - { - PRINT_TIME("Frame %d: Scale failed\n", i + 1); - av_frame_free(&decoded_frame); - ioctl(fd, VIDIOC_QBUF, &buf); - continue; - } - -#ifdef DEBUG_TIMING - scale_end = get_time_ms(); -#endif - -// Encode -#ifdef DEBUG_TIMING - encode_start = get_time_ms(); -#endif - - AVPacket *encoded_pkt = NULL; - if (encode_frame(encoder, scaled_frame, &encoded_pkt) < 0) - { - PRINT_TIME("Frame %d: Encode failed\n", i + 1); - } -#ifdef DEBUG_TIMING - if (encoded_pkt && i % 50 == 0) - { - char filename[100]; - sprintf(filename, "frame_%04d.jpg", i + 1); - - FILE *f = fopen(filename, "wb"); - if (f) - { - fwrite(encoded_pkt->data, 1, encoded_pkt->size, f); - fclose(f); - PRINT_TIME("Saved as %s\n", filename); - } - } -#endif - -#ifdef DEBUG_TIMING - encode_end = get_time_ms(); - double total_end = get_time_ms(); -#endif - -// 打印结果 -#ifdef DEBUG_TIMING - PRINT_TIME("%5d | %7.1f | %6.1f | %5.1f | %6.1f | %5.1f | %4d KB | 0x%02x\n", - i + 1, - capture_end - capture_start, - decode_end - decode_start, - scale_end - scale_start, - encode_end - encode_start, - total_end - total_start, - encoded_pkt ? encoded_pkt->size / 1024 : 0, - encoded_pkt && encoded_pkt->size > 1 ? encoded_pkt->data[1] : 0); -#else - // 如果不开启宏,也打印一些基本信息 - printf("Frame %d processed\n", i + 1); -#endif - int64_t unixtime_ms; - if (encoded_pkt) - { - unixtime_ms = get_realtime_ms(); - } - if (encoded_pkt && video_sender_send_packet(&sender, encoded_pkt, unixtime_ms) != 0) - { - perror("video_sender_send_packet"); - av_frame_free(&decoded_frame); - av_frame_free(&scaled_frame); - av_packet_free(&encoded_pkt); - ioctl(fd, VIDIOC_QBUF, &buf); - break; - } - - // Cleanup - av_frame_free(&decoded_frame); - av_frame_free(&scaled_frame); - if (encoded_pkt) - av_packet_free(&encoded_pkt); - - // Requeue buffer - if (ioctl(fd, VIDIOC_QBUF, &buf) < 0) - { - perror("VIDIOC_QBUF"); - break; - } - } - - // 8. Stop streaming - ioctl(fd, VIDIOC_STREAMOFF, &type); - - // 9. Cleanup - for (int i = 0; i < num_buffers; i++) - { - if (buffers[i].start != MAP_FAILED) - { - munmap(buffers[i].start, buffers[i].length); - } - } - free(buffers); - video_sender_close(&sender); - avcodec_free_context(&encoder); - avcodec_free_context(&decoder); - close(fd); - - return 0; -} -// gcc -o v1_camera_pipeline_ifdef v1_camera_pipeline_ifdef.c $(pkg-config --cflags --libs libavformat libavcodec libavutil libswscale) diff --git a/include/control_protocol.h b/include/control_protocol.h new file mode 100644 index 0000000..c589f1e --- /dev/null +++ b/include/control_protocol.h @@ -0,0 +1,7 @@ +#ifndef OMNI_CONTROL_PROTOCOL_H +#define OMNI_CONTROL_PROTOCOL_H + +#define OMNI_CONTROL_PACKET_FLOATS 6 +#define OMNI_CONTROL_PACKET_SIZE (OMNI_CONTROL_PACKET_FLOATS * sizeof(float)) + +#endif diff --git a/include/peer_kcp_client.h b/include/peer_kcp_client.h index d61111a..e492941 100644 --- a/include/peer_kcp_client.h +++ b/include/peer_kcp_client.h @@ -27,6 +27,7 @@ int kcp_client_receive_timed(kcp_client_t *client, message_t *out_msg, int timeo int kcp_client_receive(kcp_client_t *client, message_t *out_msg); int kcp_client_receive_binary_into(kcp_client_t *client, void *buffer, size_t buffer_len, kcp_client_recv_meta_t *out_meta, int timeout_ms); int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len); +void kcp_client_runtime_stats_snapshot(kcp_client_t *client, kcp_runtime_stats_t *out_stats); int kcp_client_close(kcp_client_t *client); void kcp_client_free(kcp_client_t *client); diff --git a/include/transport_kcp.h b/include/transport_kcp.h index d7c3c95..874169e 100644 --- a/include/transport_kcp.h +++ b/include/transport_kcp.h @@ -43,6 +43,17 @@ extern "C" { typedef struct kcp_conn kcp_conn_t; typedef struct kcp_listener kcp_listener_t; +typedef struct kcp_runtime_stats { + int connected; + uint32_t conv; + uint32_t rto_ms; + int32_t srtt_ms; + int32_t srttvar_ms; + uint32_t snd_queue; + uint32_t rcv_queue; + uint32_t snd_buffer; + uint32_t xmit_total; +} kcp_runtime_stats_t; typedef struct kcp_conn_options { int nodelay; int interval_ms; @@ -68,6 +79,7 @@ int kcp_conn_close(kcp_conn_t *conn); void kcp_conn_free(kcp_conn_t *conn); uint32_t kcp_conn_conv(const kcp_conn_t *conn); int kcp_conn_local_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, socklen_t *addr_len); +void kcp_conn_runtime_stats_snapshot(kcp_conn_t *conn, kcp_runtime_stats_t *out_stats); kcp_listener_t *kcp_listener_listen(const char *listen_addr, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, const char *node_role, const char *node_id); kcp_conn_t *kcp_listener_accept(kcp_listener_t *listener); diff --git a/include/video_pipeline.h b/include/video_pipeline.h new file mode 100644 index 0000000..996d906 --- /dev/null +++ b/include/video_pipeline.h @@ -0,0 +1,52 @@ +#ifndef OMNI_VIDEO_PIPELINE_H +#define OMNI_VIDEO_PIPELINE_H + +#include +#include +#include + +#include "peer_kcp_client.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct video_pipeline_config { + const char *camera_device; + const char *server_addr; + const char *relay_via; + const char *bind_ip; + const char *bind_device; + const char *peer_id; + const char *target_peer; + int capture_width; + int capture_height; + int output_width; + int output_height; + int max_frames; + int enable_timing_logs; +} video_pipeline_config_t; + +typedef struct video_pipeline_stats { + pthread_mutex_t mutex; + uint64_t frames_sent; + uint64_t bytes_sent; + uint64_t send_errors; + uint64_t last_frame_bytes; + int connected; + char last_error[256]; + kcp_runtime_stats_t transport; +} video_pipeline_stats_t; + +void video_pipeline_config_init(video_pipeline_config_t *config); +void video_pipeline_config_load_env(video_pipeline_config_t *config); +int video_pipeline_stats_init(video_pipeline_stats_t *stats); +void video_pipeline_stats_destroy(video_pipeline_stats_t *stats); +void video_pipeline_stats_snapshot(video_pipeline_stats_t *stats, video_pipeline_stats_t *out_stats); +int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_stats_t *stats, volatile sig_atomic_t *stop_requested); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/python/omnisocket/_omnisocket.c b/python/omnisocket/_omnisocket.c index 5536688..0739ec7 100644 --- a/python/omnisocket/_omnisocket.c +++ b/python/omnisocket/_omnisocket.c @@ -87,6 +87,30 @@ static PyObject *build_stats_dict(const omnisocket_session_stats_t *stats) { ); } +static PyObject *build_kcp_stats_dict(const omnisocket_session_kcp_stats_t *stats) { + return Py_BuildValue( + "{s:i,s:I,s:I,s:i,s:i,s:I,s:I,s:I,s:I}", + "connected", + stats->connected, + "conv", + stats->conv, + "rto_ms", + stats->rto_ms, + "srtt_ms", + stats->srtt_ms, + "srttvar_ms", + stats->srttvar_ms, + "snd_queue", + stats->snd_queue, + "rcv_queue", + stats->rcv_queue, + "snd_buffer", + stats->snd_buffer, + "xmit_total", + stats->xmit_total + ); +} + static PyObject *PyOmniSession_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) { PyOmniSession *self; (void) args; @@ -314,6 +338,14 @@ static PyObject *PyOmniSession_stats(PyOmniSession *self, PyObject *Py_UNUSED(ig return build_stats_dict(&stats); } +static PyObject *PyOmniSession_kcp_stats(PyOmniSession *self, PyObject *Py_UNUSED(ignored)) { + omnisocket_session_kcp_stats_t stats; + + memset(&stats, 0, sizeof(stats)); + omnisocket_session_kcp_stats_snapshot(&self->session, &stats); + return build_kcp_stats_dict(&stats); +} + static PyMethodDef PyOmniSession_methods[] = { {"connect", (PyCFunction) PyOmniSession_connect, METH_VARARGS | METH_KEYWORDS, NULL}, {"close", (PyCFunction) PyOmniSession_close, METH_NOARGS, NULL}, @@ -321,6 +353,7 @@ static PyMethodDef PyOmniSession_methods[] = { {"recv", (PyCFunction) PyOmniSession_recv, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_doc}, {"recv_into", (PyCFunction) PyOmniSession_recv_into, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_into_doc}, {"stats", (PyCFunction) PyOmniSession_stats, METH_NOARGS, NULL}, + {"kcp_stats", (PyCFunction) PyOmniSession_kcp_stats, METH_NOARGS, NULL}, {NULL, NULL, 0, NULL} }; diff --git a/python/omnisocket/omnisocket_client.c b/python/omnisocket/omnisocket_client.c index 3166476..2df0b3d 100644 --- a/python/omnisocket/omnisocket_client.c +++ b/python/omnisocket/omnisocket_client.c @@ -247,6 +247,32 @@ void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket pthread_mutex_unlock(&session->mutex); } +void omnisocket_session_kcp_stats_snapshot(omnisocket_session_t *session, omnisocket_session_kcp_stats_t *out_stats) { + kcp_runtime_stats_t runtime_stats; + + if (session == NULL || out_stats == NULL) { + return; + } + + memset(&runtime_stats, 0, sizeof(runtime_stats)); + pthread_mutex_lock(&session->mutex); + if (session->client != NULL) { + kcp_client_runtime_stats_snapshot(session->client, &runtime_stats); + } + pthread_mutex_unlock(&session->mutex); + + memset(out_stats, 0, sizeof(*out_stats)); + out_stats->connected = runtime_stats.connected; + out_stats->conv = runtime_stats.conv; + out_stats->rto_ms = runtime_stats.rto_ms; + out_stats->srtt_ms = runtime_stats.srtt_ms; + out_stats->srttvar_ms = runtime_stats.srttvar_ms; + out_stats->snd_queue = runtime_stats.snd_queue; + out_stats->rcv_queue = runtime_stats.rcv_queue; + out_stats->snd_buffer = runtime_stats.snd_buffer; + out_stats->xmit_total = runtime_stats.xmit_total; +} + int omnisocket_udp_session_init(omnisocket_udp_session_t *session) { int rc; diff --git a/python/omnisocket/omnisocket_client.h b/python/omnisocket/omnisocket_client.h index 2ca5fa4..cc7d011 100644 --- a/python/omnisocket/omnisocket_client.h +++ b/python/omnisocket/omnisocket_client.h @@ -15,6 +15,18 @@ typedef struct omnisocket_session_stats { int connected; } omnisocket_session_stats_t; +typedef struct omnisocket_session_kcp_stats { + int connected; + uint32_t conv; + uint32_t rto_ms; + int32_t srtt_ms; + int32_t srttvar_ms; + uint32_t snd_queue; + uint32_t rcv_queue; + uint32_t snd_buffer; + uint32_t xmit_total; +} omnisocket_session_kcp_stats_t; + typedef struct omnisocket_session { pthread_mutex_t mutex; pthread_cond_t idle_cond; @@ -57,6 +69,7 @@ int omnisocket_session_recv_into( int timeout_ms ); void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket_session_stats_t *out_stats); +void omnisocket_session_kcp_stats_snapshot(omnisocket_session_t *session, omnisocket_session_kcp_stats_t *out_stats); int omnisocket_udp_session_init(omnisocket_udp_session_t *session); void omnisocket_udp_session_destroy(omnisocket_udp_session_t *session); diff --git a/python/tests/test_sessions.py b/python/tests/test_sessions.py index fc56919..68ff920 100644 --- a/python/tests/test_sessions.py +++ b/python/tests/test_sessions.py @@ -123,6 +123,13 @@ def test_control_sessions_smoke(transport: str, binary_name: str, session_cls) - assert receiver_stats['connected'] == 1 assert sender_stats['send_calls'] >= 2 assert receiver_stats['recv_calls'] >= 2 + if transport == 'kcp': + sender_kcp_stats = sender.kcp_stats() + receiver_kcp_stats = receiver.kcp_stats() + assert sender_kcp_stats['connected'] == 1 + assert receiver_kcp_stats['connected'] == 1 + assert 'srtt_ms' in sender_kcp_stats + assert 'snd_queue' in receiver_kcp_stats finally: sender.close() receiver.close() diff --git a/ros-control-py/README.md b/ros-control-py/README.md index 1e95736..dd6f71d 100644 --- a/ros-control-py/README.md +++ b/ros-control-py/README.md @@ -131,6 +131,17 @@ ros2 launch udp_teleop_bridge robot_udp_receiver.launch.py \ expected_sender:=ros-keyboard-ctrl ``` +Local daemon handoff via Unix datagram: + +```bash +ros2 launch udp_teleop_bridge robot_udp_receiver.launch.py \ + transport:=unix_dgram \ + local_socket_path:=/tmp/omnisocket-b-side-cmd.sock \ + output_topic:=/hric/robot/cmd_vel \ + frame_id:=pelvis \ + watchdog_timeout:=0.5 +``` + ## 控制端键盘运行 终端 A,启动 sender: diff --git a/ros-control-py/ROS2 Teleop over UDP.md b/ros-control-py/ROS2 Teleop over UDP.md index 6f10d33..fe62a27 100644 --- a/ros-control-py/ROS2 Teleop over UDP.md +++ b/ros-control-py/ROS2 Teleop over UDP.md @@ -63,6 +63,12 @@ KCP 只需把 `transport` 和 `server_addr` 改成: transport:=kcp server_addr:=127.0.0.1:9002 ``` +如果控制命令来自本机 `b_side_omnid`,可以改为: + +```bash +transport:=unix_dgram local_socket_path:=/tmp/omnisocket-b-side-cmd.sock +``` + 只接受指定 sender: ```bash diff --git a/ros-control-py/udp_teleop_bridge/launch/robot_udp_receiver.launch.py b/ros-control-py/udp_teleop_bridge/launch/robot_udp_receiver.launch.py index c27acc4..4a9d1e5 100644 --- a/ros-control-py/udp_teleop_bridge/launch/robot_udp_receiver.launch.py +++ b/ros-control-py/udp_teleop_bridge/launch/robot_udp_receiver.launch.py @@ -12,6 +12,7 @@ def generate_launch_description() -> LaunchDescription: DeclareLaunchArgument('relay_via', default_value=''), DeclareLaunchArgument('peer_id', default_value='ros-bridge-ctrl'), DeclareLaunchArgument('expected_sender', default_value=''), + DeclareLaunchArgument('local_socket_path', default_value='/tmp/omnisocket-b-side-cmd.sock'), DeclareLaunchArgument('output_topic', default_value='/hric/robot/cmd_vel'), DeclareLaunchArgument('frame_id', default_value='pelvis'), DeclareLaunchArgument('watchdog_timeout', default_value='0.5'), @@ -27,6 +28,7 @@ def generate_launch_description() -> LaunchDescription: 'relay_via': LaunchConfiguration('relay_via'), 'peer_id': LaunchConfiguration('peer_id'), 'expected_sender': LaunchConfiguration('expected_sender'), + 'local_socket_path': LaunchConfiguration('local_socket_path'), 'output_topic': LaunchConfiguration('output_topic'), 'frame_id': LaunchConfiguration('frame_id'), 'watchdog_timeout': ParameterValue(LaunchConfiguration('watchdog_timeout'), value_type=float), diff --git a/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/udp_cmd_vel_receiver.py b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/udp_cmd_vel_receiver.py index 5ead458..e620516 100644 --- a/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/udp_cmd_vel_receiver.py +++ b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/udp_cmd_vel_receiver.py @@ -2,6 +2,8 @@ from __future__ import annotations +import os +import socket import threading import time from typing import Dict, Optional, Tuple @@ -10,7 +12,6 @@ import rclpy from geometry_msgs.msg import TwistStamped from rclpy.node import Node -from .omni_transport import MSG_TYPE_BINARY, MSG_TYPE_ERROR, OmniTransport from .protocol import ( DEFAULT_BRIDGE_PEER_ID, DEFAULT_FRAME_ID, @@ -40,6 +41,7 @@ class UdpCmdVelReceiver(Node): self.declare_parameter('relay_via', '') self.declare_parameter('peer_id', DEFAULT_BRIDGE_PEER_ID) self.declare_parameter('expected_sender', '') + self.declare_parameter('local_socket_path', '/tmp/omnisocket-b-side-cmd.sock') self.declare_parameter('output_topic', DEFAULT_OUTPUT_TOPIC) self.declare_parameter('frame_id', DEFAULT_FRAME_ID) self.declare_parameter('watchdog_timeout', DEFAULT_WATCHDOG_TIMEOUT) @@ -51,12 +53,15 @@ class UdpCmdVelReceiver(Node): self._relay_via = str(self.get_parameter('relay_via').value) self._peer_id = str(self.get_parameter('peer_id').value) self._expected_sender = str(self.get_parameter('expected_sender').value).strip() + self._local_socket_path = str(self.get_parameter('local_socket_path').value).strip() self._output_topic = str(self.get_parameter('output_topic').value) self._frame_id = str(self.get_parameter('frame_id').value) self._watchdog_timeout = float(self.get_parameter('watchdog_timeout').value) self._publish_rate_hz = float(self.get_parameter('publish_rate_hz').value) self._queue_depth = int(self.get_parameter('queue_depth').value) + if self._transport_name not in ('udp', 'kcp', 'unix_dgram'): + raise ValueError("transport must be one of: udp, kcp, unix_dgram") if self._watchdog_timeout <= 0.0: raise ValueError('watchdog_timeout must be > 0') if self._publish_rate_hz <= 0.0: @@ -65,12 +70,23 @@ class UdpCmdVelReceiver(Node): raise ValueError('queue_depth must be > 0') self._publisher = self.create_publisher(TwistStamped, self._output_topic, self._queue_depth) - self._transport = OmniTransport( - transport=self._transport_name, - server_addr=self._server_addr, - relay_via=self._relay_via, - peer_id=self._peer_id, - ) + self._transport = None + self._unix_socket: socket.socket | None = None + self._msg_type_binary = 0 + self._msg_type_error = 0 + if self._transport_name == 'unix_dgram': + self._setup_unix_socket() + else: + from .omni_transport import MSG_TYPE_BINARY, MSG_TYPE_ERROR, OmniTransport + + self._msg_type_binary = MSG_TYPE_BINARY + self._msg_type_error = MSG_TYPE_ERROR + self._transport = OmniTransport( + transport=self._transport_name, + server_addr=self._server_addr, + relay_via=self._relay_via, + peer_id=self._peer_id, + ) self._lock = threading.Lock() self._last_log_times: Dict[str, float] = {} @@ -82,22 +98,58 @@ class UdpCmdVelReceiver(Node): self.create_timer(1.0 / self._publish_rate_hz, self._publish_tick) - self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True) + recv_target = self._recv_loop_unix_dgram if self._transport_name == 'unix_dgram' else self._recv_loop + self._recv_thread = threading.Thread(target=recv_target, daemon=True) self._recv_thread.start() - self.get_logger().info( - 'Receiving teleop commands via %s://%s as %s and publishing TwistStamped to %s ' - 'at %.1f Hz (frame_id=%s, watchdog %.2f s)' - % ( - self._transport.transport, - self._transport.server_addr, - self._peer_id, - self._output_topic, - self._publish_rate_hz, - self._frame_id, - self._watchdog_timeout, + if self._transport_name == 'unix_dgram': + self.get_logger().info( + 'Receiving teleop commands via unix_dgram://%s and publishing TwistStamped to %s ' + 'at %.1f Hz (frame_id=%s, watchdog %.2f s)' + % ( + self._local_socket_path, + self._output_topic, + self._publish_rate_hz, + self._frame_id, + self._watchdog_timeout, + ) ) - ) + else: + assert self._transport is not None + self.get_logger().info( + 'Receiving teleop commands via %s://%s as %s and publishing TwistStamped to %s ' + 'at %.1f Hz (frame_id=%s, watchdog %.2f s)' + % ( + self._transport.transport, + self._transport.server_addr, + self._peer_id, + self._output_topic, + self._publish_rate_hz, + self._frame_id, + self._watchdog_timeout, + ) + ) + + def _setup_unix_socket(self) -> None: + if not self._local_socket_path: + raise ValueError('local_socket_path must not be empty for unix_dgram transport') + + socket_dir = os.path.dirname(self._local_socket_path) + if socket_dir: + os.makedirs(socket_dir, exist_ok=True) + if os.path.exists(self._local_socket_path): + self.get_logger().warning( + 'Removing existing unix datagram socket path before bind: %s' + % self._local_socket_path + ) + try: + os.unlink(self._local_socket_path) + except FileNotFoundError: + pass + + self._unix_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self._unix_socket.bind(self._local_socket_path) + self._unix_socket.settimeout(0.1) def _should_log(self, key: str, throttle_sec: float) -> bool: now = time.monotonic() @@ -128,6 +180,7 @@ class UdpCmdVelReceiver(Node): def _recv_loop(self) -> None: while not self._closing.is_set() and rclpy.ok(): try: + assert self._transport is not None meta = self._transport.recv_into(buffer=self._recv_buffer, timeout_ms=100) except BufferError as exc: if self._should_log('buffer_error', 2.0): @@ -145,7 +198,7 @@ class UdpCmdVelReceiver(Node): msg_type = int(meta['msg_type']) body_len = int(meta['body_len']) - if msg_type == MSG_TYPE_ERROR: + if msg_type == self._msg_type_error: self._handle_error_message(from_peer, body_len) continue @@ -157,7 +210,7 @@ class UdpCmdVelReceiver(Node): ) continue - if msg_type != MSG_TYPE_BINARY: + if msg_type != self._msg_type_binary: if self._should_log('unexpected_type', 2.0): self.get_logger().warning( 'Ignoring unexpected message type %d from %s (%d bytes)' @@ -184,6 +237,38 @@ class UdpCmdVelReceiver(Node): self._latest_command = command self._last_packet_monotonic = time.monotonic() + def _recv_loop_unix_dgram(self) -> None: + assert self._unix_socket is not None + + while not self._closing.is_set() and rclpy.ok(): + try: + payload = self._unix_socket.recv(DEFAULT_RECV_BUFFER_BYTES) + except socket.timeout: + continue + except OSError as exc: + if not self._closing.is_set() and self._should_log('unix_recv_error', 2.0): + self.get_logger().error(f'Unix datagram receive loop stopped: {exc}') + return + + if len(payload) != PACKET_SIZE: + if self._should_log('unix_packet_size', 2.0): + self.get_logger().warning( + 'Dropped unix datagram payload with invalid size %d (expected %d)' + % (len(payload), PACKET_SIZE) + ) + continue + + try: + command = unpack_command(payload) + except ValueError as exc: + if self._should_log('unix_decode_error', 2.0): + self.get_logger().warning(f'Dropped malformed unix datagram payload: {exc}') + continue + + with self._lock: + self._latest_command = command + self._last_packet_monotonic = time.monotonic() + def _command_for_publish_tick(self) -> tuple[CommandTuple, Optional[float], bool]: with self._lock: latest_command = self._latest_command @@ -218,6 +303,17 @@ class UdpCmdVelReceiver(Node): if self._should_log('close_error', 2.0): self.get_logger().warning(f'Closing OmniSocket transport failed: {exc}') self._transport = None + if self._unix_socket is not None: + try: + self._unix_socket.close() + except OSError as exc: + if self._should_log('unix_close_error', 2.0): + self.get_logger().warning(f'Closing unix socket failed: {exc}') + self._unix_socket = None + try: + os.unlink(self._local_socket_path) + except FileNotFoundError: + pass if hasattr(self, '_recv_thread') and self._recv_thread.is_alive(): self._recv_thread.join(timeout=0.5) diff --git a/src/peer_kcp_client.c b/src/peer_kcp_client.c index ec969c0..9db5aa8 100644 --- a/src/peer_kcp_client.c +++ b/src/peer_kcp_client.c @@ -294,6 +294,18 @@ int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const return 0; } +void kcp_client_runtime_stats_snapshot(kcp_client_t *client, kcp_runtime_stats_t *out_stats) { + if (out_stats == NULL) { + return; + } + + memset(out_stats, 0, sizeof(*out_stats)); + if (client == NULL || client->conn == NULL) { + return; + } + kcp_conn_runtime_stats_snapshot(client->conn, out_stats); +} + int kcp_client_close(kcp_client_t *client) { return client == NULL ? 0 : kcp_conn_close(client->conn); } diff --git a/src/transport_kcp.c b/src/transport_kcp.c index 5c473fb..47bb55e 100644 --- a/src/transport_kcp.c +++ b/src/transport_kcp.c @@ -1788,6 +1788,33 @@ int kcp_conn_local_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, s return 0; } +void kcp_conn_runtime_stats_snapshot(kcp_conn_t *conn, kcp_runtime_stats_t *out_stats) { + if (out_stats == NULL) { + return; + } + + memset(out_stats, 0, sizeof(*out_stats)); + if (conn == NULL) { + return; + } + + out_stats->connected = atomic_load(&conn->closed) ? 0 : 1; + pthread_mutex_lock(&conn->kcp_mu); + if (conn->kcp != NULL) { + out_stats->conv = conn->kcp->conv; + out_stats->rto_ms = conn->kcp->rx_rto; + out_stats->srtt_ms = conn->kcp->rx_srtt; + out_stats->srttvar_ms = conn->kcp->rx_rttval; + out_stats->snd_queue = conn->kcp->nsnd_que; + out_stats->rcv_queue = conn->kcp->nrcv_que; + out_stats->snd_buffer = conn->kcp->nsnd_buf; + out_stats->xmit_total = conn->kcp->xmit; + } else { + out_stats->connected = 0; + } + pthread_mutex_unlock(&conn->kcp_mu); +} + int kcp_conn_close(kcp_conn_t *conn) { if (conn == NULL) { return 0; diff --git a/src/video_pipeline.c b/src/video_pipeline.c new file mode 100644 index 0000000..18b13c7 --- /dev/null +++ b/src/video_pipeline.c @@ -0,0 +1,883 @@ +#include "video_pipeline.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#define VIDEO_CAPTURE_WIDTH_DEFAULT 1280 +#define VIDEO_CAPTURE_HEIGHT_DEFAULT 720 +#define VIDEO_OUTPUT_WIDTH_DEFAULT 640 +#define VIDEO_OUTPUT_HEIGHT_DEFAULT 360 +#define VIDEO_NUM_BUFFERS 4 +#define VIDEO_DEFAULT_CAMERA_DEVICE "/dev/video0" +#define VIDEO_DEFAULT_PEER_ID "peer-b-video" +#define VIDEO_DEFAULT_TARGET_PEER "peer-a-video" + +typedef struct video_buffer { + void *start; + size_t length; +} video_buffer_t; + +typedef struct video_sender { + kcp_client_t *client; + char target_peer[OMNI_MAX_PEER_ID]; + uint8_t *send_buffer; + size_t send_buffer_cap; +} video_sender_t; + +static int video_pipeline_stop_requested(volatile sig_atomic_t *stop_requested) { + return stop_requested != NULL && *stop_requested != 0; +} + +static int env_flag_or_default(const char *name, int fallback) { + const char *value = getenv(name); + + if (value == NULL || value[0] == '\0') { + return fallback; + } + if ( + strcmp(value, "1") == 0 || strcmp(value, "true") == 0 || strcmp(value, "TRUE") == 0 + || strcmp(value, "yes") == 0 || strcmp(value, "on") == 0 + ) { + return 1; + } + if ( + strcmp(value, "0") == 0 || strcmp(value, "false") == 0 || strcmp(value, "FALSE") == 0 + || strcmp(value, "no") == 0 || strcmp(value, "off") == 0 + ) { + return 0; + } + return fallback; +} + +static double video_pipeline_now_ms(void) { + struct timespec ts; + + clock_gettime(CLOCK_MONOTONIC, &ts); + return ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0; +} + +static void video_pipeline_print_timing_header(void) { + fprintf(stderr, "Frame | Capture | Decode | Scale | Encode | Send | Total | Size | Marker\n"); + fprintf(stderr, "------|---------|--------|-------|--------|------|-------|------|--------\n"); +} + +static void video_pipeline_print_timing_failure(int frame_number, const char *stage) { + fprintf(stderr, "Frame %d: %s failed\n", frame_number, stage); +} + +static void video_pipeline_print_timing_row( + int frame_number, + double capture_ms, + double decode_ms, + double scale_ms, + double encode_ms, + double send_ms, + double total_ms, + const AVPacket *encoded_pkt +) { + size_t size_kb = 0; + unsigned int marker = 0; + + if (encoded_pkt != NULL) { + size_kb = (size_t) encoded_pkt->size / 1024; + if (encoded_pkt->size > 1) { + marker = encoded_pkt->data[1]; + } + } + + fprintf( + stderr, + "%5d | %7.1f | %6.1f | %5.1f | %6.1f | %4.1f | %5.1f | %4zu KB | 0x%02x\n", + frame_number, + capture_ms, + decode_ms, + scale_ms, + encode_ms, + send_ms, + total_ms, + size_kb, + marker + ); +} + +static const char *env_or_default(const char *name, const char *fallback) { + const char *value = getenv(name); + if (value != NULL && value[0] != '\0') { + return value; + } + return fallback; +} + +static const char *env_first_nonempty(const char *first, const char *second, const char *fallback) { + const char *value = getenv(first); + if (value != NULL && value[0] != '\0') { + return value; + } + value = getenv(second); + if (value != NULL && value[0] != '\0') { + return value; + } + return fallback; +} + +static void video_pipeline_set_error(video_pipeline_stats_t *stats, const char *message) { + if (stats == NULL) { + return; + } + pthread_mutex_lock(&stats->mutex); + snprintf(stats->last_error, sizeof(stats->last_error), "%s", message == NULL ? "" : message); + pthread_mutex_unlock(&stats->mutex); +} + +static void video_pipeline_set_errno_error(video_pipeline_stats_t *stats, const char *prefix) { + char buffer[256]; + int saved_errno = errno; + + snprintf( + buffer, + sizeof(buffer), + "%s: %s (errno=%d)", + prefix == NULL ? "video pipeline error" : prefix, + saved_errno != 0 ? strerror(saved_errno) : "unknown error", + saved_errno + ); + video_pipeline_set_error(stats, buffer); +} + +void video_pipeline_config_init(video_pipeline_config_t *config) { + if (config == NULL) { + return; + } + memset(config, 0, sizeof(*config)); + config->camera_device = VIDEO_DEFAULT_CAMERA_DEVICE; + config->server_addr = ""; + config->relay_via = ""; + config->bind_ip = ""; + config->bind_device = ""; + config->peer_id = VIDEO_DEFAULT_PEER_ID; + config->target_peer = VIDEO_DEFAULT_TARGET_PEER; + config->capture_width = VIDEO_CAPTURE_WIDTH_DEFAULT; + config->capture_height = VIDEO_CAPTURE_HEIGHT_DEFAULT; + config->output_width = VIDEO_OUTPUT_WIDTH_DEFAULT; + config->output_height = VIDEO_OUTPUT_HEIGHT_DEFAULT; + config->max_frames = 0; + config->enable_timing_logs = 0; +} + +void video_pipeline_config_load_env(video_pipeline_config_t *config) { + if (config == NULL) { + return; + } + config->camera_device = env_or_default("OMNI_CAMERA_DEVICE", config->camera_device); + config->server_addr = env_first_nonempty("OMNI_VIDEO_SERVER_ADDR", "OMNISOCKET_SERVER_ADDR", config->server_addr); + config->relay_via = env_first_nonempty("OMNI_VIDEO_RELAY_VIA", "OMNISOCKET_RELAY_VIA", config->relay_via); + config->bind_ip = env_first_nonempty("OMNI_VIDEO_BIND_IP", "OMNISOCKET_BIND_IP", config->bind_ip); + config->bind_device = env_first_nonempty("OMNI_VIDEO_BIND_DEVICE", "OMNISOCKET_BIND_DEVICE", config->bind_device); + config->peer_id = env_or_default("OMNI_VIDEO_PEER_ID", config->peer_id); + config->target_peer = env_or_default("OMNI_VIDEO_TARGET_PEER", config->target_peer); + if (getenv("OMNI_VIDEO_MAX_FRAMES") != NULL) { + config->max_frames = atoi(getenv("OMNI_VIDEO_MAX_FRAMES")); + } + config->enable_timing_logs = env_flag_or_default("OMNI_VIDEO_DEBUG_TIMING", config->enable_timing_logs); +} + +int video_pipeline_stats_init(video_pipeline_stats_t *stats) { + int rc; + if (stats == NULL) { + errno = EINVAL; + return -1; + } + memset(stats, 0, sizeof(*stats)); + rc = pthread_mutex_init(&stats->mutex, NULL); + if (rc != 0) { + errno = rc; + return -1; + } + return 0; +} + +void video_pipeline_stats_destroy(video_pipeline_stats_t *stats) { + if (stats == NULL) { + return; + } + pthread_mutex_destroy(&stats->mutex); +} + +void video_pipeline_stats_snapshot(video_pipeline_stats_t *stats, video_pipeline_stats_t *out_stats) { + if (stats == NULL || out_stats == NULL) { + return; + } + memset(out_stats, 0, sizeof(*out_stats)); + pthread_mutex_lock(&stats->mutex); + out_stats->frames_sent = stats->frames_sent; + out_stats->bytes_sent = stats->bytes_sent; + out_stats->send_errors = stats->send_errors; + out_stats->last_frame_bytes = stats->last_frame_bytes; + out_stats->connected = stats->connected; + snprintf(out_stats->last_error, sizeof(out_stats->last_error), "%s", stats->last_error); + out_stats->transport = stats->transport; + pthread_mutex_unlock(&stats->mutex); +} + +static int open_v4l2_device(const char *device) { + return open(device, O_RDWR | O_NONBLOCK); +} + +static int init_v4l2_device(int fd, int width, int height) { + struct v4l2_format fmt; + + memset(&fmt, 0, sizeof(fmt)); + fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + fmt.fmt.pix.width = width; + fmt.fmt.pix.height = height; + fmt.fmt.pix.pixelformat = V4L2_PIX_FMT_MJPEG; + fmt.fmt.pix.field = V4L2_FIELD_NONE; + return ioctl(fd, VIDIOC_S_FMT, &fmt); +} + +static int init_mmap(int fd, video_buffer_t **buffers, int *num_buffers) { + struct v4l2_requestbuffers req; + int i; + + memset(&req, 0, sizeof(req)); + req.count = VIDEO_NUM_BUFFERS; + req.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + req.memory = V4L2_MEMORY_MMAP; + if (ioctl(fd, VIDIOC_REQBUFS, &req) < 0) { + return -1; + } + + *num_buffers = (int) req.count; + *buffers = (video_buffer_t *) calloc(req.count, sizeof(video_buffer_t)); + if (*buffers == NULL) { + return -1; + } + + for (i = 0; i < (int) req.count; i++) { + struct v4l2_buffer buf; + + memset(&buf, 0, sizeof(buf)); + buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + buf.memory = V4L2_MEMORY_MMAP; + buf.index = (unsigned int) i; + if (ioctl(fd, VIDIOC_QUERYBUF, &buf) < 0) { + return -1; + } + + (*buffers)[i].length = buf.length; + (*buffers)[i].start = mmap(NULL, buf.length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, buf.m.offset); + if ((*buffers)[i].start == MAP_FAILED) { + return -1; + } + } + + return 0; +} + +static AVCodecContext *create_mjpeg_decoder(int width, int height) { + const AVCodec *decoder = avcodec_find_decoder(AV_CODEC_ID_MJPEG); + AVCodecContext *ctx; + AVDictionary *opts = NULL; + + if (decoder == NULL) { + errno = ENOENT; + return NULL; + } + + ctx = avcodec_alloc_context3(decoder); + if (ctx == NULL) { + return NULL; + } + ctx->width = width; + ctx->height = height; + ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; + ctx->color_range = AVCOL_RANGE_JPEG; + ctx->thread_count = 1; + + av_dict_set(&opts, "flags2", "+fast", 0); + if (avcodec_open2(ctx, decoder, &opts) < 0) { + avcodec_free_context(&ctx); + av_dict_free(&opts); + errno = EINVAL; + return NULL; + } + av_dict_free(&opts); + return ctx; +} + +static AVCodecContext *create_mjpeg_encoder(int width, int height) { + const AVCodec *encoder = avcodec_find_encoder(AV_CODEC_ID_MJPEG); + AVCodecContext *ctx; + AVDictionary *opts = NULL; + + if (encoder == NULL) { + errno = ENOENT; + return NULL; + } + + ctx = avcodec_alloc_context3(encoder); + if (ctx == NULL) { + return NULL; + } + ctx->width = width; + ctx->height = height; + ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; + ctx->time_base = (AVRational){1, 30}; + ctx->qmin = 8; + ctx->qmax = 31; + ctx->flags |= AV_CODEC_FLAG_QSCALE; + ctx->global_quality = FF_QP2LAMBDA * 5; + + av_dict_set(&opts, "huffman", "default", 0); + if (avcodec_open2(ctx, encoder, &opts) < 0) { + avcodec_free_context(&ctx); + av_dict_free(&opts); + errno = EINVAL; + return NULL; + } + av_dict_free(&opts); + return ctx; +} + +static int decode_mjpeg_frame(AVCodecContext *decoder, const uint8_t *data, int size, AVFrame **frame) { + AVPacket *pkt; + int ret; + + if (frame == NULL) { + errno = EINVAL; + return -1; + } + + *frame = NULL; + pkt = av_packet_alloc(); + if (pkt == NULL) { + return -1; + } + pkt->data = (uint8_t *) data; + pkt->size = size; + + ret = avcodec_send_packet(decoder, pkt); + if (ret < 0) { + av_packet_free(&pkt); + errno = EINVAL; + return -1; + } + + *frame = av_frame_alloc(); + if (*frame == NULL) { + av_packet_free(&pkt); + return -1; + } + + ret = avcodec_receive_frame(decoder, *frame); + av_packet_free(&pkt); + if (ret < 0) { + av_frame_free(frame); + errno = EINVAL; + return -1; + } + return 0; +} + +static int ensure_scale_context( + struct SwsContext **sws_ctx, + int *cached_src_width, + int *cached_src_height, + int *cached_src_format, + const AVFrame *src, + int output_width, + int output_height +) { + if ( + *sws_ctx != NULL + && *cached_src_width == src->width + && *cached_src_height == src->height + && *cached_src_format == src->format + ) { + return 0; + } + + sws_freeContext(*sws_ctx); + *sws_ctx = sws_getContext( + src->width, + src->height, + src->format, + output_width, + output_height, + AV_PIX_FMT_YUVJ420P, + SWS_BILINEAR, + NULL, + NULL, + NULL + ); + if (*sws_ctx == NULL) { + errno = EINVAL; + return -1; + } + + *cached_src_width = src->width; + *cached_src_height = src->height; + *cached_src_format = src->format; + return 0; +} + +static int scale_frame(AVFrame *src, AVFrame **dst, struct SwsContext *sws_ctx, int output_width, int output_height) { + int ret; + + if (sws_ctx == NULL) { + errno = EINVAL; + return -1; + } + + *dst = av_frame_alloc(); + if (*dst == NULL) { + return -1; + } + (*dst)->width = output_width; + (*dst)->height = output_height; + (*dst)->format = AV_PIX_FMT_YUVJ420P; + if (av_frame_get_buffer(*dst, 0) < 0) { + av_frame_free(dst); + errno = ENOMEM; + return -1; + } + + ret = sws_scale( + sws_ctx, + (const uint8_t *const *) src->data, + src->linesize, + 0, + src->height, + (*dst)->data, + (*dst)->linesize + ); + if (ret < 0) { + av_frame_free(dst); + errno = EINVAL; + return -1; + } + return 0; +} + +static int video_sender_ensure_buffer_capacity(video_sender_t *sender, size_t min_capacity) { + uint8_t *resized_buffer; + size_t next_capacity; + + if (sender == NULL) { + errno = EINVAL; + return -1; + } + if (sender->send_buffer_cap >= min_capacity) { + return 0; + } + + next_capacity = sender->send_buffer_cap == 0 ? min_capacity : sender->send_buffer_cap; + while (next_capacity < min_capacity) { + next_capacity *= 2; + } + + resized_buffer = (uint8_t *) realloc(sender->send_buffer, next_capacity); + if (resized_buffer == NULL) { + return -1; + } + + sender->send_buffer = resized_buffer; + sender->send_buffer_cap = next_capacity; + return 0; +} + +static int encode_frame(AVCodecContext *encoder, AVFrame *frame, AVPacket **pkt) { + int ret; + + if (pkt == NULL) { + errno = EINVAL; + return -1; + } + + *pkt = av_packet_alloc(); + if (*pkt == NULL) { + return -1; + } + ret = avcodec_send_frame(encoder, frame); + if (ret < 0) { + av_packet_free(pkt); + errno = EINVAL; + return -1; + } + + ret = avcodec_receive_packet(encoder, *pkt); + if (ret < 0) { + av_packet_free(pkt); + errno = EINVAL; + return -1; + } + return 0; +} + +static int64_t get_realtime_ms(void) { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + return (int64_t) ts.tv_sec * 1000 + ts.tv_nsec / 1000000; +} + +static int video_sender_init(video_sender_t *sender, const video_pipeline_config_t *config) { + kcp_conn_options_t options; + + if (sender == NULL || config == NULL || config->server_addr == NULL || config->server_addr[0] == '\0') { + errno = EINVAL; + return -1; + } + + memset(sender, 0, sizeof(*sender)); + snprintf(sender->target_peer, sizeof(sender->target_peer), "%s", config->target_peer); + kcp_conn_options_set_video_defaults(&options); + sender->client = kcp_client_dial_with_options( + config->server_addr, + config->relay_via, + config->peer_id, + config->bind_ip, + config->bind_device, + &options, + NULL, + NULL, + NULL, + KCP_DEFAULT_STATS_INTERVAL_MS + ); + if (sender->client == NULL) { + return -1; + } + return 0; +} + +static int video_sender_send_packet(video_sender_t *sender, const AVPacket *encoded_pkt, uint64_t timestamp) { + uint8_t *payload; + size_t payload_len; + int rc; + + if (sender == NULL || sender->client == NULL || encoded_pkt == NULL) { + errno = EINVAL; + return -1; + } + + payload_len = (size_t) encoded_pkt->size + sizeof(timestamp); + if (video_sender_ensure_buffer_capacity(sender, payload_len) != 0) { + return -1; + } + payload = sender->send_buffer; + + memcpy(payload, encoded_pkt->data, (size_t) encoded_pkt->size); + memcpy(payload + encoded_pkt->size, ×tamp, sizeof(timestamp)); + rc = kcp_client_send_binary(sender->client, sender->target_peer, payload, payload_len); + return rc; +} + +static void video_sender_close(video_sender_t *sender) { + if (sender == NULL) { + return; + } + if (sender->client != NULL) { + kcp_client_close(sender->client); + kcp_client_free(sender->client); + sender->client = NULL; + } + free(sender->send_buffer); + sender->send_buffer = NULL; + sender->send_buffer_cap = 0; +} + +static void video_pipeline_cleanup_buffers(video_buffer_t *buffers, int num_buffers) { + int i; + if (buffers == NULL) { + return; + } + for (i = 0; i < num_buffers; i++) { + if (buffers[i].start != NULL && buffers[i].start != MAP_FAILED) { + munmap(buffers[i].start, buffers[i].length); + } + } + free(buffers); +} + +int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_stats_t *stats, volatile sig_atomic_t *stop_requested) { + video_pipeline_config_t defaults; + video_sender_t sender; + video_buffer_t *buffers = NULL; + AVCodecContext *decoder = NULL; + AVCodecContext *encoder = NULL; + struct SwsContext *sws_ctx = NULL; + enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + int num_buffers = 0; + int fd = -1; + int frame_index = 0; + int rc = -1; + int sws_src_width = 0; + int sws_src_height = 0; + int sws_src_format = -1; + + memset(&sender, 0, sizeof(sender)); + if (stats == NULL) { + errno = EINVAL; + return -1; + } + + video_pipeline_config_init(&defaults); + if (config == NULL) { + config = &defaults; + } + +#ifdef QUIET_FFMPEG_LOGS + av_log_set_level(AV_LOG_ERROR); +#endif + + if (config->server_addr == NULL || config->server_addr[0] == '\0') { + errno = EINVAL; + video_pipeline_set_error(stats, "video server address is required"); + return -1; + } + + fd = open_v4l2_device(config->camera_device); + if (fd < 0) { + video_pipeline_set_errno_error(stats, "failed to open camera device"); + goto cleanup; + } + if (init_v4l2_device(fd, config->capture_width, config->capture_height) < 0) { + video_pipeline_set_errno_error(stats, "failed to configure V4L2"); + goto cleanup; + } + if (init_mmap(fd, &buffers, &num_buffers) < 0) { + video_pipeline_set_errno_error(stats, "failed to initialize V4L2 mmap"); + goto cleanup; + } + + decoder = create_mjpeg_decoder(config->capture_width, config->capture_height); + encoder = create_mjpeg_encoder(config->output_width, config->output_height); + if (decoder == NULL || encoder == NULL) { + video_pipeline_set_errno_error(stats, "failed to initialize codecs"); + goto cleanup; + } + + if (video_sender_init(&sender, config) < 0) { + video_pipeline_set_errno_error(stats, "failed to start video sender"); + goto cleanup; + } + + pthread_mutex_lock(&stats->mutex); + stats->connected = 1; + stats->last_error[0] = '\0'; + pthread_mutex_unlock(&stats->mutex); + + for (frame_index = 0; frame_index < num_buffers; frame_index++) { + struct v4l2_buffer buf; + + memset(&buf, 0, sizeof(buf)); + buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + buf.memory = V4L2_MEMORY_MMAP; + buf.index = (unsigned int) frame_index; + if (ioctl(fd, VIDIOC_QBUF, &buf) < 0) { + video_pipeline_set_errno_error(stats, "failed to queue V4L2 buffer"); + goto cleanup; + } + } + + if (ioctl(fd, VIDIOC_STREAMON, &type) < 0) { + video_pipeline_set_errno_error(stats, "failed to start V4L2 streaming"); + goto cleanup; + } + if (config->enable_timing_logs) { + fprintf(stderr, "\nRunning video pipeline timing benchmark...\n"); + video_pipeline_print_timing_header(); + } + + frame_index = 0; + while (!video_pipeline_stop_requested(stop_requested)) { + fd_set fds; + struct timeval timeout; + struct v4l2_buffer buf; + AVFrame *decoded_frame = NULL; + AVFrame *scaled_frame = NULL; + AVPacket *encoded_pkt = NULL; + int select_rc; + double total_start_ms = 0.0; + double capture_start_ms = 0.0; + double capture_end_ms = 0.0; + double decode_start_ms = 0.0; + double decode_end_ms = 0.0; + double scale_start_ms = 0.0; + double scale_end_ms = 0.0; + double encode_start_ms = 0.0; + double encode_end_ms = 0.0; + double send_start_ms = 0.0; + double send_end_ms = 0.0; + int frame_number = frame_index + 1; + + if (config->max_frames > 0 && frame_index >= config->max_frames) { + break; + } + if (config->enable_timing_logs) { + total_start_ms = video_pipeline_now_ms(); + } + + FD_ZERO(&fds); + FD_SET(fd, &fds); + timeout.tv_sec = 2; + timeout.tv_usec = 0; + select_rc = select(fd + 1, &fds, NULL, NULL, &timeout); + if (select_rc <= 0) { + if (select_rc == 0) { + errno = ETIMEDOUT; + } + video_pipeline_set_errno_error(stats, "failed waiting for camera frame"); + goto cleanup; + } + if (config->enable_timing_logs) { + capture_start_ms = video_pipeline_now_ms(); + } + + memset(&buf, 0, sizeof(buf)); + buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + buf.memory = V4L2_MEMORY_MMAP; + if (ioctl(fd, VIDIOC_DQBUF, &buf) < 0) { + video_pipeline_set_errno_error(stats, "failed to dequeue V4L2 buffer"); + goto cleanup; + } + if (config->enable_timing_logs) { + capture_end_ms = video_pipeline_now_ms(); + decode_start_ms = capture_end_ms; + } + + if (decode_mjpeg_frame(decoder, (const uint8_t *) buffers[buf.index].start, (int) buf.bytesused, &decoded_frame) != 0) { + if (config->enable_timing_logs) { + video_pipeline_print_timing_failure(frame_number, "decode"); + } + (void) ioctl(fd, VIDIOC_QBUF, &buf); + continue; + } + if (config->enable_timing_logs) { + decode_end_ms = video_pipeline_now_ms(); + scale_start_ms = decode_end_ms; + } + if ( + ensure_scale_context( + &sws_ctx, + &sws_src_width, + &sws_src_height, + &sws_src_format, + decoded_frame, + config->output_width, + config->output_height + ) != 0 + || scale_frame(decoded_frame, &scaled_frame, sws_ctx, config->output_width, config->output_height) != 0 + ) { + if (config->enable_timing_logs) { + video_pipeline_print_timing_failure(frame_number, "scale"); + } + av_frame_free(&decoded_frame); + (void) ioctl(fd, VIDIOC_QBUF, &buf); + continue; + } + if (config->enable_timing_logs) { + scale_end_ms = video_pipeline_now_ms(); + encode_start_ms = scale_end_ms; + } + if (encode_frame(encoder, scaled_frame, &encoded_pkt) != 0) { + if (config->enable_timing_logs) { + video_pipeline_print_timing_failure(frame_number, "encode"); + } + av_frame_free(&decoded_frame); + av_frame_free(&scaled_frame); + (void) ioctl(fd, VIDIOC_QBUF, &buf); + continue; + } + if (config->enable_timing_logs) { + encode_end_ms = video_pipeline_now_ms(); + send_start_ms = encode_end_ms; + } + + if (video_sender_send_packet(&sender, encoded_pkt, (uint64_t) get_realtime_ms()) != 0) { + pthread_mutex_lock(&stats->mutex); + stats->send_errors += 1; + pthread_mutex_unlock(&stats->mutex); + if (config->enable_timing_logs) { + video_pipeline_print_timing_failure(frame_number, "send"); + } + video_pipeline_set_errno_error(stats, "failed to send video packet"); + av_frame_free(&decoded_frame); + av_frame_free(&scaled_frame); + av_packet_free(&encoded_pkt); + (void) ioctl(fd, VIDIOC_QBUF, &buf); + goto cleanup; + } + if (config->enable_timing_logs) { + send_end_ms = video_pipeline_now_ms(); + } + + pthread_mutex_lock(&stats->mutex); + stats->frames_sent += 1; + stats->bytes_sent += (uint64_t) encoded_pkt->size; + stats->last_frame_bytes = (uint64_t) encoded_pkt->size; + kcp_client_runtime_stats_snapshot(sender.client, &stats->transport); + pthread_mutex_unlock(&stats->mutex); + if (config->enable_timing_logs) { + video_pipeline_print_timing_row( + frame_number, + capture_end_ms - capture_start_ms, + decode_end_ms - decode_start_ms, + scale_end_ms - scale_start_ms, + encode_end_ms - encode_start_ms, + send_end_ms - send_start_ms, + send_end_ms - total_start_ms, + encoded_pkt + ); + } + + av_frame_free(&decoded_frame); + av_frame_free(&scaled_frame); + av_packet_free(&encoded_pkt); + + if (ioctl(fd, VIDIOC_QBUF, &buf) < 0) { + video_pipeline_set_errno_error(stats, "failed to requeue V4L2 buffer"); + goto cleanup; + } + frame_index += 1; + } + + rc = 0; + +cleanup: + pthread_mutex_lock(&stats->mutex); + stats->connected = 0; + pthread_mutex_unlock(&stats->mutex); + if (fd >= 0) { + (void) ioctl(fd, VIDIOC_STREAMOFF, &type); + } + video_sender_close(&sender); + if (encoder != NULL) { + avcodec_free_context(&encoder); + } + if (decoder != NULL) { + avcodec_free_context(&decoder); + } + sws_freeContext(sws_ctx); + video_pipeline_cleanup_buffers(buffers, num_buffers); + if (fd >= 0) { + close(fd); + } + return rc; +}