diff --git a/Makefile b/Makefile index 8c9d6e1..1fc8978 100644 --- a/Makefile +++ b/Makefile @@ -43,6 +43,7 @@ TARGETS := \ CAMERA_VIDEO_SENDER := $(BIN_DIR)/camera_video_sender FFMPEG_PIPELINE_COMMON_SRCS := \ $(SRC_DIR)/video_pipeline.c \ + $(SRC_DIR)/gps_buffer.c \ $(SRC_DIR)/omni_common.c \ $(SRC_DIR)/protocol.c \ $(SRC_DIR)/latencylog.c \ @@ -90,12 +91,12 @@ $(BIN_DIR)/kcpping: $(CMD_DIR)/kcpping.c $(COMMON_SRCS) | $(BIN_DIR) $(CC) $(CFLAGS) $(CPPFLAGS) -o $@ $^ $(LDFLAGS) $(CAMERA_VIDEO_SENDER): $(CAMERA_VIDEO_SENDER_SRCS) | $(BIN_DIR) - $(CC) $(CFLAGS) $(CPPFLAGS) $$(pkg-config --cflags libavformat libavcodec libavutil libswscale) -o $@ $^ $(LDFLAGS) $$(pkg-config --libs libavformat libavcodec libavutil libswscale) + $(CC) $(CFLAGS) $(CPPFLAGS) $$(pkg-config --cflags libavformat libavcodec libavutil libswscale) -o $@ $^ $(LDFLAGS) $$(pkg-config --libs libavformat libavcodec libavutil libswscale) -lm camera_video_sender: $(CAMERA_VIDEO_SENDER) $(B_SIDE_OMNID): $(B_SIDE_OMNID_SRCS) | $(BIN_DIR) - $(CC) $(CFLAGS) $(CPPFLAGS) $$(pkg-config --cflags libavformat libavcodec libavutil libswscale) -o $@ $^ $(LDFLAGS) $$(pkg-config --libs libavformat libavcodec libavutil libswscale) + $(CC) $(CFLAGS) $(CPPFLAGS) $$(pkg-config --cflags libavformat libavcodec libavutil libswscale) -o $@ $^ $(LDFLAGS) $$(pkg-config --libs libavformat libavcodec libavutil libswscale) -lm b_side_omnid: $(B_SIDE_OMNID) diff --git a/include/gps_buffer.h b/include/gps_buffer.h new file mode 100644 index 0000000..13c8363 --- /dev/null +++ b/include/gps_buffer.h @@ -0,0 +1,11 @@ +#ifndef GPS_BUFFER_H +#define GPS_BUFFER_H + +#include + +uint64_t get_latest_gps_for_video(void); + + +int gps_buffer_init(const char* host); +void gps_buffer_cleanup(void); +#endif \ No newline at end of file diff --git a/include/video_pipeline.h b/include/video_pipeline.h index f5c061a..06ee388 100644 --- a/include/video_pipeline.h +++ b/include/video_pipeline.h @@ -5,12 +5,18 @@ #include #include +#include "gps_buffer.h" #include "peer_kcp_client.h" #ifdef __cplusplus extern "C" { #endif +typedef struct video_pipeline_packet_metadata { + uint64_t timestamp_ms; + uint64_t gps_data; +} video_pipeline_packet_metadata_t; + typedef struct video_pipeline_config { const char *camera_device; const char *server_addr; diff --git a/src/gps_buffer.c b/src/gps_buffer.c new file mode 100644 index 0000000..6193dcb --- /dev/null +++ b/src/gps_buffer.c @@ -0,0 +1,280 @@ +#include "gps_buffer.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include // 确保包含 errno + +// 全局共享变量 +static volatile uint64_t g_current_gps_data = 0; +static volatile int g_running = 0; +static pthread_t g_gps_thread; + +// 将经纬度打包进 uint64_t +static uint64_t pack_gps(double latitude, double longitude) { + if (!isfinite(latitude) || !isfinite(longitude)) { + return 0; + } + // 过滤掉 0,0 这种无效坐标 + if (fabs(latitude) < 1e-6 && fabs(longitude) < 1e-6) { + return 0; + } + + // 转换为整数微度 (micro-degrees) + int32_t lat_i = (int32_t)(latitude * 1000000.0); + int32_t lon_i = (int32_t)(longitude * 1000000.0); + + return ((uint64_t)(uint32_t)lat_i << 32) | (uint64_t)(uint32_t)lon_i; +} + +// ================================================================= +// 以下是借鉴 gps_parse.c 实现的底层解析函数 +// ================================================================= + +// 1. 辅助函数:在 JSON 字符串中查找键对应的值的起始位置 +static const char* find_json_value(const char* json, const char* key) { + char pattern[64]; + int written; + const char* position; + + if (json == NULL || key == NULL) return NULL; + + // 构建搜索模式: "key": + written = snprintf(pattern, sizeof(pattern), "\"%s\":", key); + if (written < 0 || (size_t)written >= sizeof(pattern)) { + return NULL; + } + + position = strstr(json, pattern); + if (position == NULL) { + return NULL; + } + + // 跳过 "key": + position += written; + + // 跳过可能存在的空格 + while (*position == ' ' || *position == '\t') { + position++; + } + + return position; +} + +// 2. 解析函数:从 JSON 字符串中提取 Double 类型的值 +static int json_extract_double(const char* json, const char* key, double* value) { + const char* position; + char* endptr = NULL; + double parsed; + + position = find_json_value(json, key); + if (position == NULL) { + return 0; // 键不存在 + } + + // 处理负号 + if (*position == '-') { + position++; + } + + // 确保当前位置是数字 + if (!(*position >= '0' && *position <= '9')) { + return 0; + } + + // 重置 errno 以检测错误 + errno = 0; + parsed = strtod(position, &endptr); + + // 检查转换是否成功 + if (errno != 0 || endptr == position || !isfinite(parsed)) { + return 0; + } + + *value = parsed; + return 1; +} + +// 3. 解析函数:从 JSON 字符串中提取 Int 类型的值 +static int json_extract_int(const char* json, const char* key, int* value) { + double dval; + if (json_extract_double(json, key, &dval)) { + *value = (int)dval; + return 1; + } + return 0; +} + +// 4. 检查是否为 TPV (定位数据) 包 +static int is_tpv_class(const char* json) { + char class_buf[32] = {0}; + const char* pos = find_json_value(json, "class"); + if (pos == NULL || *pos != '"') return 0; + + // 简单提取 class 的值 (TPV/SKY/DEVICES) + sscanf(pos, "\"%31[^\"]\"", class_buf); + return (strcmp(class_buf, "TPV") == 0); +} + +// ================================================================= +// 后台线程函数:负责连接 gpsd 并更新全局变量 +// ================================================================= +void* gps_update_thread(void* arg) { + const char* host = (const char*)arg; + int sockfd; + struct addrinfo hints, *res, *rp; + int s; + + // 1. 解析地址并连接 gpsd (默认端口 2947) + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; // 兼容 IPv4/IPv6 + hints.ai_socktype = SOCK_STREAM; + + s = getaddrinfo(host, "2947", &hints, &res); + if (s != 0) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s)); + return NULL; + } + + // 尝试连接每一个解析出来的地址 + for (rp = res; rp != NULL; rp = rp->ai_next) { + sockfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sockfd == -1) continue; + + if (connect(sockfd, rp->ai_addr, rp->ai_addrlen) != -1) { + break; // 成功连接 + } + close(sockfd); + } + + if (rp == NULL) { // 没有连接成功 + fprintf(stderr, "无法连接到 %s:2947\n", host); + freeaddrinfo(res); + return NULL; + } + freeaddrinfo(res); + + printf("GPS线程: 已连接到 gpsd %s\n", host); + + // 2. 发送 WATCH 命令,开启 JSON 流 + const char* watch_cmd = "?WATCH={\"enable\":true,\"json\":true};\n"; + if (send(sockfd, watch_cmd, strlen(watch_cmd), 0) < 0) { + perror("发送 WATCH 命令失败"); + close(sockfd); + return NULL; + } + + // 3. 主循环:读取并解析数据流 + // 注意:gpsd 数据是以 \n 结尾的,不能直接用固定长度 recv + char buffer[4096]; // 增大缓冲区以容纳长 JSON + size_t offset = 0; // 当前缓冲区数据长度 + + while (g_running) { + ssize_t len = recv(sockfd, buffer + offset, sizeof(buffer) - 1 - offset, 0); + + if (len <= 0) { + // 连接断开,进行重连逻辑 + break; + } + + offset += len; + buffer[offset] = '\0'; // 确保字符串结束 + + // 查找换行符 \n,因为一条完整的 JSON 消息以 \n 结尾 + char* start = buffer; + char* end; + + while ((end = memchr(start, '\n', buffer + offset - start)) != NULL) { + *end = '\0'; // 临时截断,形成独立字符串 + + // --- 核心解析逻辑 --- + // 1. 检查是否为 TPV 数据包 + if (is_tpv_class(start)) { + double lat = 0.0, lon = 0.0; + int mode = 0; + int has_fix = 0; + + // 2. 提取定位模式 (mode: 1=无定位, 2=2D, 3=3D) + if (json_extract_int(start, "mode", &mode)) { + has_fix = (mode >= 2); + } + + // 3. 如果有定位,提取经纬度 + if (has_fix) { + int got_lat = json_extract_double(start, "lat", &lat); + int got_lon = json_extract_double(start, "lon", &lon); + + if (got_lat && got_lon) { + // 4. 更新全局共享变量 (原子操作) + g_current_gps_data = pack_gps(lat, lon); + // 调试:取消注释可查看实时经纬度 + // printf("更新GPS: lat=%.6f, lon=%.6f\n", lat, lon); + } + } + // 如果无定位,这里不操作,保持上一次的有效值 + } + // --- 解析结束 --- + + // 移动指针到下一条消息 + start = end + 1; + } + + // 处理完所有完整消息后,将剩余未处理的数据移到缓冲区头部 + // (这种情况很少见,但为了严谨性) + if (start < buffer + offset) { + size_t remaining = (buffer + offset) - start; + memmove(buffer, start, remaining); + offset = remaining; + } else { + offset = 0; // 缓冲区已清空 + } + } + + // 循环结束,清理资源 + close(sockfd); + printf("GPS线程: 连接断开,尝试重连...\n"); + + // 这里可以添加一个短暂的休眠防止重连风暴 + // 但通常主程序会重启线程 + return NULL; +} + +// ================================================================= +// 接口函数实现 +// ================================================================= +uint64_t get_latest_gps_for_video(void) { + return g_current_gps_data; +} + +int gps_buffer_init(const char* host) { + if (g_running) return 0; + + g_running = 1; + g_current_gps_data = 0; + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + // 创建后台线程 + if (pthread_create(&g_gps_thread, NULL, gps_update_thread, (void*)host) != 0) { + g_running = 0; + pthread_attr_destroy(&attr); // 清理属性 + perror("无法创建 GPS 线程"); + return -1; + } + pthread_attr_destroy(&attr); // 清理属性 + return 0; +} + +void gps_buffer_cleanup(void) { + g_running = 0; + // 等待线程结束 + + usleep(10000); // 等待 100ms 让后台线程有机会处理退出标志 +} + + +//gcc main.c video_pipeline_run.c gps_buffer.c -lpthread -lm -o my_app 请确保在编译命令中链接 pthread 和 m (math) 库 \ No newline at end of file diff --git a/src/video_pipeline.c b/src/video_pipeline.c index 2e35347..d7ed569 100644 --- a/src/video_pipeline.c +++ b/src/video_pipeline.c @@ -621,24 +621,28 @@ static int video_sender_drain_pending_messages(video_sender_t *sender) { } } -static int video_sender_send_packet(video_sender_t *sender, const AVPacket *encoded_pkt, uint64_t timestamp) { +static int video_sender_send_packet( + video_sender_t *sender, + const AVPacket *encoded_pkt, + const video_pipeline_packet_metadata_t *metadata +) { uint8_t *payload; size_t payload_len; int rc; - if (sender == NULL || sender->client == NULL || encoded_pkt == NULL) { + if (sender == NULL || sender->client == NULL || encoded_pkt == NULL || metadata == NULL) { errno = EINVAL; return -1; } - payload_len = (size_t) encoded_pkt->size + sizeof(timestamp); + payload_len = (size_t) encoded_pkt->size + sizeof(*metadata); if (video_sender_ensure_buffer_capacity(sender, payload_len) != 0) { return -1; } payload = sender->send_buffer; memcpy(payload, encoded_pkt->data, (size_t) encoded_pkt->size); - memcpy(payload + encoded_pkt->size, ×tamp, sizeof(timestamp)); + memcpy(payload + encoded_pkt->size, metadata, sizeof(*metadata)); rc = kcp_client_send_binary(sender->client, sender->target_peer, payload, payload_len); if (rc != 0) { return rc; @@ -837,9 +841,11 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta double encode_end_ms = 0.0; double send_start_ms = 0.0; double send_end_ms = 0.0; + video_pipeline_packet_metadata_t packet_metadata; int frame_number = frame_index + 1; memset(&transport_stats, 0, sizeof(transport_stats)); + memset(&packet_metadata, 0, sizeof(packet_metadata)); if (config->max_frames > 0 && frame_index >= config->max_frames) { break; @@ -924,6 +930,9 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta send_start_ms = encode_end_ms; } + packet_metadata.timestamp_ms = (uint64_t) get_realtime_ms(); + packet_metadata.gps_data = get_latest_gps_for_video(); + kcp_client_runtime_stats_snapshot(sender.client, &transport_stats); if (video_sender_hard_backpressure_active(config, &transport_stats)) { uint32_t now_ms = omni_now_millis32(); @@ -1002,7 +1011,7 @@ int video_pipeline_run(const video_pipeline_config_t *config, video_pipeline_sta continue; } - if (video_sender_send_packet(&sender, encoded_pkt, (uint64_t) get_realtime_ms()) != 0) { + if (video_sender_send_packet(&sender, encoded_pkt, &packet_metadata) != 0) { pthread_mutex_lock(&stats->mutex); stats->send_errors += 1; pthread_mutex_unlock(&stats->mutex);