Compare commits

..

13 Commits

29 changed files with 4931 additions and 23 deletions

View File

@@ -9,7 +9,9 @@
"Bash(git status:*)",
"Bash(git fetch:*)",
"Bash(git pull:*)",
"Bash(wc:*)"
"Bash(wc:*)",
"Bash(python3 -c \"import dis, marshal, types; f = open\\(''''C:/Users/64187/Desktop/Workspace/OmniSocketGo/__pycache__/omnisocket_video_sender.cpython-312.pyc'''',''''rb''''\\); f.read\\(16\\); code=marshal.load\\(f\\); dis.dis\\(code\\)\")",
"Bash(gh pr:*)"
]
}
}

9
.gitignore vendored
View File

@@ -12,4 +12,11 @@ root@117.78.11.244
c/bin
*__pycache__*
*__pycache__*
/python/build
/python/omnisocket.egg-info
*.so*
/.venv

View File

@@ -36,6 +36,34 @@ TARGETS := \
$(BIN_DIR)/kcppeer \
$(BIN_DIR)/kcpping
CAMERA_VIDEO_SENDER := $(BIN_DIR)/camera_video_sender
CAMERA_VIDEO_SENDER_SRCS := \
$(CMD_DIR)/v1_camera_pipeline_ifdef.c \
$(SRC_DIR)/omni_common.c \
$(SRC_DIR)/protocol.c \
$(SRC_DIR)/latencylog.c \
$(SRC_DIR)/kcp_packet_debug.c \
$(SRC_DIR)/kcp_session_stats.c \
$(SRC_DIR)/linux_timestamping.c \
$(SRC_DIR)/transport_kcp.c \
$(SRC_DIR)/peer_kcp_client.c \
third_party/cjson/cJSON.c \
third_party/kcp/ikcp.c
B_SIDE_VIDEO_SENDER := $(BIN_DIR)/b_side_video_sender
B_SIDE_VIDEO_SENDER_SRCS := \
$(CMD_DIR)/b_side_video_sender.c \
$(SRC_DIR)/omni_common.c \
$(SRC_DIR)/protocol.c \
$(SRC_DIR)/latencylog.c \
$(SRC_DIR)/kcp_packet_debug.c \
$(SRC_DIR)/kcp_session_stats.c \
$(SRC_DIR)/linux_timestamping.c \
$(SRC_DIR)/transport_kcp.c \
$(SRC_DIR)/peer_kcp_client.c \
third_party/cjson/cJSON.c \
third_party/kcp/ikcp.c
all: $(TARGETS)
$(BIN_DIR):
@@ -62,6 +90,16 @@ $(BIN_DIR)/kcppeer: $(CMD_DIR)/kcppeer.c $(COMMON_SRCS) | $(BIN_DIR)
$(BIN_DIR)/kcpping: $(CMD_DIR)/kcpping.c $(COMMON_SRCS) | $(BIN_DIR)
$(CC) $(CFLAGS) $(CPPFLAGS) -o $@ $^ $(LDFLAGS)
$(CAMERA_VIDEO_SENDER): $(CAMERA_VIDEO_SENDER_SRCS) | $(BIN_DIR)
$(CC) $(CFLAGS) $(CPPFLAGS) $$(pkg-config --cflags libavformat libavcodec libavutil libswscale) -o $@ $^ $(LDFLAGS) $$(pkg-config --libs libavformat libavcodec libavutil libswscale)
camera_video_sender: $(CAMERA_VIDEO_SENDER)
$(B_SIDE_VIDEO_SENDER): $(B_SIDE_VIDEO_SENDER_SRCS) | $(BIN_DIR)
$(CC) $(CFLAGS) $(CPPFLAGS) $$(pkg-config --cflags libavformat libavcodec libavutil libswscale) -o $@ $^ $(LDFLAGS) $$(pkg-config --libs libavformat libavcodec libavutil libswscale)
b_side_video_sender: $(B_SIDE_VIDEO_SENDER)
clean:
rm -rf $(BIN_DIR)
@@ -71,4 +109,4 @@ python-ext:
python-install:
cd python && $(PYTHON) -m pip install -e .
.PHONY: all clean python-ext python-install
.PHONY: all clean python-ext python-install camera_video_sender b_side_video_sender

View File

@@ -27,6 +27,30 @@ make python-ext
make python-install
```
## A-Side OmniDaemon
The A-side daemon is configured from `config/a_side_omnidaemon.yaml` in this repo. The safest way to start it is to pass that file explicitly, because the installed Python package does not bundle the YAML config.
Run from a source checkout:
```bash
python -m omnisocket_a_side.daemon --config "$(pwd)/config/a_side_omnidaemon.yaml"
```
Or, if you installed the console script:
```bash
OMNIDAEMON_CONFIG="$(pwd)/config/a_side_omnidaemon.yaml" \
omnisocket-a-side-daemon
```
Optional overrides:
- `OMNIDAEMON_SOCKET=/tmp/omnisocket-a-side.sock` selects the local UDS path.
- `OMNIDAEMON_CONFIG=/abs/path/to/a_side_omnidaemon.yaml` overrides `--config`.
For `robot-command-center` and the A-side senders, keep the daemon and its clients on the same Linux machine so they can share the Unix-domain socket.
## Run On Different Machines
Server `D` runs the KCP hub on `0.0.0.0:10909`:

933
cmd/b_side_video_sender.c Normal file
View File

@@ -0,0 +1,933 @@
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <linux/videodev2.h>
#include <pthread.h>
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/select.h>
#include <time.h>
#include <unistd.h>
#include <libavcodec/avcodec.h>
#include <libavutil/avutil.h>
#include <libavutil/imgutils.h>
#include <libavutil/opt.h>
#include <libswscale/swscale.h>
#include "cJSON.h"
#include "peer_kcp_client.h"
#define WORKER_CONTROL_FD 3
#define WORKER_TELEMETRY_FD 4
#define WORKER_CONTROL_FD_ENV "OMNI_WORKER_CONTROL_FD"
#define WORKER_TELEMETRY_FD_ENV "OMNI_WORKER_TELEMETRY_FD"
#define NUM_BUFFERS 4
#define CLEAR(x) memset(&(x), 0, sizeof(x))
#define VIDEO_SERVER_ADDR_ENV "OMNI_VIDEO_SERVER_ADDR"
#define VIDEO_RELAY_ADDR_ENV "OMNI_VIDEO_RELAY_VIA"
#define VIDEO_BIND_IP_ENV "OMNI_VIDEO_BIND_IP"
#define VIDEO_BIND_DEVICE_ENV "OMNI_VIDEO_BIND_DEVICE"
#define VIDEO_PEER_ID_ENV "OMNI_VIDEO_PEER_ID"
#define VIDEO_TARGET_PEER_ENV "OMNI_VIDEO_TARGET_PEER"
#define VIDEO_DEVICE_ENV "OMNI_VIDEO_DEVICE"
#define VIDEO_CAPTURE_WIDTH_ENV "OMNI_VIDEO_CAPTURE_WIDTH"
#define VIDEO_CAPTURE_HEIGHT_ENV "OMNI_VIDEO_CAPTURE_HEIGHT"
#define VIDEO_OUTPUT_WIDTH_ENV "OMNI_VIDEO_OUTPUT_WIDTH"
#define VIDEO_OUTPUT_HEIGHT_ENV "OMNI_VIDEO_OUTPUT_HEIGHT"
#define VIDEO_FPS_ENV "OMNI_VIDEO_FPS"
#define VIDEO_JPEG_QSCALE_ENV "OMNI_VIDEO_JPEG_QSCALE"
#define VIDEO_MAX_FRAME_BYTES_ENV "OMNI_VIDEO_MAX_FRAME_BYTES"
#define VIDEO_STATS_INTERVAL_ENV "OMNI_VIDEO_STATS_INTERVAL_MS"
typedef struct {
void *start;
size_t length;
} Buffer;
typedef struct {
char server_addr[OMNI_MAX_ADDR_TEXT];
char relay_addr[OMNI_MAX_ADDR_TEXT];
char bind_ip[OMNI_MAX_ADDR_TEXT];
char bind_device[128];
char peer_id[OMNI_MAX_PEER_ID];
char target_peer[OMNI_MAX_PEER_ID];
char video_device[256];
int capture_width;
int capture_height;
int output_width;
int output_height;
int initial_fps;
int initial_qscale;
int initial_max_frame_bytes;
int stats_interval_ms;
} worker_config_t;
typedef struct {
pthread_mutex_t lock;
pthread_mutex_t telemetry_lock;
FILE *telemetry_stream;
int target_fps;
int jpeg_quality_qscale;
int max_frame_bytes;
bool shutdown_requested;
} runtime_state_t;
typedef struct {
kcp_client_t *client;
char target_peer[OMNI_MAX_PEER_ID];
} video_sender_t;
typedef struct {
FILE *control_stream;
runtime_state_t *runtime;
} control_thread_args_t;
static volatile sig_atomic_t g_signal_stop = 0;
static double monotonic_ms(void) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (double) ts.tv_sec * 1000.0 + (double) ts.tv_nsec / 1000000.0;
}
static int64_t realtime_ms(void) {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return (int64_t) ts.tv_sec * 1000 + (int64_t) ts.tv_nsec / 1000000;
}
static int env_as_int(const char *name, int fallback) {
const char *raw = getenv(name);
char *endptr = NULL;
long value;
if (raw == NULL || raw[0] == '\0') {
return fallback;
}
errno = 0;
value = strtol(raw, &endptr, 10);
if (errno != 0 || endptr == raw || (endptr != NULL && *endptr != '\0')) {
return fallback;
}
return (int) value;
}
static const char *env_or_default(const char *name, const char *fallback) {
const char *value = getenv(name);
if (value != NULL && value[0] != '\0') {
return value;
}
return fallback;
}
static void handle_signal(int signum) {
(void) signum;
g_signal_stop = 1;
}
static void runtime_set_shutdown(runtime_state_t *runtime, bool shutdown_requested) {
pthread_mutex_lock(&runtime->lock);
runtime->shutdown_requested = shutdown_requested;
pthread_mutex_unlock(&runtime->lock);
}
static bool runtime_should_stop(runtime_state_t *runtime) {
bool shutdown_requested;
pthread_mutex_lock(&runtime->lock);
shutdown_requested = runtime->shutdown_requested;
pthread_mutex_unlock(&runtime->lock);
return g_signal_stop != 0 || shutdown_requested;
}
static void runtime_get_profile(runtime_state_t *runtime, int *fps, int *qscale, int *max_frame_bytes) {
pthread_mutex_lock(&runtime->lock);
*fps = runtime->target_fps;
*qscale = runtime->jpeg_quality_qscale;
*max_frame_bytes = runtime->max_frame_bytes;
pthread_mutex_unlock(&runtime->lock);
}
static void runtime_set_profile(runtime_state_t *runtime, int fps, int qscale, int max_frame_bytes) {
pthread_mutex_lock(&runtime->lock);
if (fps > 0) {
runtime->target_fps = fps;
}
if (qscale > 0) {
runtime->jpeg_quality_qscale = qscale;
}
if (max_frame_bytes > 0) {
runtime->max_frame_bytes = max_frame_bytes;
}
pthread_mutex_unlock(&runtime->lock);
}
static void telemetry_write_line(runtime_state_t *runtime, const char *line) {
pthread_mutex_lock(&runtime->telemetry_lock);
if (runtime->telemetry_stream != NULL) {
fputs(line, runtime->telemetry_stream);
fputc('\n', runtime->telemetry_stream);
fflush(runtime->telemetry_stream);
}
pthread_mutex_unlock(&runtime->telemetry_lock);
}
static void telemetry_write_frame_stat(
runtime_state_t *runtime,
int frame_bytes,
int encode_us,
bool sent,
const char *drop_reason
) {
char line[512];
int written;
written = snprintf(
line,
sizeof(line),
"{\"type\":\"frame_stat\",\"frame_bytes\":%d,\"encode_us\":%d,"
"\"sent\":%s,\"drop_reason\":\"%s\",\"ts_unix_ms\":%" PRId64 "}",
frame_bytes,
encode_us,
sent ? "true" : "false",
drop_reason != NULL ? drop_reason : "",
realtime_ms());
if (written > 0 && written < (int) sizeof(line)) {
telemetry_write_line(runtime, line);
}
}
static void telemetry_write_kcp_metrics(runtime_state_t *runtime, const kcp_conn_metrics_t *metrics) {
char line[1024];
int written;
written = snprintf(
line,
sizeof(line),
"{\"type\":\"kcp_metrics\",\"connected\":%d,\"srtt_ms\":%d,\"srttvar_ms\":%d,"
"\"rto_ms\":%u,\"bytes_sent\":%" PRIu64 ",\"bytes_received\":%" PRIu64 ","
"\"out_pkts\":%" PRIu64 ",\"out_segs\":%" PRIu64 ",\"retrans_segs\":%" PRIu64 ","
"\"fast_retrans_segs\":%" PRIu64 ",\"early_retrans_segs\":%" PRIu64 ","
"\"lost_segs\":%" PRIu64 ",\"ring_buffer_snd_queue\":%" PRIu64 ","
"\"ring_buffer_snd_buffer\":%" PRIu64 ",\"ts_unix_ms\":%" PRId64 "}",
metrics->connected,
metrics->srtt_ms,
metrics->srttvar_ms,
metrics->rto_ms,
metrics->bytes_sent,
metrics->bytes_received,
metrics->out_pkts,
metrics->out_segs,
metrics->retrans_segs,
metrics->fast_retrans_segs,
metrics->early_retrans_segs,
metrics->lost_segs,
metrics->ring_buffer_snd_queue,
metrics->ring_buffer_snd_buffer,
realtime_ms());
if (written > 0 && written < (int) sizeof(line)) {
telemetry_write_line(runtime, line);
}
}
static int load_worker_config(worker_config_t *cfg) {
const char *server_addr = getenv(VIDEO_SERVER_ADDR_ENV);
const char *relay_addr = env_or_default(VIDEO_RELAY_ADDR_ENV, "");
if (cfg == NULL) {
errno = EINVAL;
return -1;
}
if ((server_addr == NULL || server_addr[0] == '\0') && relay_addr[0] != '\0') {
server_addr = relay_addr;
}
if (server_addr == NULL || server_addr[0] == '\0') {
fprintf(stderr, "%s is required\n", VIDEO_SERVER_ADDR_ENV);
errno = EINVAL;
return -1;
}
CLEAR(*cfg);
snprintf(cfg->server_addr, sizeof(cfg->server_addr), "%s", server_addr);
snprintf(cfg->relay_addr, sizeof(cfg->relay_addr), "%s", relay_addr);
snprintf(cfg->bind_ip, sizeof(cfg->bind_ip), "%s", env_or_default(VIDEO_BIND_IP_ENV, ""));
snprintf(cfg->bind_device, sizeof(cfg->bind_device), "%s", env_or_default(VIDEO_BIND_DEVICE_ENV, ""));
snprintf(cfg->peer_id, sizeof(cfg->peer_id), "%s", env_or_default(VIDEO_PEER_ID_ENV, "peer-b-video"));
snprintf(cfg->target_peer, sizeof(cfg->target_peer), "%s", env_or_default(VIDEO_TARGET_PEER_ENV, "peer-a-video"));
snprintf(cfg->video_device, sizeof(cfg->video_device), "%s", env_or_default(VIDEO_DEVICE_ENV, "/dev/video0"));
cfg->capture_width = env_as_int(VIDEO_CAPTURE_WIDTH_ENV, 1280);
cfg->capture_height = env_as_int(VIDEO_CAPTURE_HEIGHT_ENV, 720);
cfg->output_width = env_as_int(VIDEO_OUTPUT_WIDTH_ENV, 640);
cfg->output_height = env_as_int(VIDEO_OUTPUT_HEIGHT_ENV, 360);
cfg->initial_fps = env_as_int(VIDEO_FPS_ENV, 10);
cfg->initial_qscale = env_as_int(VIDEO_JPEG_QSCALE_ENV, 8);
cfg->initial_max_frame_bytes = env_as_int(VIDEO_MAX_FRAME_BYTES_ENV, 40960);
cfg->stats_interval_ms = env_as_int(VIDEO_STATS_INTERVAL_ENV, 100);
return 0;
}
static int open_v4l2_device(const char *device) {
int fd = open(device, O_RDWR | O_NONBLOCK);
if (fd < 0) {
perror("open device");
}
return fd;
}
static int init_v4l2_device(int fd, const worker_config_t *cfg) {
struct v4l2_format fmt;
struct v4l2_streamparm parm;
CLEAR(fmt);
fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
fmt.fmt.pix.width = cfg->capture_width;
fmt.fmt.pix.height = cfg->capture_height;
fmt.fmt.pix.pixelformat = V4L2_PIX_FMT_MJPEG;
fmt.fmt.pix.field = V4L2_FIELD_NONE;
if (ioctl(fd, VIDIOC_S_FMT, &fmt) < 0) {
perror("VIDIOC_S_FMT");
return -1;
}
CLEAR(parm);
parm.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
if (cfg->initial_fps > 0 && ioctl(fd, VIDIOC_G_PARM, &parm) == 0) {
if ((parm.parm.capture.capability & V4L2_CAP_TIMEPERFRAME) != 0U) {
parm.parm.capture.timeperframe.numerator = 1U;
parm.parm.capture.timeperframe.denominator = (unsigned int) cfg->initial_fps;
if (ioctl(fd, VIDIOC_S_PARM, &parm) < 0) {
perror("VIDIOC_S_PARM");
}
}
}
return 0;
}
static int init_mmap(int fd, Buffer **buffers, int *num_buffers) {
struct v4l2_requestbuffers req;
int index;
CLEAR(req);
req.count = NUM_BUFFERS;
req.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
req.memory = V4L2_MEMORY_MMAP;
if (ioctl(fd, VIDIOC_REQBUFS, &req) < 0) {
perror("VIDIOC_REQBUFS");
return -1;
}
*num_buffers = (int) req.count;
*buffers = (Buffer *) calloc(req.count, sizeof(Buffer));
if (*buffers == NULL) {
return -1;
}
for (index = 0; index < (int) req.count; ++index) {
struct v4l2_buffer buf;
CLEAR(buf);
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
buf.index = (unsigned int) index;
if (ioctl(fd, VIDIOC_QUERYBUF, &buf) < 0) {
perror("VIDIOC_QUERYBUF");
return -1;
}
(*buffers)[index].length = buf.length;
(*buffers)[index].start = mmap(NULL, buf.length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, buf.m.offset);
if ((*buffers)[index].start == MAP_FAILED) {
perror("mmap");
return -1;
}
}
return 0;
}
static int queue_all_buffers(int fd, int num_buffers) {
int index;
for (index = 0; index < num_buffers; ++index) {
struct v4l2_buffer buf;
CLEAR(buf);
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
buf.index = (unsigned int) index;
if (ioctl(fd, VIDIOC_QBUF, &buf) < 0) {
perror("VIDIOC_QBUF");
return -1;
}
}
return 0;
}
static int dequeue_latest_buffer(int fd, struct v4l2_buffer *latest_buf) {
struct v4l2_buffer latest_local;
bool have_latest = false;
if (latest_buf == NULL) {
errno = EINVAL;
return -1;
}
for (;;) {
struct v4l2_buffer current;
int dq_errno;
CLEAR(current);
current.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
current.memory = V4L2_MEMORY_MMAP;
if (ioctl(fd, VIDIOC_DQBUF, &current) < 0) {
dq_errno = errno;
if (dq_errno == EINTR) {
continue;
}
if (dq_errno == EAGAIN) {
if (!have_latest) {
errno = EAGAIN;
return 1;
}
*latest_buf = latest_local;
return 0;
}
if (have_latest && ioctl(fd, VIDIOC_QBUF, &latest_local) < 0) {
perror("VIDIOC_QBUF");
}
errno = dq_errno;
return -1;
}
if (have_latest && ioctl(fd, VIDIOC_QBUF, &latest_local) < 0) {
int q_errno = errno;
perror("VIDIOC_QBUF");
if (ioctl(fd, VIDIOC_QBUF, &current) < 0) {
perror("VIDIOC_QBUF");
}
errno = q_errno;
return -1;
}
latest_local = current;
have_latest = true;
}
}
static AVCodecContext *create_mjpeg_decoder(const worker_config_t *cfg) {
const AVCodec *decoder = avcodec_find_decoder(AV_CODEC_ID_MJPEG);
AVCodecContext *ctx;
if (decoder == NULL) {
fprintf(stderr, "MJPEG decoder not found\n");
return NULL;
}
ctx = avcodec_alloc_context3(decoder);
if (ctx == NULL) {
return NULL;
}
ctx->width = cfg->capture_width;
ctx->height = cfg->capture_height;
ctx->pix_fmt = AV_PIX_FMT_YUVJ420P;
ctx->color_range = AVCOL_RANGE_JPEG;
ctx->thread_count = 1;
if (avcodec_open2(ctx, decoder, NULL) < 0) {
avcodec_free_context(&ctx);
return NULL;
}
return ctx;
}
static AVCodecContext *create_mjpeg_encoder(const worker_config_t *cfg) {
const AVCodec *encoder = avcodec_find_encoder(AV_CODEC_ID_MJPEG);
AVCodecContext *ctx;
if (encoder == NULL) {
fprintf(stderr, "MJPEG encoder not found\n");
return NULL;
}
ctx = avcodec_alloc_context3(encoder);
if (ctx == NULL) {
return NULL;
}
ctx->width = cfg->output_width;
ctx->height = cfg->output_height;
ctx->pix_fmt = AV_PIX_FMT_YUVJ420P;
ctx->time_base = (AVRational) {1, cfg->initial_fps > 0 ? cfg->initial_fps : 10};
ctx->qmin = 1;
ctx->qmax = 31;
ctx->flags |= AV_CODEC_FLAG_QSCALE;
ctx->global_quality = FF_QP2LAMBDA * cfg->initial_qscale;
if (avcodec_open2(ctx, encoder, NULL) < 0) {
avcodec_free_context(&ctx);
return NULL;
}
return ctx;
}
static int decode_mjpeg_frame(AVCodecContext *decoder, const uint8_t *data, int size, AVFrame **frame) {
AVPacket *pkt;
int ret;
*frame = NULL;
pkt = av_packet_alloc();
if (pkt == NULL) {
return -1;
}
pkt->data = (uint8_t *) data;
pkt->size = size;
ret = avcodec_send_packet(decoder, pkt);
if (ret < 0) {
av_packet_free(&pkt);
return -1;
}
*frame = av_frame_alloc();
if (*frame == NULL) {
av_packet_free(&pkt);
return -1;
}
ret = avcodec_receive_frame(decoder, *frame);
av_packet_free(&pkt);
if (ret < 0) {
av_frame_free(frame);
return -1;
}
return 0;
}
static struct SwsContext *create_scaler(AVFrame *src, const worker_config_t *cfg) {
return sws_getContext(
src->width,
src->height,
src->format,
cfg->output_width,
cfg->output_height,
AV_PIX_FMT_YUVJ420P,
SWS_BILINEAR,
NULL,
NULL,
NULL);
}
static int scale_frame(struct SwsContext *sws_ctx, const worker_config_t *cfg, AVFrame *src, AVFrame **dst) {
int ret;
if (sws_ctx == NULL) {
return -1;
}
*dst = av_frame_alloc();
if (*dst == NULL) {
return -1;
}
(*dst)->width = cfg->output_width;
(*dst)->height = cfg->output_height;
(*dst)->format = AV_PIX_FMT_YUVJ420P;
if (av_frame_get_buffer(*dst, 0) < 0) {
av_frame_free(dst);
return -1;
}
ret = sws_scale(
sws_ctx,
(const uint8_t *const *) src->data,
src->linesize,
0,
src->height,
(*dst)->data,
(*dst)->linesize);
if (ret < 0) {
av_frame_free(dst);
return -1;
}
return 0;
}
static int encode_frame(AVCodecContext *encoder, AVFrame *frame, int qscale, AVPacket **pkt) {
int ret;
*pkt = av_packet_alloc();
if (*pkt == NULL) {
return -1;
}
encoder->global_quality = FF_QP2LAMBDA * qscale;
frame->quality = encoder->global_quality;
ret = avcodec_send_frame(encoder, frame);
if (ret < 0) {
av_packet_free(pkt);
return -1;
}
ret = avcodec_receive_packet(encoder, *pkt);
if (ret < 0) {
av_packet_free(pkt);
return -1;
}
return 0;
}
static int video_sender_init(video_sender_t *sender, const worker_config_t *cfg) {
kcp_conn_options_t options;
if (sender == NULL) {
errno = EINVAL;
return -1;
}
CLEAR(*sender);
snprintf(sender->target_peer, sizeof(sender->target_peer), "%s", cfg->target_peer);
kcp_conn_options_set_video_defaults(&options);
sender->client = kcp_client_dial_with_options(
cfg->server_addr,
cfg->relay_addr,
cfg->peer_id,
cfg->bind_ip,
cfg->bind_device,
&options,
NULL,
NULL,
NULL,
cfg->stats_interval_ms);
if (sender->client == NULL) {
return -1;
}
fprintf(stderr, "B-side video worker connected as %s -> %s\n", kcp_client_id(sender->client), sender->target_peer);
return 0;
}
static int video_sender_send_packet(video_sender_t *sender, const AVPacket *encoded_pkt) {
if (sender == NULL || sender->client == NULL || encoded_pkt == NULL) {
errno = EINVAL;
return -1;
}
return kcp_client_send_binary(sender->client, sender->target_peer, encoded_pkt->data, (size_t) encoded_pkt->size);
}
static void video_sender_close(video_sender_t *sender) {
if (sender == NULL || sender->client == NULL) {
return;
}
kcp_client_close(sender->client);
kcp_client_free(sender->client);
sender->client = NULL;
}
static void *control_thread_main(void *opaque) {
control_thread_args_t *args = (control_thread_args_t *) opaque;
char line[512];
while (fgets(line, sizeof(line), args->control_stream) != NULL) {
cJSON *root = cJSON_Parse(line);
cJSON *type_item;
if (root == NULL) {
continue;
}
type_item = cJSON_GetObjectItemCaseSensitive(root, "type");
if (!cJSON_IsString(type_item) || type_item->valuestring == NULL) {
cJSON_Delete(root);
continue;
}
if (strcmp(type_item->valuestring, "shutdown") == 0) {
runtime_set_shutdown(args->runtime, true);
cJSON_Delete(root);
return NULL;
}
if (strcmp(type_item->valuestring, "set_profile") == 0) {
cJSON *fps = cJSON_GetObjectItemCaseSensitive(root, "fps");
cJSON *qscale = cJSON_GetObjectItemCaseSensitive(root, "jpeg_quality_qscale");
cJSON *max_frame_bytes = cJSON_GetObjectItemCaseSensitive(root, "max_frame_bytes");
runtime_set_profile(
args->runtime,
cJSON_IsNumber(fps) ? fps->valueint : -1,
cJSON_IsNumber(qscale) ? qscale->valueint : -1,
cJSON_IsNumber(max_frame_bytes) ? max_frame_bytes->valueint : -1);
}
cJSON_Delete(root);
}
runtime_set_shutdown(args->runtime, true);
return NULL;
}
int main(void) {
worker_config_t cfg;
runtime_state_t runtime;
video_sender_t sender;
control_thread_args_t control_args;
pthread_t control_thread;
FILE *control_stream = NULL;
FILE *telemetry_stream = NULL;
Buffer *buffers = NULL;
int num_buffers = 0;
int camera_fd = -1;
int control_fd = env_as_int(WORKER_CONTROL_FD_ENV, WORKER_CONTROL_FD);
int telemetry_fd = env_as_int(WORKER_TELEMETRY_FD_ENV, WORKER_TELEMETRY_FD);
enum v4l2_buf_type stream_type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
AVCodecContext *decoder = NULL;
AVCodecContext *encoder = NULL;
struct SwsContext *sws_ctx = NULL;
double next_deadline_ms = 0.0;
double next_metrics_ms = 0.0;
int exit_code = 1;
int index;
int sws_src_width = 0;
int sws_src_height = 0;
int sws_src_format = -1;
bool control_thread_started = false;
av_log_set_level(AV_LOG_ERROR);
signal(SIGINT, handle_signal);
signal(SIGTERM, handle_signal);
CLEAR(sender);
if (load_worker_config(&cfg) != 0) {
return 1;
}
CLEAR(runtime);
pthread_mutex_init(&runtime.lock, NULL);
pthread_mutex_init(&runtime.telemetry_lock, NULL);
runtime.target_fps = cfg.initial_fps;
runtime.jpeg_quality_qscale = cfg.initial_qscale;
runtime.max_frame_bytes = cfg.initial_max_frame_bytes;
control_stream = fdopen(control_fd, "r");
telemetry_stream = fdopen(telemetry_fd, "w");
if (control_stream == NULL || telemetry_stream == NULL) {
perror("fdopen worker control/telemetry");
goto cleanup;
}
setvbuf(telemetry_stream, NULL, _IOLBF, 0);
runtime.telemetry_stream = telemetry_stream;
control_args.control_stream = control_stream;
control_args.runtime = &runtime;
if (pthread_create(&control_thread, NULL, control_thread_main, &control_args) != 0) {
perror("pthread_create");
goto cleanup;
}
control_thread_started = true;
camera_fd = open_v4l2_device(cfg.video_device);
if (camera_fd < 0) {
goto cleanup_join_thread;
}
if (init_v4l2_device(camera_fd, &cfg) != 0) {
goto cleanup_join_thread;
}
if (init_mmap(camera_fd, &buffers, &num_buffers) != 0) {
goto cleanup_join_thread;
}
if (queue_all_buffers(camera_fd, num_buffers) != 0) {
goto cleanup_join_thread;
}
if (ioctl(camera_fd, VIDIOC_STREAMON, &stream_type) < 0) {
perror("VIDIOC_STREAMON");
goto cleanup_join_thread;
}
decoder = create_mjpeg_decoder(&cfg);
encoder = create_mjpeg_encoder(&cfg);
if (decoder == NULL || encoder == NULL) {
fprintf(stderr, "failed to create codecs\n");
goto cleanup_join_thread;
}
if (video_sender_init(&sender, &cfg) != 0) {
perror("video_sender_init");
goto cleanup_join_thread;
}
next_deadline_ms = monotonic_ms();
next_metrics_ms = next_deadline_ms + 100.0;
while (!runtime_should_stop(&runtime)) {
AVFrame *decoded_frame = NULL;
AVFrame *scaled_frame = NULL;
AVPacket *encoded_pkt = NULL;
struct v4l2_buffer buf;
fd_set fds;
struct timeval tv;
int select_result;
int fps;
int qscale;
int max_frame_bytes;
int encode_us = 0;
bool sent = false;
const char *drop_reason = "";
double now_ms = monotonic_ms();
double encode_start_ms;
double encode_end_ms;
runtime_get_profile(&runtime, &fps, &qscale, &max_frame_bytes);
if (fps < 1) {
fps = 1;
}
FD_ZERO(&fds);
FD_SET(camera_fd, &fds);
tv.tv_sec = 2;
tv.tv_usec = 0;
select_result = select(camera_fd + 1, &fds, NULL, NULL, &tv);
if (select_result <= 0) {
if (select_result < 0 && errno == EINTR) {
continue;
}
fprintf(stderr, "select timeout/error on camera\n");
continue;
}
if (dequeue_latest_buffer(camera_fd, &buf) != 0) {
if (errno == EAGAIN) {
continue;
}
perror("VIDIOC_DQBUF");
break;
}
now_ms = monotonic_ms();
if (now_ms < next_deadline_ms) {
drop_reason = "paced_drop";
goto requeue_and_report;
}
next_deadline_ms = now_ms + (1000.0 / (double) fps);
if (decode_mjpeg_frame(decoder, (uint8_t *) buffers[buf.index].start, (int) buf.bytesused, &decoded_frame) != 0) {
drop_reason = "decode_failed";
goto requeue_and_report;
}
if (
sws_ctx == NULL
|| sws_src_width != decoded_frame->width
|| sws_src_height != decoded_frame->height
|| sws_src_format != decoded_frame->format
) {
sws_freeContext(sws_ctx);
sws_ctx = create_scaler(decoded_frame, &cfg);
sws_src_width = decoded_frame->width;
sws_src_height = decoded_frame->height;
sws_src_format = decoded_frame->format;
}
if (scale_frame(sws_ctx, &cfg, decoded_frame, &scaled_frame) != 0) {
drop_reason = "scale_failed";
goto requeue_and_report;
}
encode_start_ms = monotonic_ms();
if (encode_frame(encoder, scaled_frame, qscale, &encoded_pkt) != 0) {
drop_reason = "encode_failed";
goto requeue_and_report;
}
encode_end_ms = monotonic_ms();
encode_us = (int) ((encode_end_ms - encode_start_ms) * 1000.0);
if (encoded_pkt->size > max_frame_bytes) {
drop_reason = "frame_too_large";
goto requeue_and_report;
}
if (video_sender_send_packet(&sender, encoded_pkt) != 0) {
perror("video_sender_send_packet");
drop_reason = "send_failed";
goto requeue_and_report;
}
sent = true;
requeue_and_report:
telemetry_write_frame_stat(
&runtime,
encoded_pkt != NULL ? encoded_pkt->size : 0,
encode_us,
sent,
drop_reason);
if (ioctl(camera_fd, VIDIOC_QBUF, &buf) < 0) {
perror("VIDIOC_QBUF");
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
if (encoded_pkt != NULL) {
av_packet_free(&encoded_pkt);
}
break;
}
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
if (encoded_pkt != NULL) {
av_packet_free(&encoded_pkt);
}
now_ms = monotonic_ms();
if (sender.client != NULL && now_ms >= next_metrics_ms) {
kcp_conn_metrics_t metrics;
if (kcp_client_metrics_snapshot(sender.client, &metrics) == 0) {
telemetry_write_kcp_metrics(&runtime, &metrics);
}
next_metrics_ms = now_ms + 100.0;
}
}
exit_code = 0;
cleanup_join_thread:
runtime_set_shutdown(&runtime, true);
if (control_stream != NULL) {
fclose(control_stream);
control_stream = NULL;
}
if (control_thread_started) {
pthread_join(control_thread, NULL);
}
cleanup:
if (control_stream != NULL) {
fclose(control_stream);
control_stream = NULL;
}
if (camera_fd >= 0) {
ioctl(camera_fd, VIDIOC_STREAMOFF, &stream_type);
}
if (buffers != NULL) {
for (index = 0; index < num_buffers; ++index) {
if (buffers[index].start != NULL && buffers[index].start != MAP_FAILED) {
munmap(buffers[index].start, buffers[index].length);
}
}
free(buffers);
}
video_sender_close(&sender);
if (encoder != NULL) {
avcodec_free_context(&encoder);
}
if (decoder != NULL) {
avcodec_free_context(&decoder);
}
sws_freeContext(sws_ctx);
if (camera_fd >= 0) {
close(camera_fd);
}
if (telemetry_stream != NULL) {
fclose(telemetry_stream);
telemetry_stream = NULL;
}
pthread_mutex_destroy(&runtime.lock);
pthread_mutex_destroy(&runtime.telemetry_lock);
return exit_code;
}

View File

@@ -0,0 +1,652 @@
// camera_pipeline_ifdef_fixed.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <linux/videodev2.h>
#include <time.h>
// FFmpeg头文件 - 使用纯C包含
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavutil/avutil.h>
#include <libavutil/imgutils.h>
#include <libavutil/opt.h>
#include <libswscale/swscale.h>
#include "peer_kcp_client.h"
// ==========================================
// 1. 配置区域:在这里开启或关闭时间打印
// ==========================================
#define DEBUG_TIMING // 注释掉这一行,所有时间打印和计算都会消失
// 定义打印宏
#ifdef DEBUG_TIMING
#define PRINT_TIME(fmt, ...) printf(fmt, ##__VA_ARGS__)
#else
#define PRINT_TIME(fmt, ...) // 什么都不做,编译器会优化掉
#endif
#define WIDTH 1280
#define HEIGHT 720
#define OUTPUT_WIDTH 640
#define OUTPUT_HEIGHT 360
#define NUM_BUFFERS 4
#define CLEAR(x) memset(&(x), 0, sizeof(x))
#define VIDEO_SERVER_ADDR_ENV "OMNI_VIDEO_SERVER_ADDR"
#define VIDEO_RELAY_ADDR_ENV "OMNI_VIDEO_RELAY_VIA"
#define VIDEO_BIND_IP_ENV "OMNI_VIDEO_BIND_IP"
#define VIDEO_BIND_DEVICE_ENV "OMNI_VIDEO_BIND_DEVICE"
#define VIDEO_PEER_ID_ENV "OMNI_VIDEO_PEER_ID"
#define VIDEO_TARGET_PEER_ENV "OMNI_VIDEO_TARGET_PEER"
#define VIDEO_DEFAULT_PEER_ID "peer-b-video"
#define VIDEO_DEFAULT_TARGET_PEER "peer-a-video"
typedef struct
{
kcp_client_t *client;
char target_peer[OMNI_MAX_PEER_ID];
} VideoSender;
static int video_sender_init(VideoSender *sender);
static int video_sender_send_packet(VideoSender *sender, const AVPacket *encoded_pkt);
static void video_sender_close(VideoSender *sender);
typedef struct
{
void *start;
size_t length;
} Buffer;
double get_time_ms()
{
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0;
}
static const char *env_or_default(const char *name, const char *fallback)
{
const char *value = getenv(name);
if (value != NULL && value[0] != '\0')
return value;
return fallback;
}
static int video_sender_init(VideoSender *sender)
{
const char *server_addr = getenv(VIDEO_SERVER_ADDR_ENV);
const char *relay_addr = env_or_default(VIDEO_RELAY_ADDR_ENV, "");
const char *bind_ip = env_or_default(VIDEO_BIND_IP_ENV, "");
const char *bind_device = env_or_default(VIDEO_BIND_DEVICE_ENV, "");
const char *peer_id = env_or_default(VIDEO_PEER_ID_ENV, VIDEO_DEFAULT_PEER_ID);
const char *target_peer = env_or_default(VIDEO_TARGET_PEER_ENV, VIDEO_DEFAULT_TARGET_PEER);
kcp_conn_options_t options;
if (sender == NULL)
{
errno = EINVAL;
return -1;
}
if (server_addr == NULL || server_addr[0] == '\0')
{
errno = EINVAL;
fprintf(stderr, "%s is required\n", VIDEO_SERVER_ADDR_ENV);
return -1;
}
memset(sender, 0, sizeof(*sender));
snprintf(sender->target_peer, sizeof(sender->target_peer), "%s", target_peer);
kcp_conn_options_set_video_defaults(&options);
sender->client = kcp_client_dial_with_options(
server_addr,
relay_addr,
peer_id,
bind_ip,
bind_device,
&options,
NULL,
NULL,
NULL,
KCP_DEFAULT_STATS_INTERVAL_MS);
if (sender->client == NULL)
return -1;
fprintf(stderr, "Video sender connected as %s -> %s\n",
kcp_client_id(sender->client), sender->target_peer);
return 0;
}
static int video_sender_send_packet(VideoSender *sender, const AVPacket *encoded_pkt)
{
if (sender == NULL || sender->client == NULL || encoded_pkt == NULL)
{
errno = EINVAL;
return -1;
}
return kcp_client_send_binary(
sender->client,
sender->target_peer,
encoded_pkt->data,
(size_t)encoded_pkt->size);
}
static void video_sender_close(VideoSender *sender)
{
if (sender == NULL || sender->client == NULL)
return;
kcp_client_close(sender->client);
kcp_client_free(sender->client);
sender->client = NULL;
}
int open_v4l2_device(const char *device)
{
int fd = open(device, O_RDWR | O_NONBLOCK);
if (fd < 0)
{
perror("open device");
return -1;
}
return fd;
}
int init_v4l2_device(int fd)
{
struct v4l2_format fmt = {0};
fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
fmt.fmt.pix.width = WIDTH;
fmt.fmt.pix.height = HEIGHT;
fmt.fmt.pix.pixelformat = V4L2_PIX_FMT_MJPEG;
fmt.fmt.pix.field = V4L2_FIELD_NONE;
if (ioctl(fd, VIDIOC_S_FMT, &fmt) < 0)
{
perror("VIDIOC_S_FMT");
return -1;
}
PRINT_TIME("Set format: %dx%d MJPEG\n", WIDTH, HEIGHT);
return 0;
}
int init_mmap(int fd, Buffer **buffers, int *num_buffers)
{
struct v4l2_requestbuffers req = {0};
req.count = NUM_BUFFERS;
req.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
req.memory = V4L2_MEMORY_MMAP;
if (ioctl(fd, VIDIOC_REQBUFS, &req) < 0)
{
perror("VIDIOC_REQBUFS");
return -1;
}
*num_buffers = req.count;
*buffers = (Buffer *)calloc(req.count, sizeof(Buffer));
for (int i = 0; i < req.count; i++)
{
struct v4l2_buffer buf = {0};
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
buf.index = i;
if (ioctl(fd, VIDIOC_QUERYBUF, &buf) < 0)
{
perror("VIDIOC_QUERYBUF");
return -1;
}
(*buffers)[i].length = buf.length;
(*buffers)[i].start = mmap(NULL, buf.length,
PROT_READ | PROT_WRITE,
MAP_SHARED, fd, buf.m.offset);
if ((*buffers)[i].start == MAP_FAILED)
{
perror("mmap");
return -1;
}
}
return 0;
}
AVCodecContext *create_mjpeg_decoder()
{
const AVCodec *decoder = avcodec_find_decoder(AV_CODEC_ID_MJPEG);
if (!decoder)
{
fprintf(stderr, "MJPEG decoder not found\n");
return NULL;
}
AVCodecContext *ctx = avcodec_alloc_context3(decoder);
ctx->width = WIDTH;
ctx->height = HEIGHT;
ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; // 使用YUVJ420P
ctx->color_range = AVCOL_RANGE_JPEG; // JPEG范围
ctx->thread_count = 1;
AVDictionary *opts = NULL;
av_dict_set(&opts, "flags2", "+fast", 0);
if (avcodec_open2(ctx, decoder, &opts) < 0)
{
avcodec_free_context(&ctx);
av_dict_free(&opts);
return NULL;
}
av_dict_free(&opts);
printf("Decoder created with format: YUVJ420P\n");
return ctx;
}
AVCodecContext *create_mjpeg_encoder()
{
const AVCodec *encoder = avcodec_find_encoder(AV_CODEC_ID_MJPEG);
if (!encoder)
{
fprintf(stderr, "MJPEG encoder not found\n");
return NULL;
}
AVCodecContext *ctx = avcodec_alloc_context3(encoder);
ctx->width = OUTPUT_WIDTH;
ctx->height = OUTPUT_HEIGHT;
ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; // 使用YUVJ420P
ctx->time_base = (AVRational){1, 30};
ctx->qmin = 8;
ctx->qmax = 31;
ctx->flags |= AV_CODEC_FLAG_QSCALE;
ctx->global_quality = FF_QP2LAMBDA * 5;
AVDictionary *opts = NULL;
av_dict_set(&opts, "huffman", "default", 0);
if (avcodec_open2(ctx, encoder, &opts) < 0)
{
avcodec_free_context(&ctx);
av_dict_free(&opts);
return NULL;
}
av_dict_free(&opts);
printf("Encoder created with format: YUVJ420P\n");
return ctx;
}
int decode_mjpeg_frame(AVCodecContext *decoder, const uint8_t *data, int size, AVFrame **frame)
{
if (frame == NULL)
return -1;
*frame = NULL;
AVPacket *pkt = av_packet_alloc();
if (!pkt)
return -1;
pkt->data = (uint8_t *)data;
pkt->size = size;
int ret = avcodec_send_packet(decoder, pkt);
if (ret < 0)
{
av_packet_free(&pkt);
return -1;
}
*frame = av_frame_alloc();
if (!*frame)
{
av_packet_free(&pkt);
return -1;
}
ret = avcodec_receive_frame(decoder, *frame);
av_packet_free(&pkt);
if (ret < 0)
av_frame_free(frame);
return ret;
}
int scale_frame(AVFrame *src, AVFrame **dst)
{
// 简单缩放,不设置复杂的色彩空间参数
struct SwsContext *sws_ctx = sws_getContext(
src->width, src->height, src->format,
OUTPUT_WIDTH, OUTPUT_HEIGHT, AV_PIX_FMT_YUVJ420P,
SWS_BILINEAR, NULL, NULL, NULL);
if (!sws_ctx)
{
fprintf(stderr, "Failed to create sws context\n");
return -1;
}
*dst = av_frame_alloc();
if (!*dst)
{
sws_freeContext(sws_ctx);
return -1;
}
(*dst)->width = OUTPUT_WIDTH;
(*dst)->height = OUTPUT_HEIGHT;
(*dst)->format = AV_PIX_FMT_YUVJ420P;
if (av_frame_get_buffer(*dst, 0) < 0)
{
fprintf(stderr, "Failed to allocate frame buffer\n");
av_frame_free(dst);
sws_freeContext(sws_ctx);
return -1;
}
int ret = sws_scale(sws_ctx, (const uint8_t *const *)src->data, src->linesize,
0, src->height, (*dst)->data, (*dst)->linesize);
sws_freeContext(sws_ctx);
if (ret < 0)
{
fprintf(stderr, "sws_scale failed\n");
av_frame_free(dst);
return -1;
}
return 0;
}
int encode_frame(AVCodecContext *encoder, AVFrame *frame, AVPacket **pkt)
{
if (pkt == NULL)
return -1;
*pkt = NULL;
*pkt = av_packet_alloc();
if (!*pkt)
return -1;
int ret = avcodec_send_frame(encoder, frame);
if (ret < 0)
{
av_packet_free(pkt);
return -1;
}
ret = avcodec_receive_packet(encoder, *pkt);
if (ret < 0)
av_packet_free(pkt);
return ret;
}
int main()
{
VideoSender sender;
memset(&sender, 0, sizeof(sender));
PRINT_TIME("=== V4L2 Direct Capture + FFmpeg Processing ===\n");
// 1. Open V4L2 device
int fd = open_v4l2_device("/dev/video0");
if (fd < 0)
{
fprintf(stderr, "Failed to open camera device\n");
return 1;
}
// 2. Initialize V4L2
if (init_v4l2_device(fd) < 0)
{
close(fd);
return 1;
}
// 3. Setup MMAP buffers
Buffer *buffers = NULL;
int num_buffers = 0;
if (init_mmap(fd, &buffers, &num_buffers) < 0)
{
close(fd);
return 1;
}
// 4. Create FFmpeg codecs
AVCodecContext *decoder = create_mjpeg_decoder();
AVCodecContext *encoder = create_mjpeg_encoder();
if (!decoder || !encoder)
{
fprintf(stderr, "Failed to create codecs\n");
close(fd);
return 1;
}
if (video_sender_init(&sender) < 0)
{
perror("video_sender_init");
video_sender_close(&sender);
avcodec_free_context(&encoder);
avcodec_free_context(&decoder);
close(fd);
return 1;
}
// 5. Queue buffers
for (int i = 0; i < num_buffers; i++)
{
struct v4l2_buffer buf = {0};
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
buf.index = i;
if (ioctl(fd, VIDIOC_QBUF, &buf) < 0)
{
perror("VIDIOC_QBUF");
close(fd);
return 1;
}
}
// 6. Start streaming
enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
if (ioctl(fd, VIDIOC_STREAMON, &type) < 0)
{
perror("VIDIOC_STREAMON");
close(fd);
return 1;
}
// 7. Benchmark
// 使用宏控制打印表头
PRINT_TIME("\nRunning benchmark (100 frames)...\n");
PRINT_TIME("Frame | Capture | Decode | Scale | Encode | Total | Size | Marker\n");
PRINT_TIME("------|---------|--------|-------|--------|-------|------|--------\n");
for (int i = 0; i < 100; i++)
{
// 只有在开启 DEBUG_TIMING 时才声明这些时间变量
#ifdef DEBUG_TIMING
double total_start = get_time_ms();
double capture_start, capture_end;
double decode_start, decode_end;
double scale_start, scale_end;
double encode_start, encode_end;
#endif
// Wait for frame
fd_set fds;
FD_ZERO(&fds);
FD_SET(fd, &fds);
struct timeval tv = {2, 0};
int r = select(fd + 1, &fds, NULL, NULL, &tv);
if (r <= 0)
{
PRINT_TIME("Timeout waiting for frame\n");
break;
}
#ifdef DEBUG_TIMING
capture_start = get_time_ms();
#endif
// Dequeue buffer
struct v4l2_buffer buf = {0};
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
if (ioctl(fd, VIDIOC_DQBUF, &buf) < 0)
{
perror("VIDIOC_DQBUF");
break;
}
#ifdef DEBUG_TIMING
capture_end = get_time_ms();
#endif
// Decode
#ifdef DEBUG_TIMING
decode_start = get_time_ms();
#endif
AVFrame *decoded_frame = NULL;
int ret = decode_mjpeg_frame(decoder,
(uint8_t *)buffers[buf.index].start, buf.bytesused, &decoded_frame);
#ifdef DEBUG_TIMING
decode_end = get_time_ms();
#endif
if (ret < 0 || !decoded_frame)
{
PRINT_TIME("Frame %d: Decode failed\n", i + 1);
ioctl(fd, VIDIOC_QBUF, &buf);
continue;
}
// Scale
#ifdef DEBUG_TIMING
scale_start = get_time_ms();
#endif
AVFrame *scaled_frame = NULL;
if (scale_frame(decoded_frame, &scaled_frame) < 0)
{
PRINT_TIME("Frame %d: Scale failed\n", i + 1);
av_frame_free(&decoded_frame);
ioctl(fd, VIDIOC_QBUF, &buf);
continue;
}
#ifdef DEBUG_TIMING
scale_end = get_time_ms();
#endif
// Encode
#ifdef DEBUG_TIMING
encode_start = get_time_ms();
#endif
AVPacket *encoded_pkt = NULL;
if (encode_frame(encoder, scaled_frame, &encoded_pkt) < 0)
{
PRINT_TIME("Frame %d: Encode failed\n", i + 1);
}
#ifdef DEBUG_TIMING
if (encoded_pkt && i % 50 == 0)
{
char filename[100];
sprintf(filename, "frame_%04d.jpg", i + 1);
FILE *f = fopen(filename, "wb");
if (f)
{
fwrite(encoded_pkt->data, 1, encoded_pkt->size, f);
fclose(f);
PRINT_TIME("Saved as %s\n", filename);
}
}
#endif
#ifdef DEBUG_TIMING
encode_end = get_time_ms();
double total_end = get_time_ms();
#endif
// 打印结果
#ifdef DEBUG_TIMING
PRINT_TIME("%5d | %7.1f | %6.1f | %5.1f | %6.1f | %5.1f | %4d KB | 0x%02x\n",
i + 1,
capture_end - capture_start,
decode_end - decode_start,
scale_end - scale_start,
encode_end - encode_start,
total_end - total_start,
encoded_pkt ? encoded_pkt->size / 1024 : 0,
encoded_pkt && encoded_pkt->size > 1 ? encoded_pkt->data[1] : 0);
#else
// 如果不开启宏,也打印一些基本信息
printf("Frame %d processed\n", i + 1);
#endif
if (encoded_pkt && video_sender_send_packet(&sender, encoded_pkt) != 0)
{
perror("video_sender_send_packet");
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
av_packet_free(&encoded_pkt);
ioctl(fd, VIDIOC_QBUF, &buf);
break;
}
// Cleanup
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
if (encoded_pkt)
av_packet_free(&encoded_pkt);
// Requeue buffer
if (ioctl(fd, VIDIOC_QBUF, &buf) < 0)
{
perror("VIDIOC_QBUF");
break;
}
}
// 8. Stop streaming
ioctl(fd, VIDIOC_STREAMOFF, &type);
// 9. Cleanup
for (int i = 0; i < num_buffers; i++)
{
if (buffers[i].start != MAP_FAILED)
{
munmap(buffers[i].start, buffers[i].length);
}
}
free(buffers);
video_sender_close(&sender);
avcodec_free_context(&encoder);
avcodec_free_context(&decoder);
close(fd);
return 0;
}
// gcc -o v1_camera_pipeline_ifdef v1_camera_pipeline_ifdef.c $(pkg-config --cflags --libs libavformat libavcodec libavutil libswscale)

View File

@@ -0,0 +1,51 @@
transport:
server_addr: "81.70.156.140:10909"
relay_via: "106.55.173.235:10909"
bind_ip: ""
bind_device: ""
control_sender:
peer_id: "peer-a-ctrl"
target_peer: "peer-b-ctrl"
nodelay: 1
interval_ms: 5
resend: 2
nc: 1
sndwnd: 32
rcvwnd: 32
mtu: 1400
stats_interval_ms: 100
video_receiver:
peer_id: "peer-a-video"
buffer_bytes: 1048576
nodelay: 1
interval_ms: 10
resend: 2
nc: 1
sndwnd: 256
rcvwnd: 256
mtu: 1400
stats_interval_ms: 100
daemon:
socket_path: "/tmp/omnisocket-a-side.sock"
reconnect_delay_ms: 2000
telemetry_interval_ms: 100
analog_send_hz: 100
frame_stale_ms: 500
policy:
health_window_ms: 2000
green_srtt_ms: 35
yellow_srtt_ms: 60
retrans_red_threshold: 10
profile_green:
fps: 15
max_frame_kb: 60
profile_yellow:
fps: 10
max_frame_kb: 40
profile_red:
fps: 5
max_frame_kb: 20

View File

@@ -0,0 +1,58 @@
transport:
server_addr: ""
relay_via: "81.70.156.140:10909"
bind_ip: ""
bind_device: ""
control_receiver:
peer_id: "peer-b-ctrl"
nodelay: 1
interval_ms: 5
resend: 2
nc: 1
sndwnd: 32
rcvwnd: 32
mtu: 1400
stats_interval_ms: 100
queue_capacity: 256
video_sender:
enabled: true
peer_id: "peer-b-video"
target_peer: "peer-a-video"
binary_path: "bin/b_side_video_sender"
device: "/dev/video0"
capture_width: 1280
capture_height: 720
output_width: 640
output_height: 360
fps: 10
jpeg_quality_qscale: 8
max_frame_bytes: 40960
stats_interval_ms: 100
daemon:
socket_path: "/tmp/omnisocket-b-side.sock"
ctrl_socket_path: "/tmp/omnisocket-b-ctrl.sock"
reconnect_delay_ms: 2000
telemetry_interval_ms: 100
worker_restart_delay_ms: 2000
policy:
mode: "manual"
health_window_ms: 2000
green_srtt_ms: 30
yellow_srtt_ms: 55
retrans_red_threshold: 8
profile_green:
fps: 10
jpeg_quality_qscale: 8
max_frame_bytes: 40960
profile_yellow:
fps: 7
jpeg_quality_qscale: 12
max_frame_bytes: 28672
profile_red:
fps: 5
jpeg_quality_qscale: 16
max_frame_bytes: 20480

View File

@@ -27,6 +27,7 @@ int kcp_client_receive_timed(kcp_client_t *client, message_t *out_msg, int timeo
int kcp_client_receive(kcp_client_t *client, message_t *out_msg);
int kcp_client_receive_binary_into(kcp_client_t *client, void *buffer, size_t buffer_len, kcp_client_recv_meta_t *out_meta, int timeout_ms);
int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len);
int kcp_client_metrics_snapshot(kcp_client_t *client, kcp_conn_metrics_t *out_metrics);
int kcp_client_close(kcp_client_t *client);
void kcp_client_free(kcp_client_t *client);

View File

@@ -43,6 +43,32 @@ extern "C" {
typedef struct kcp_conn kcp_conn_t;
typedef struct kcp_listener kcp_listener_t;
typedef struct kcp_conn_metrics {
int connected;
int has_conv;
uint32_t conv;
char local_addr[OMNI_MAX_ADDR_TEXT];
char remote_addr[OMNI_MAX_ADDR_TEXT];
uint32_t rto_ms;
int32_t srtt_ms;
int32_t srttvar_ms;
uint64_t bytes_sent;
uint64_t bytes_received;
uint64_t in_pkts;
uint64_t out_pkts;
uint64_t in_segs;
uint64_t out_segs;
uint64_t retrans_segs;
uint64_t fast_retrans_segs;
uint64_t early_retrans_segs;
uint64_t lost_segs;
uint64_t repeat_segs;
uint64_t in_errs;
uint64_t kcp_in_errs;
uint64_t ring_buffer_snd_queue;
uint64_t ring_buffer_rcv_queue;
uint64_t ring_buffer_snd_buffer;
} kcp_conn_metrics_t;
typedef struct kcp_conn_options {
int nodelay;
int interval_ms;
@@ -68,6 +94,7 @@ int kcp_conn_close(kcp_conn_t *conn);
void kcp_conn_free(kcp_conn_t *conn);
uint32_t kcp_conn_conv(const kcp_conn_t *conn);
int kcp_conn_local_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, socklen_t *addr_len);
int kcp_conn_metrics_snapshot(kcp_conn_t *conn, kcp_conn_metrics_t *out_metrics);
kcp_listener_t *kcp_listener_listen(const char *listen_addr, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, const char *node_role, const char *node_id);
kcp_conn_t *kcp_listener_accept(kcp_listener_t *listener);

View File

@@ -8,15 +8,26 @@ typedef struct PyOmniSession {
omnisocket_session_t session;
} PyOmniSession;
static const char *PyOmniSession_recv_doc =
"recv(timeout_ms=-1) -> (from_peer, msg_type, payload) | None";
PyDoc_STRVAR(
PyOmniSession_recv_doc,
"recv(timeout_ms=-1) -> (from_peer, msg_type, payload) | None"
);
static const char *PyOmniSession_recv_into_doc =
PyDoc_STRVAR(
PyOmniSession_recv_into_doc,
"recv_into(buffer, timeout_ms=-1) -> dict | None\n"
"\n"
"The writable buffer must be large enough for the full message body.\n"
"If it is too small, BufferError reports the required size but the\n"
"current frame has already been consumed and is lost.";
"current frame has already been consumed and is lost."
);
PyDoc_STRVAR(
PyOmniSession_kcp_metrics_doc,
"kcp_metrics() -> dict\n"
"\n"
"Return a snapshot of low-level KCP metrics for the current session."
);
static PyObject *PyOmniSession_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) {
PyOmniSession *self;
@@ -277,6 +288,67 @@ static PyObject *PyOmniSession_stats(PyOmniSession *self, PyObject *Py_UNUSED(ig
);
}
static PyObject *PyOmniSession_kcp_metrics(PyOmniSession *self, PyObject *Py_UNUSED(ignored)) {
omnisocket_session_kcp_metrics_t metrics;
memset(&metrics, 0, sizeof(metrics));
if (omnisocket_session_kcp_metrics_snapshot(&self->session, &metrics) != 0) {
return PyErr_SetFromErrno(PyExc_OSError);
}
return Py_BuildValue(
"{s:i,s:i,s:I,s:s,s:s,s:I,s:i,s:i,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K}",
"connected",
metrics.connected,
"has_conv",
metrics.has_conv,
"conv",
metrics.conv,
"local_addr",
metrics.local_addr,
"remote_addr",
metrics.remote_addr,
"rto_ms",
metrics.rto_ms,
"srtt_ms",
metrics.srtt_ms,
"srttvar_ms",
metrics.srttvar_ms,
"bytes_sent",
(unsigned long long) metrics.bytes_sent,
"bytes_received",
(unsigned long long) metrics.bytes_received,
"in_pkts",
(unsigned long long) metrics.in_pkts,
"out_pkts",
(unsigned long long) metrics.out_pkts,
"in_segs",
(unsigned long long) metrics.in_segs,
"out_segs",
(unsigned long long) metrics.out_segs,
"retrans_segs",
(unsigned long long) metrics.retrans_segs,
"fast_retrans_segs",
(unsigned long long) metrics.fast_retrans_segs,
"early_retrans_segs",
(unsigned long long) metrics.early_retrans_segs,
"lost_segs",
(unsigned long long) metrics.lost_segs,
"repeat_segs",
(unsigned long long) metrics.repeat_segs,
"in_errs",
(unsigned long long) metrics.in_errs,
"kcp_in_errs",
(unsigned long long) metrics.kcp_in_errs,
"ring_buffer_snd_queue",
(unsigned long long) metrics.ring_buffer_snd_queue,
"ring_buffer_rcv_queue",
(unsigned long long) metrics.ring_buffer_rcv_queue,
"ring_buffer_snd_buffer",
(unsigned long long) metrics.ring_buffer_snd_buffer
);
}
static PyMethodDef PyOmniSession_methods[] = {
{"connect", (PyCFunction) PyOmniSession_connect, METH_VARARGS | METH_KEYWORDS, NULL},
{"close", (PyCFunction) PyOmniSession_close, METH_NOARGS, NULL},
@@ -284,6 +356,7 @@ static PyMethodDef PyOmniSession_methods[] = {
{"recv", (PyCFunction) PyOmniSession_recv, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_doc},
{"recv_into", (PyCFunction) PyOmniSession_recv_into, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_into_doc},
{"stats", (PyCFunction) PyOmniSession_stats, METH_NOARGS, NULL},
{"kcp_metrics", (PyCFunction) PyOmniSession_kcp_metrics, METH_NOARGS, PyOmniSession_kcp_metrics_doc},
{NULL, NULL, 0, NULL}
};

View File

@@ -246,3 +246,72 @@ void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket
*out_stats = session->stats;
pthread_mutex_unlock(&session->mutex);
}
int omnisocket_session_kcp_metrics_snapshot(
omnisocket_session_t *session,
omnisocket_session_kcp_metrics_t *out_metrics
) {
kcp_client_t *client = NULL;
kcp_conn_metrics_t metrics;
int rc = 0;
if (session == NULL || out_metrics == NULL) {
errno = EINVAL;
return -1;
}
memset(out_metrics, 0, sizeof(*out_metrics));
pthread_mutex_lock(&session->mutex);
if (session->client != NULL && !session->closing) {
client = session->client;
session->active_ops += 1;
}
pthread_mutex_unlock(&session->mutex);
if (client == NULL) {
return 0;
}
memset(&metrics, 0, sizeof(metrics));
rc = kcp_client_metrics_snapshot(client, &metrics);
pthread_mutex_lock(&session->mutex);
if (session->active_ops > 0) {
session->active_ops -= 1;
}
if (session->closing && session->active_ops == 0) {
pthread_cond_broadcast(&session->idle_cond);
}
pthread_mutex_unlock(&session->mutex);
if (rc != 0) {
return rc;
}
out_metrics->connected = metrics.connected;
out_metrics->has_conv = metrics.has_conv;
out_metrics->conv = metrics.conv;
snprintf(out_metrics->local_addr, sizeof(out_metrics->local_addr), "%s", metrics.local_addr);
snprintf(out_metrics->remote_addr, sizeof(out_metrics->remote_addr), "%s", metrics.remote_addr);
out_metrics->rto_ms = metrics.rto_ms;
out_metrics->srtt_ms = metrics.srtt_ms;
out_metrics->srttvar_ms = metrics.srttvar_ms;
out_metrics->bytes_sent = metrics.bytes_sent;
out_metrics->bytes_received = metrics.bytes_received;
out_metrics->in_pkts = metrics.in_pkts;
out_metrics->out_pkts = metrics.out_pkts;
out_metrics->in_segs = metrics.in_segs;
out_metrics->out_segs = metrics.out_segs;
out_metrics->retrans_segs = metrics.retrans_segs;
out_metrics->fast_retrans_segs = metrics.fast_retrans_segs;
out_metrics->early_retrans_segs = metrics.early_retrans_segs;
out_metrics->lost_segs = metrics.lost_segs;
out_metrics->repeat_segs = metrics.repeat_segs;
out_metrics->in_errs = metrics.in_errs;
out_metrics->kcp_in_errs = metrics.kcp_in_errs;
out_metrics->ring_buffer_snd_queue = metrics.ring_buffer_snd_queue;
out_metrics->ring_buffer_rcv_queue = metrics.ring_buffer_rcv_queue;
out_metrics->ring_buffer_snd_buffer = metrics.ring_buffer_snd_buffer;
return 0;
}

View File

@@ -14,6 +14,33 @@ typedef struct omnisocket_session_stats {
int connected;
} omnisocket_session_stats_t;
typedef struct omnisocket_session_kcp_metrics {
int connected;
int has_conv;
uint32_t conv;
char local_addr[OMNI_MAX_ADDR_TEXT];
char remote_addr[OMNI_MAX_ADDR_TEXT];
uint32_t rto_ms;
int32_t srtt_ms;
int32_t srttvar_ms;
uint64_t bytes_sent;
uint64_t bytes_received;
uint64_t in_pkts;
uint64_t out_pkts;
uint64_t in_segs;
uint64_t out_segs;
uint64_t retrans_segs;
uint64_t fast_retrans_segs;
uint64_t early_retrans_segs;
uint64_t lost_segs;
uint64_t repeat_segs;
uint64_t in_errs;
uint64_t kcp_in_errs;
uint64_t ring_buffer_snd_queue;
uint64_t ring_buffer_rcv_queue;
uint64_t ring_buffer_snd_buffer;
} omnisocket_session_kcp_metrics_t;
typedef struct omnisocket_session {
pthread_mutex_t mutex;
pthread_cond_t idle_cond;
@@ -47,5 +74,9 @@ int omnisocket_session_recv_into(
int timeout_ms
);
void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket_session_stats_t *out_stats);
int omnisocket_session_kcp_metrics_snapshot(
omnisocket_session_t *session,
omnisocket_session_kcp_metrics_t *out_metrics
);
#endif

View File

@@ -0,0 +1,9 @@
from pathlib import Path
PACKAGE_ROOT = Path(__file__).resolve().parent
PYTHON_ROOT = PACKAGE_ROOT.parent
REPO_ROOT = PYTHON_ROOT.parent
DEFAULT_SOCKET_PATH = "/tmp/omnisocket-a-side.sock"
DEFAULT_CONFIG_PATH = REPO_ROOT / "config" / "a_side_omnidaemon.yaml"
VERSION = "0.1.0"

View File

@@ -0,0 +1,5 @@
from .daemon import main
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,149 @@
"""Local Unix-domain HTTP client for the A-side OmniDaemon."""
from __future__ import annotations
import http.client
import json
import os
import socket
import threading
from typing import Any
from . import DEFAULT_SOCKET_PATH
class OmniDaemonError(RuntimeError):
def __init__(self, message: str, status_code: int | None = None) -> None:
super().__init__(message)
self.status_code = status_code
class UnixHTTPConnection(http.client.HTTPConnection):
def __init__(self, socket_path: str, timeout: float = 2.0) -> None:
super().__init__("localhost", timeout=timeout)
self.socket_path = socket_path
def connect(self) -> None: # pragma: no cover - runtime depends on Linux socket support
if not hasattr(socket, "AF_UNIX"):
raise OSError("AF_UNIX sockets are not available on this platform")
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.settimeout(self.timeout)
self.sock.connect(self.socket_path)
class OmniDaemonClient:
def __init__(self, socket_path: str | None = None, timeout: float = 2.0) -> None:
self.socket_path = socket_path or os.getenv("OMNIDAEMON_SOCKET", DEFAULT_SOCKET_PATH)
self.timeout = timeout
self._local = threading.local()
def get_health(self) -> dict[str, Any]:
return self._request_json("GET", "/v1/health")
def get_state(self) -> dict[str, Any]:
return self._request_json("GET", "/v1/state")
def get_video_frame(self) -> bytes:
return self._request_bytes("GET", "/v1/video/frame")
def get_control_status(self) -> dict[str, Any]:
return self._request_json("GET", "/v1/control/status")
def send_control_event(
self,
*,
source: str,
event_code: str,
drive_value: float = 1.0,
client_time_ms: int | None = None,
) -> dict[str, Any]:
payload = {
"source": source,
"event_code": event_code,
"drive_value": float(drive_value),
"client_time_ms": client_time_ms,
}
return self._request_json("POST", "/v1/control/event", payload)
def close(self) -> None:
self._reset_connection()
def _request_json(
self,
method: str,
path: str,
payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
raw = self._request_bytes(method, path, payload)
if not raw:
return {}
try:
return json.loads(raw.decode("utf-8"))
except json.JSONDecodeError as error:
raise OmniDaemonError(f"invalid daemon JSON response: {error}") from error
def _request_bytes(
self,
method: str,
path: str,
payload: dict[str, Any] | None = None,
) -> bytes:
body = b""
headers: dict[str, str] = {}
if payload is not None:
body = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
headers["Content-Length"] = str(len(body))
headers.setdefault("Connection", "keep-alive")
for attempt in range(2):
connection = self._get_connection()
try:
connection.request(method, path, body=body, headers=headers)
response = connection.getresponse()
raw = response.read()
except FileNotFoundError as error:
self._reset_connection()
raise OmniDaemonError(
f"daemon socket not found: {self.socket_path}"
) from error
except (OSError, http.client.HTTPException) as error:
self._reset_connection()
if attempt == 0:
continue
raise OmniDaemonError(
f"daemon request failed via {self.socket_path}: {error}"
) from error
if getattr(response, "will_close", False):
self._reset_connection()
if response.status >= 400:
message = raw.decode("utf-8", errors="replace").strip() or response.reason
try:
parsed = json.loads(message)
if isinstance(parsed, dict) and "error" in parsed:
message = str(parsed["error"])
except json.JSONDecodeError:
pass
raise OmniDaemonError(message, status_code=response.status)
return raw
raise OmniDaemonError(f"daemon request failed via {self.socket_path}: retry exhausted")
def _get_connection(self) -> UnixHTTPConnection:
connection = getattr(self._local, "connection", None)
if connection is None:
connection = UnixHTTPConnection(self.socket_path, timeout=self.timeout)
self._local.connection = connection
return connection
def _reset_connection(self) -> None:
connection = getattr(self._local, "connection", None)
if connection is None:
return
try:
connection.close()
except OSError:
pass
self._local.connection = None

View File

@@ -0,0 +1,90 @@
"""Binary control packet codec shared by the daemon and local clients."""
from __future__ import annotations
from dataclasses import dataclass
import struct
import time
CONTROL_PACKET_VERSION = 1
CONTROL_PACKET_STRUCT = struct.Struct("!BBHIfQ")
EVENT_NAME_TO_ID = {
"pose_home": 1,
"pose_hold": 2,
"mode_stride": 3,
"surge_up": 6,
"surge_down": 7,
"sway_left": 8,
"sway_right": 9,
"spin_left": 10,
"spin_right": 11,
"set_surge": 12,
"set_sway": 13,
"set_spin": 14,
"set_lift": 15,
"lift_up": 16,
"lift_down": 17,
"trim_reset": 18,
"session_quit": 19,
}
EVENT_ID_TO_NAME = {value: key for key, value in EVENT_NAME_TO_ID.items()}
ANALOG_EVENT_CODES = {"set_surge", "set_sway", "set_spin"}
@dataclass(slots=True)
class ControlPacket:
seq_id: int
event_id: int
drive_value: float = 1.0
sent_at_ns: int = 0
@property
def event_name(self) -> str:
return EVENT_ID_TO_NAME.get(self.event_id, f"unknown_{self.event_id}")
def encode(self) -> bytes:
sent_at_ns = self.sent_at_ns or time.time_ns()
return CONTROL_PACKET_STRUCT.pack(
CONTROL_PACKET_VERSION,
self.event_id,
0,
int(self.seq_id),
float(self.drive_value),
int(sent_at_ns),
)
@classmethod
def decode(cls, payload: bytes) -> "ControlPacket":
if len(payload) != CONTROL_PACKET_STRUCT.size:
raise ValueError(
f"invalid control packet length {len(payload)}; "
f"want {CONTROL_PACKET_STRUCT.size}"
)
version, event_id, _reserved, seq_id, drive_value, sent_at_ns = (
CONTROL_PACKET_STRUCT.unpack(payload)
)
if version != CONTROL_PACKET_VERSION:
raise ValueError(f"unsupported control packet version {version}")
return cls(
seq_id=int(seq_id),
event_id=int(event_id),
drive_value=float(drive_value),
sent_at_ns=int(sent_at_ns),
)
def make_control_packet(
seq_id: int,
event_name: str,
drive_value: float = 1.0,
sent_at_ns: int | None = None,
) -> ControlPacket:
event_id = EVENT_NAME_TO_ID[event_name]
return ControlPacket(
seq_id=seq_id,
event_id=event_id,
drive_value=drive_value,
sent_at_ns=time.time_ns() if sent_at_ns is None else sent_at_ns,
)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,10 @@
from pathlib import Path
PACKAGE_ROOT = Path(__file__).resolve().parent
PYTHON_ROOT = PACKAGE_ROOT.parent
REPO_ROOT = PYTHON_ROOT.parent
DEFAULT_SOCKET_PATH = "/tmp/omnisocket-b-side.sock"
DEFAULT_CTRL_SOCKET_PATH = "/tmp/omnisocket-b-ctrl.sock"
DEFAULT_CONFIG_PATH = REPO_ROOT / "config" / "b_side_omnidaemon.yaml"
VERSION = "0.1.0"

View File

@@ -0,0 +1,5 @@
from .daemon import main
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,54 @@
"""Local AF_UNIX SOCK_SEQPACKET client for B-side control delivery."""
from __future__ import annotations
import os
import socket
from omnisocket_a_side.control_codec import CONTROL_PACKET_STRUCT
from . import DEFAULT_CTRL_SOCKET_PATH
class BSideControlClient:
def __init__(self, socket_path: str | None = None) -> None:
self.socket_path = socket_path or os.getenv(
"OMNIBDAEMON_CTRL_SOCKET",
DEFAULT_CTRL_SOCKET_PATH,
)
self._sock: socket.socket | None = None
def connect(self) -> None:
if not hasattr(socket, "AF_UNIX"):
raise OSError("AF_UNIX sockets are not available on this platform")
if not hasattr(socket, "SOCK_SEQPACKET"):
raise OSError("SOCK_SEQPACKET is not available on this platform")
sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
sock.connect(self.socket_path)
self._sock = sock
def recv_control_packet(self, timeout_ms: int = 100) -> bytes | None:
if self._sock is None:
raise RuntimeError("B-side control client is not connected")
self._sock.settimeout(max(0.001, timeout_ms / 1000.0))
try:
payload = self._sock.recv(CONTROL_PACKET_STRUCT.size)
except socket.timeout:
return None
except BlockingIOError:
return None
if payload == b"":
raise ConnectionResetError("daemon control socket closed")
if len(payload) != CONTROL_PACKET_STRUCT.size:
return None
return payload
def close(self) -> None:
if self._sock is None:
return
try:
self._sock.close()
finally:
self._sock = None

File diff suppressed because it is too large Load Diff

View File

@@ -35,7 +35,16 @@ COMMON_SOURCES = [
setup(
name="omnisocket",
version="0.1.0",
packages=["omnisocket"],
packages=["omnisocket", "omnisocket_a_side", "omnisocket_b_side"],
install_requires=[
"PyYAML>=6.0",
],
entry_points={
"console_scripts": [
"omnisocket-a-side-daemon=omnisocket_a_side.daemon:main",
"omnisocket-b-side-daemon=omnisocket_b_side.daemon:main",
]
},
ext_modules=[
Extension(
"omnisocket._omnisocket",

11
scripts/start_b_side.sh Executable file
View File

@@ -0,0 +1,11 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
CONFIG_PATH="${1:-$ROOT/config/b_side_omnidaemon.yaml}"
export OMNIBDAEMON_CONFIG="$CONFIG_PATH"
export PYTHONPATH="$ROOT/python${PYTHONPATH:+:$PYTHONPATH}"
cd "$ROOT"
exec python3 -m omnisocket_b_side.daemon --config "$CONFIG_PATH"

View File

@@ -294,6 +294,14 @@ int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const
return 0;
}
int kcp_client_metrics_snapshot(kcp_client_t *client, kcp_conn_metrics_t *out_metrics) {
if (client == NULL || out_metrics == NULL) {
errno = EINVAL;
return -1;
}
return kcp_conn_metrics_snapshot(client->conn, out_metrics);
}
int kcp_client_close(kcp_client_t *client) {
return client == NULL ? 0 : kcp_conn_close(client->conn);
}

View File

@@ -5,6 +5,13 @@
#define UDP_RELAY_BUF_SIZE (64U * 1024U)
typedef struct udp_relay_client_entry {
struct udp_relay_client_entry *next;
uint32_t conv;
struct sockaddr_storage addr;
socklen_t addr_len;
} udp_relay_client_entry_t;
struct udp_relay {
int downstream_fd;
int upstream_fd;
@@ -12,9 +19,10 @@ struct udp_relay {
socklen_t upstream_addr_len;
char downstream_local_addr[OMNI_MAX_ADDR_TEXT];
char upstream_local_addr[OMNI_MAX_ADDR_TEXT];
struct sockaddr_storage client_addr;
socklen_t client_addr_len;
int has_client;
struct sockaddr_storage last_client_addr;
socklen_t last_client_addr_len;
int has_last_client;
udp_relay_client_entry_t *clients;
pthread_mutex_t lock;
pthread_mutex_t log_mu;
pthread_mutex_t state_mu;
@@ -131,22 +139,55 @@ static void udp_relay_note_result(udp_relay_t *relay, int rc, int errnum) {
pthread_mutex_unlock(&relay->state_mu);
}
static void udp_relay_record_client(udp_relay_t *relay, const struct sockaddr_storage *addr, socklen_t addr_len) {
static udp_relay_client_entry_t *udp_relay_find_client_locked(udp_relay_t *relay, uint32_t conv) {
udp_relay_client_entry_t *entry;
for (entry = relay->clients; entry != NULL; entry = entry->next) {
if (entry->conv == conv) {
return entry;
}
}
return NULL;
}
static void udp_relay_record_client(udp_relay_t *relay, int has_conv, uint32_t conv, const struct sockaddr_storage *addr, socklen_t addr_len) {
pthread_mutex_lock(&relay->lock);
memcpy(&relay->client_addr, addr, sizeof(*addr));
relay->client_addr_len = addr_len;
relay->has_client = 1;
memcpy(&relay->last_client_addr, addr, sizeof(*addr));
relay->last_client_addr_len = addr_len;
relay->has_last_client = 1;
if (has_conv) {
udp_relay_client_entry_t *entry = udp_relay_find_client_locked(relay, conv);
if (entry == NULL) {
entry = (udp_relay_client_entry_t *) calloc(1, sizeof(*entry));
if (entry != NULL) {
entry->conv = conv;
entry->next = relay->clients;
relay->clients = entry;
}
}
if (entry != NULL) {
memcpy(&entry->addr, addr, sizeof(*addr));
entry->addr_len = addr_len;
}
}
pthread_mutex_unlock(&relay->lock);
}
static int udp_relay_copy_client(udp_relay_t *relay, struct sockaddr_storage *addr, socklen_t *addr_len) {
int has_client;
static int udp_relay_copy_client(udp_relay_t *relay, int has_conv, uint32_t conv, struct sockaddr_storage *addr, socklen_t *addr_len) {
int has_client = 0;
pthread_mutex_lock(&relay->lock);
has_client = relay->has_client;
if (has_client) {
memcpy(addr, &relay->client_addr, sizeof(*addr));
*addr_len = relay->client_addr_len;
if (has_conv) {
udp_relay_client_entry_t *entry = udp_relay_find_client_locked(relay, conv);
if (entry != NULL) {
memcpy(addr, &entry->addr, sizeof(*addr));
*addr_len = entry->addr_len;
has_client = 1;
}
} else if (relay->has_last_client) {
memcpy(addr, &relay->last_client_addr, sizeof(*addr));
*addr_len = relay->last_client_addr_len;
has_client = 1;
}
pthread_mutex_unlock(&relay->lock);
return has_client;
@@ -160,6 +201,8 @@ static void *udp_relay_forward_downstream_to_upstream(void *arg) {
struct sockaddr_storage source;
socklen_t source_len = sizeof(source);
ssize_t n = recvfrom(relay->downstream_fd, buffer, sizeof(buffer), 0, (struct sockaddr *) &source, &source_len);
int has_conv = 0;
uint32_t conv = 0;
if (n < 0) {
int errnum = errno;
@@ -174,7 +217,8 @@ static void *udp_relay_forward_downstream_to_upstream(void *arg) {
return NULL;
}
udp_relay_record_client(relay, &source, source_len);
udp_relay_parse_kcp_summary(buffer, (size_t) n, &has_conv, &conv, NULL);
udp_relay_record_client(relay, has_conv, conv, &source, source_len);
udp_relay_print_packet(relay, "relay_downstream_rx", relay->downstream_local_addr, &source, source_len, buffer, (size_t) n);
for (;;) {
if (send(relay->upstream_fd, buffer, (size_t) n, 0) >= 0) {
@@ -205,6 +249,8 @@ static void *udp_relay_forward_upstream_to_downstream(void *arg) {
struct sockaddr_storage client_addr;
socklen_t client_addr_len = 0;
ssize_t n = recv(relay->upstream_fd, buffer, sizeof(buffer), 0);
int has_conv = 0;
uint32_t conv = 0;
if (n < 0) {
int errnum = errno;
@@ -220,7 +266,8 @@ static void *udp_relay_forward_upstream_to_downstream(void *arg) {
}
udp_relay_print_packet(relay, "relay_upstream_rx", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n);
if (!udp_relay_copy_client(relay, &client_addr, &client_addr_len)) {
udp_relay_parse_kcp_summary(buffer, (size_t) n, &has_conv, &conv, NULL);
if (!udp_relay_copy_client(relay, has_conv, conv, &client_addr, &client_addr_len)) {
udp_relay_print_packet(relay, "relay_upstream_drop_no_client", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n);
continue;
}
@@ -404,11 +451,18 @@ int udp_relay_close(udp_relay_t *relay) {
}
void udp_relay_free(udp_relay_t *relay) {
udp_relay_client_entry_t *entry;
udp_relay_client_entry_t *next;
if (relay == NULL) {
return;
}
udp_relay_close(relay);
udp_relay_join_threads(relay);
for (entry = relay->clients; entry != NULL; entry = next) {
next = entry->next;
free(entry);
}
pthread_mutex_destroy(&relay->lock);
pthread_mutex_destroy(&relay->log_mu);
pthread_cond_destroy(&relay->state_cond);

View File

@@ -1788,6 +1788,83 @@ int kcp_conn_local_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, s
return 0;
}
int kcp_conn_metrics_snapshot(kcp_conn_t *conn, kcp_conn_metrics_t *out_metrics) {
struct sockaddr_storage local_addr;
socklen_t local_len = sizeof(local_addr);
if (conn == NULL || out_metrics == NULL) {
errno = EINVAL;
return -1;
}
memset(out_metrics, 0, sizeof(*out_metrics));
out_metrics->connected = !atomic_load(&conn->closed);
if (conn->sock_state != NULL && !conn->socket_closed &&
getsockname(conn->sock_state->fd, (struct sockaddr *) &local_addr, &local_len) == 0) {
omni_sockaddr_to_string(
(const struct sockaddr *) &local_addr,
local_len,
out_metrics->local_addr,
sizeof(out_metrics->local_addr)
);
}
if (conn->remote_addr_len > 0) {
omni_sockaddr_to_string(
(const struct sockaddr *) &conn->remote_addr,
conn->remote_addr_len,
out_metrics->remote_addr,
sizeof(out_metrics->remote_addr)
);
}
if (conn->process_sampler != NULL) {
out_metrics->bytes_sent = atomic_load_explicit(&conn->process_sampler->bytes_sent, memory_order_relaxed);
out_metrics->bytes_received = atomic_load_explicit(&conn->process_sampler->bytes_received, memory_order_relaxed);
out_metrics->in_pkts = atomic_load_explicit(&conn->process_sampler->in_pkts, memory_order_relaxed);
out_metrics->out_pkts = atomic_load_explicit(&conn->process_sampler->out_pkts, memory_order_relaxed);
out_metrics->in_segs = atomic_load_explicit(&conn->process_sampler->in_segs, memory_order_relaxed);
out_metrics->out_segs = atomic_load_explicit(&conn->process_sampler->out_segs, memory_order_relaxed);
out_metrics->in_errs = atomic_load_explicit(&conn->process_sampler->in_errs, memory_order_relaxed);
out_metrics->kcp_in_errs = atomic_load_explicit(&conn->process_sampler->kcp_in_errs, memory_order_relaxed);
} else {
out_metrics->bytes_sent = conn->pending_bytes_sent;
out_metrics->bytes_received = conn->pending_bytes_received;
out_metrics->in_pkts = conn->pending_in_pkts;
out_metrics->out_pkts = conn->pending_out_pkts;
out_metrics->in_segs = conn->pending_in_segs;
out_metrics->out_segs = conn->pending_out_segs;
out_metrics->in_errs = conn->pending_in_errs;
out_metrics->kcp_in_errs = conn->pending_kcp_in_errs;
}
pthread_mutex_lock(&conn->kcp_mu);
if (conn->kcp != NULL) {
out_metrics->has_conv = 1;
out_metrics->conv = conn->kcp->conv;
out_metrics->rto_ms = conn->kcp->rx_rto;
out_metrics->srtt_ms = conn->kcp->rx_srtt;
out_metrics->srttvar_ms = conn->kcp->rx_rttval;
out_metrics->ring_buffer_snd_queue = conn->kcp->nsnd_que;
out_metrics->ring_buffer_rcv_queue = conn->kcp->nrcv_que;
out_metrics->ring_buffer_snd_buffer = conn->kcp->nsnd_buf;
out_metrics->fast_retrans_segs = conn->kcp->fast_retrans_xmit;
/* This KCP fork does not implement early retransmit, so the counter stays zero. */
out_metrics->early_retrans_segs = conn->kcp->early_retrans_xmit;
out_metrics->lost_segs = conn->kcp->lost_xmit;
out_metrics->repeat_segs = conn->kcp->repeat_xmit;
out_metrics->retrans_segs =
out_metrics->fast_retrans_segs +
out_metrics->early_retrans_segs +
out_metrics->lost_segs;
} else {
out_metrics->connected = 0;
}
pthread_mutex_unlock(&conn->kcp_mu);
return 0;
}
int kcp_conn_close(kcp_conn_t *conn) {
if (conn == NULL) {
return 0;

View File

@@ -298,6 +298,10 @@ ikcpcb *ikcp_create(IUINT32 conv, void *user)
kcp->fastlimit = IKCP_FASTACK_LIMIT;
kcp->nocwnd = 0;
kcp->xmit = 0;
kcp->fast_retrans_xmit = 0;
kcp->early_retrans_xmit = 0;
kcp->lost_xmit = 0;
kcp->repeat_xmit = 0;
kcp->dead_link = IKCP_DEADLINK;
kcp->output = NULL;
kcp->writelog = NULL;
@@ -788,6 +792,7 @@ void ikcp_parse_data(ikcpcb *kcp, IKCPSEG *newseg)
}
else
{
kcp->repeat_xmit++;
ikcp_segment_delete(kcp, newseg);
}
@@ -1192,6 +1197,7 @@ void ikcp_flush(ikcpcb *kcp)
needsend = 1;
segment->xmit++;
kcp->xmit++;
kcp->lost_xmit++;
if (kcp->nodelay == 0)
{
segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);
@@ -1211,6 +1217,7 @@ void ikcp_flush(ikcpcb *kcp)
{
needsend = 1;
segment->xmit++;
kcp->fast_retrans_xmit++;
segment->fastack = 0;
segment->resendts = current + segment->rto;
change++;

View File

@@ -300,6 +300,10 @@ struct IKCPCB
IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;
IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;
IUINT32 current, interval, ts_flush, xmit;
IUINT32 fast_retrans_xmit;
IUINT32 early_retrans_xmit;
IUINT32 lost_xmit;
IUINT32 repeat_xmit;
IUINT32 nrcv_buf, nsnd_buf;
IUINT32 nrcv_que, nsnd_que;
IUINT32 nodelay, updated;