Compare commits

...

15 Commits

Author SHA1 Message Date
0f2405cb04 fix: 静默 BrokenPipeError 因为后关server 2026-04-02 22:49:25 +08:00
c7b995efd7 fix: 不能“sender 里先按 fps 睡再去 DQBUF” 2026-04-02 22:31:32 +08:00
9cd1f88bfc fix: 当前 relay 很可能只记住了“一个下游 UDP 地址” 2026-04-02 20:56:36 +08:00
6d77dc26bd fix: 把服务端错误直接写进 last_error 2026-04-02 20:05:28 +08:00
878e11e597 Merge branch 'dev' of https://106.52.207.92:9103/limingjie/OmniSocketGo into dev 2026-04-02 17:45:00 +08:00
8ab12a0d69 fix: daemon终端无日志 2026-04-02 17:44:58 +08:00
0ae13b428e gitignore for .venv 2026-04-02 16:52:29 +08:00
nnbcccscdscdsc
0933692737 fix: start_b_side.sh 权限 2026-04-01 23:59:06 +08:00
aec42c83e4 fix: B端 relay_via 配置 2026-04-01 23:49:24 +08:00
583bcf120d feat: 把 B 端的 视频/控制 都收口到一个本地 daemon 进程里 2026-04-01 21:01:47 +08:00
2f2c2008e7 feat: 把 A 端的 Session/KCP/视频/控制 都收口到一个本地 daemon 进程里,Django 和输入发送端都改成通过本机 UDS HTTP 去访问它,同时补齐了观测、性能和可用性上的几个关键问题。 2026-04-01 15:48:01 +08:00
e69db4c466 feat: 对接采集视频 2026-03-31 21:10:01 +08:00
meiqi
16702a0853 fix: 编译cpython 2026-03-30 23:09:04 +08:00
86e36d0859 Merge branch 'c' of https://106.52.207.92:9103/limingjie/OmniSocketGo into c 2026-03-30 22:48:38 +08:00
d678bfc326 feat: 对接Python,暴露接口 2026-03-30 22:48:36 +08:00
40 changed files with 6235 additions and 67 deletions

View File

@@ -9,7 +9,9 @@
"Bash(git status:*)",
"Bash(git fetch:*)",
"Bash(git pull:*)",
"Bash(wc:*)"
"Bash(wc:*)",
"Bash(python3 -c \"import dis, marshal, types; f = open\\(''''C:/Users/64187/Desktop/Workspace/OmniSocketGo/__pycache__/omnisocket_video_sender.cpython-312.pyc'''',''''rb''''\\); f.read\\(16\\); code=marshal.load\\(f\\); dis.dis\\(code\\)\")",
"Bash(gh pr:*)"
]
}
}

11
.gitignore vendored
View File

@@ -10,4 +10,13 @@ peer-b-latency.*
*.log
root@117.78.11.244
c/bin
c/bin
*__pycache__*
/python/build
/python/omnisocket.egg-info
*.so*
/.venv

View File

@@ -2,6 +2,7 @@ CC ?= gcc
CFLAGS ?= -std=c11 -Wall -Wextra -O2 -pthread -D_GNU_SOURCE
CPPFLAGS ?= -Iinclude -Ithird_party/cjson -Ithird_party/kcp
LDFLAGS ?= -pthread
PYTHON ?= python3
BIN_DIR := bin
SRC_DIR := src
@@ -35,6 +36,34 @@ TARGETS := \
$(BIN_DIR)/kcppeer \
$(BIN_DIR)/kcpping
CAMERA_VIDEO_SENDER := $(BIN_DIR)/camera_video_sender
CAMERA_VIDEO_SENDER_SRCS := \
$(CMD_DIR)/v1_camera_pipeline_ifdef.c \
$(SRC_DIR)/omni_common.c \
$(SRC_DIR)/protocol.c \
$(SRC_DIR)/latencylog.c \
$(SRC_DIR)/kcp_packet_debug.c \
$(SRC_DIR)/kcp_session_stats.c \
$(SRC_DIR)/linux_timestamping.c \
$(SRC_DIR)/transport_kcp.c \
$(SRC_DIR)/peer_kcp_client.c \
third_party/cjson/cJSON.c \
third_party/kcp/ikcp.c
B_SIDE_VIDEO_SENDER := $(BIN_DIR)/b_side_video_sender
B_SIDE_VIDEO_SENDER_SRCS := \
$(CMD_DIR)/b_side_video_sender.c \
$(SRC_DIR)/omni_common.c \
$(SRC_DIR)/protocol.c \
$(SRC_DIR)/latencylog.c \
$(SRC_DIR)/kcp_packet_debug.c \
$(SRC_DIR)/kcp_session_stats.c \
$(SRC_DIR)/linux_timestamping.c \
$(SRC_DIR)/transport_kcp.c \
$(SRC_DIR)/peer_kcp_client.c \
third_party/cjson/cJSON.c \
third_party/kcp/ikcp.c
all: $(TARGETS)
$(BIN_DIR):
@@ -61,7 +90,23 @@ $(BIN_DIR)/kcppeer: $(CMD_DIR)/kcppeer.c $(COMMON_SRCS) | $(BIN_DIR)
$(BIN_DIR)/kcpping: $(CMD_DIR)/kcpping.c $(COMMON_SRCS) | $(BIN_DIR)
$(CC) $(CFLAGS) $(CPPFLAGS) -o $@ $^ $(LDFLAGS)
$(CAMERA_VIDEO_SENDER): $(CAMERA_VIDEO_SENDER_SRCS) | $(BIN_DIR)
$(CC) $(CFLAGS) $(CPPFLAGS) $$(pkg-config --cflags libavformat libavcodec libavutil libswscale) -o $@ $^ $(LDFLAGS) $$(pkg-config --libs libavformat libavcodec libavutil libswscale)
camera_video_sender: $(CAMERA_VIDEO_SENDER)
$(B_SIDE_VIDEO_SENDER): $(B_SIDE_VIDEO_SENDER_SRCS) | $(BIN_DIR)
$(CC) $(CFLAGS) $(CPPFLAGS) $$(pkg-config --cflags libavformat libavcodec libavutil libswscale) -o $@ $^ $(LDFLAGS) $$(pkg-config --libs libavformat libavcodec libavutil libswscale)
b_side_video_sender: $(B_SIDE_VIDEO_SENDER)
clean:
rm -rf $(BIN_DIR)
.PHONY: all clean
python-ext:
cd python && $(PYTHON) setup.py build_ext --inplace
python-install:
cd python && $(PYTHON) -m pip install -e .
.PHONY: all clean python-ext python-install camera_video_sender b_side_video_sender

View File

@@ -7,26 +7,56 @@ This subtree is intentionally standalone. The Go code stays in place as the beha
## Build
```bash
cd c
make
make -j$(nproc)
```
Build outputs:
- `c/bin/udpserver`
- `c/bin/udppeer`
- `c/bin/udpping`
- `c/bin/udprelay`
- `c/bin/kcpserver`
- `c/bin/kcppeer`
- `c/bin/kcpping`
- `./bin/udpserver`
- `./bin/udppeer`
- `./bin/udpping`
- `./bin/udprelay`
- `./bin/kcpserver`
- `./bin/kcppeer`
- `./bin/kcpping`
Python extension build:
```bash
make python-ext
make python-install
```
## A-Side OmniDaemon
The A-side daemon is configured from `config/a_side_omnidaemon.yaml` in this repo. The safest way to start it is to pass that file explicitly, because the installed Python package does not bundle the YAML config.
Run from a source checkout:
```bash
python -m omnisocket_a_side.daemon --config "$(pwd)/config/a_side_omnidaemon.yaml"
```
Or, if you installed the console script:
```bash
OMNIDAEMON_CONFIG="$(pwd)/config/a_side_omnidaemon.yaml" \
omnisocket-a-side-daemon
```
Optional overrides:
- `OMNIDAEMON_SOCKET=/tmp/omnisocket-a-side.sock` selects the local UDS path.
- `OMNIDAEMON_CONFIG=/abs/path/to/a_side_omnidaemon.yaml` overrides `--config`.
For `robot-command-center` and the A-side senders, keep the daemon and its clients on the same Linux machine so they can share the Unix-domain socket.
## Run On Different Machines
Server `D` runs the KCP hub on `0.0.0.0:10909`:
```bash
./c/bin/kcpserver -listen 0.0.0.0:10909 \
./bin/kcpserver -listen 0.0.0.0:10909 \
-kcp-ts-debug-log logs/d-kcp-ts.jsonl \
-kcp-session-stats-log logs/d-kcp-stats.jsonl
```
@@ -34,13 +64,13 @@ Server `D` runs the KCP hub on `0.0.0.0:10909`:
Relay `C` runs a raw UDP forwarder to `D`:
```bash
./c/bin/kcpserver -mode=relay -listen 0.0.0.0:10909 -relay-remote 172.21.32.15:10909
./bin/kcpserver -mode=relay -listen 0.0.0.0:10909 -relay-remote 172.21.32.15:10909
```
Peer `A` dials `D` through relay `C`:
```bash
./c/bin/kcppeer -id peer-a -server 172.21.32.15:10909 -relay-via 106.55.173.235:10909 \
./bin/kcppeer -id peer-a -server 172.21.32.15:10909 -relay-via 106.55.173.235:10909 \
-inbox-dir inbox/a \
-latency-log logs/a-latency.jsonl \
-kcp-ts-debug-log logs/a-kcp-ts.jsonl \
@@ -50,7 +80,7 @@ Peer `A` dials `D` through relay `C`:
Peer `B` dials `D` directly:
```bash
./c/bin/kcppeer -id peer-b -server 81.70.156.140:10909 \
./bin/kcppeer -id peer-b -server 81.70.156.140:10909 \
-inbox-dir inbox/b \
-latency-log logs/b-latency.jsonl \
-kcp-ts-debug-log logs/b-kcp-ts.jsonl \
@@ -60,13 +90,33 @@ Peer `B` dials `D` directly:
Optional ping / echo tools:
```bash
./c/bin/kcpping -id peer-a -server 106.55.173.235:10909 -echo
./c/bin/kcpping -id peer-b -server 81.70.156.140:10909 -to peer-a -count 20 -interval 100ms
./c/bin/udpserver -listen 0.0.0.0:9001
./c/bin/udppeer -id peer-a -server 127.0.0.1:9001
./c/bin/udpping -id pinger -server 127.0.0.1:9001 -to peer-a -count 20
./bin/kcpping -id peer-a -server 106.55.173.235:10909 -echo
./bin/kcpping -id peer-b -server 81.70.156.140:10909 -to peer-a -count 20 -interval 100ms
./bin/udpserver -listen 0.0.0.0:9001
./bin/udppeer -id peer-a -server 127.0.0.1:9001
./bin/udpping -id pinger -server 127.0.0.1:9001 -to peer-a -count 20
```
Python control/video demos use two KCP sessions:
- `peer-a-ctrl <-> peer-b-ctrl` for small binary control packets
- `peer-b-video -> peer-a-video` for larger binary video frames
Example demo entry points:
- `udp_keyboard_sender.py`
- `udp_xbox_sender.py`
- `udp_fsm_controller.py`
- `omnisocket_video_sender.py`
- `omnisocket_video_receiver.py`
- `scripts/kcp_control_benchmark.py`
Python `recv_into()` note:
- The writable buffer must be large enough for the full incoming payload.
- If the buffer is too small, `recv_into()` reports the required size but the current frame has already been consumed and is lost.
- For the video demo, keep `video_receiver.buffer_bytes >= video_sender.frame_bytes`.
## Interactive Commands
`udppeer` and `kcppeer` support the same interactive shell:
@@ -83,6 +133,8 @@ quit
- The C project targets Linux only.
- It preserves the Go wire format for UDP datagrams and KCP stream frames.
- It now supports `binary` payload messages in addition to `text`, `file`, `register`, and `error`.
- Python `Session.recv_into()` is a zero-copy receive helper for already-sized buffers; it does not retain oversized frames for a retry.
- It keeps runtime JSONL logging, UDP TX timestamp debug, KCP packet debug, and KCP session stats.
- Offline `latencysummary` and HTML chart generation are intentionally not migrated.
- No automated C tests are included in this subtree; validation is expected to happen on Linux via `make` and manual smoke tests.

933
cmd/b_side_video_sender.c Normal file
View File

@@ -0,0 +1,933 @@
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <linux/videodev2.h>
#include <pthread.h>
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/select.h>
#include <time.h>
#include <unistd.h>
#include <libavcodec/avcodec.h>
#include <libavutil/avutil.h>
#include <libavutil/imgutils.h>
#include <libavutil/opt.h>
#include <libswscale/swscale.h>
#include "cJSON.h"
#include "peer_kcp_client.h"
#define WORKER_CONTROL_FD 3
#define WORKER_TELEMETRY_FD 4
#define WORKER_CONTROL_FD_ENV "OMNI_WORKER_CONTROL_FD"
#define WORKER_TELEMETRY_FD_ENV "OMNI_WORKER_TELEMETRY_FD"
#define NUM_BUFFERS 4
#define CLEAR(x) memset(&(x), 0, sizeof(x))
#define VIDEO_SERVER_ADDR_ENV "OMNI_VIDEO_SERVER_ADDR"
#define VIDEO_RELAY_ADDR_ENV "OMNI_VIDEO_RELAY_VIA"
#define VIDEO_BIND_IP_ENV "OMNI_VIDEO_BIND_IP"
#define VIDEO_BIND_DEVICE_ENV "OMNI_VIDEO_BIND_DEVICE"
#define VIDEO_PEER_ID_ENV "OMNI_VIDEO_PEER_ID"
#define VIDEO_TARGET_PEER_ENV "OMNI_VIDEO_TARGET_PEER"
#define VIDEO_DEVICE_ENV "OMNI_VIDEO_DEVICE"
#define VIDEO_CAPTURE_WIDTH_ENV "OMNI_VIDEO_CAPTURE_WIDTH"
#define VIDEO_CAPTURE_HEIGHT_ENV "OMNI_VIDEO_CAPTURE_HEIGHT"
#define VIDEO_OUTPUT_WIDTH_ENV "OMNI_VIDEO_OUTPUT_WIDTH"
#define VIDEO_OUTPUT_HEIGHT_ENV "OMNI_VIDEO_OUTPUT_HEIGHT"
#define VIDEO_FPS_ENV "OMNI_VIDEO_FPS"
#define VIDEO_JPEG_QSCALE_ENV "OMNI_VIDEO_JPEG_QSCALE"
#define VIDEO_MAX_FRAME_BYTES_ENV "OMNI_VIDEO_MAX_FRAME_BYTES"
#define VIDEO_STATS_INTERVAL_ENV "OMNI_VIDEO_STATS_INTERVAL_MS"
typedef struct {
void *start;
size_t length;
} Buffer;
typedef struct {
char server_addr[OMNI_MAX_ADDR_TEXT];
char relay_addr[OMNI_MAX_ADDR_TEXT];
char bind_ip[OMNI_MAX_ADDR_TEXT];
char bind_device[128];
char peer_id[OMNI_MAX_PEER_ID];
char target_peer[OMNI_MAX_PEER_ID];
char video_device[256];
int capture_width;
int capture_height;
int output_width;
int output_height;
int initial_fps;
int initial_qscale;
int initial_max_frame_bytes;
int stats_interval_ms;
} worker_config_t;
typedef struct {
pthread_mutex_t lock;
pthread_mutex_t telemetry_lock;
FILE *telemetry_stream;
int target_fps;
int jpeg_quality_qscale;
int max_frame_bytes;
bool shutdown_requested;
} runtime_state_t;
typedef struct {
kcp_client_t *client;
char target_peer[OMNI_MAX_PEER_ID];
} video_sender_t;
typedef struct {
FILE *control_stream;
runtime_state_t *runtime;
} control_thread_args_t;
static volatile sig_atomic_t g_signal_stop = 0;
static double monotonic_ms(void) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (double) ts.tv_sec * 1000.0 + (double) ts.tv_nsec / 1000000.0;
}
static int64_t realtime_ms(void) {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return (int64_t) ts.tv_sec * 1000 + (int64_t) ts.tv_nsec / 1000000;
}
static int env_as_int(const char *name, int fallback) {
const char *raw = getenv(name);
char *endptr = NULL;
long value;
if (raw == NULL || raw[0] == '\0') {
return fallback;
}
errno = 0;
value = strtol(raw, &endptr, 10);
if (errno != 0 || endptr == raw || (endptr != NULL && *endptr != '\0')) {
return fallback;
}
return (int) value;
}
static const char *env_or_default(const char *name, const char *fallback) {
const char *value = getenv(name);
if (value != NULL && value[0] != '\0') {
return value;
}
return fallback;
}
static void handle_signal(int signum) {
(void) signum;
g_signal_stop = 1;
}
static void runtime_set_shutdown(runtime_state_t *runtime, bool shutdown_requested) {
pthread_mutex_lock(&runtime->lock);
runtime->shutdown_requested = shutdown_requested;
pthread_mutex_unlock(&runtime->lock);
}
static bool runtime_should_stop(runtime_state_t *runtime) {
bool shutdown_requested;
pthread_mutex_lock(&runtime->lock);
shutdown_requested = runtime->shutdown_requested;
pthread_mutex_unlock(&runtime->lock);
return g_signal_stop != 0 || shutdown_requested;
}
static void runtime_get_profile(runtime_state_t *runtime, int *fps, int *qscale, int *max_frame_bytes) {
pthread_mutex_lock(&runtime->lock);
*fps = runtime->target_fps;
*qscale = runtime->jpeg_quality_qscale;
*max_frame_bytes = runtime->max_frame_bytes;
pthread_mutex_unlock(&runtime->lock);
}
static void runtime_set_profile(runtime_state_t *runtime, int fps, int qscale, int max_frame_bytes) {
pthread_mutex_lock(&runtime->lock);
if (fps > 0) {
runtime->target_fps = fps;
}
if (qscale > 0) {
runtime->jpeg_quality_qscale = qscale;
}
if (max_frame_bytes > 0) {
runtime->max_frame_bytes = max_frame_bytes;
}
pthread_mutex_unlock(&runtime->lock);
}
static void telemetry_write_line(runtime_state_t *runtime, const char *line) {
pthread_mutex_lock(&runtime->telemetry_lock);
if (runtime->telemetry_stream != NULL) {
fputs(line, runtime->telemetry_stream);
fputc('\n', runtime->telemetry_stream);
fflush(runtime->telemetry_stream);
}
pthread_mutex_unlock(&runtime->telemetry_lock);
}
static void telemetry_write_frame_stat(
runtime_state_t *runtime,
int frame_bytes,
int encode_us,
bool sent,
const char *drop_reason
) {
char line[512];
int written;
written = snprintf(
line,
sizeof(line),
"{\"type\":\"frame_stat\",\"frame_bytes\":%d,\"encode_us\":%d,"
"\"sent\":%s,\"drop_reason\":\"%s\",\"ts_unix_ms\":%" PRId64 "}",
frame_bytes,
encode_us,
sent ? "true" : "false",
drop_reason != NULL ? drop_reason : "",
realtime_ms());
if (written > 0 && written < (int) sizeof(line)) {
telemetry_write_line(runtime, line);
}
}
static void telemetry_write_kcp_metrics(runtime_state_t *runtime, const kcp_conn_metrics_t *metrics) {
char line[1024];
int written;
written = snprintf(
line,
sizeof(line),
"{\"type\":\"kcp_metrics\",\"connected\":%d,\"srtt_ms\":%d,\"srttvar_ms\":%d,"
"\"rto_ms\":%u,\"bytes_sent\":%" PRIu64 ",\"bytes_received\":%" PRIu64 ","
"\"out_pkts\":%" PRIu64 ",\"out_segs\":%" PRIu64 ",\"retrans_segs\":%" PRIu64 ","
"\"fast_retrans_segs\":%" PRIu64 ",\"early_retrans_segs\":%" PRIu64 ","
"\"lost_segs\":%" PRIu64 ",\"ring_buffer_snd_queue\":%" PRIu64 ","
"\"ring_buffer_snd_buffer\":%" PRIu64 ",\"ts_unix_ms\":%" PRId64 "}",
metrics->connected,
metrics->srtt_ms,
metrics->srttvar_ms,
metrics->rto_ms,
metrics->bytes_sent,
metrics->bytes_received,
metrics->out_pkts,
metrics->out_segs,
metrics->retrans_segs,
metrics->fast_retrans_segs,
metrics->early_retrans_segs,
metrics->lost_segs,
metrics->ring_buffer_snd_queue,
metrics->ring_buffer_snd_buffer,
realtime_ms());
if (written > 0 && written < (int) sizeof(line)) {
telemetry_write_line(runtime, line);
}
}
static int load_worker_config(worker_config_t *cfg) {
const char *server_addr = getenv(VIDEO_SERVER_ADDR_ENV);
const char *relay_addr = env_or_default(VIDEO_RELAY_ADDR_ENV, "");
if (cfg == NULL) {
errno = EINVAL;
return -1;
}
if ((server_addr == NULL || server_addr[0] == '\0') && relay_addr[0] != '\0') {
server_addr = relay_addr;
}
if (server_addr == NULL || server_addr[0] == '\0') {
fprintf(stderr, "%s is required\n", VIDEO_SERVER_ADDR_ENV);
errno = EINVAL;
return -1;
}
CLEAR(*cfg);
snprintf(cfg->server_addr, sizeof(cfg->server_addr), "%s", server_addr);
snprintf(cfg->relay_addr, sizeof(cfg->relay_addr), "%s", relay_addr);
snprintf(cfg->bind_ip, sizeof(cfg->bind_ip), "%s", env_or_default(VIDEO_BIND_IP_ENV, ""));
snprintf(cfg->bind_device, sizeof(cfg->bind_device), "%s", env_or_default(VIDEO_BIND_DEVICE_ENV, ""));
snprintf(cfg->peer_id, sizeof(cfg->peer_id), "%s", env_or_default(VIDEO_PEER_ID_ENV, "peer-b-video"));
snprintf(cfg->target_peer, sizeof(cfg->target_peer), "%s", env_or_default(VIDEO_TARGET_PEER_ENV, "peer-a-video"));
snprintf(cfg->video_device, sizeof(cfg->video_device), "%s", env_or_default(VIDEO_DEVICE_ENV, "/dev/video0"));
cfg->capture_width = env_as_int(VIDEO_CAPTURE_WIDTH_ENV, 1280);
cfg->capture_height = env_as_int(VIDEO_CAPTURE_HEIGHT_ENV, 720);
cfg->output_width = env_as_int(VIDEO_OUTPUT_WIDTH_ENV, 640);
cfg->output_height = env_as_int(VIDEO_OUTPUT_HEIGHT_ENV, 360);
cfg->initial_fps = env_as_int(VIDEO_FPS_ENV, 10);
cfg->initial_qscale = env_as_int(VIDEO_JPEG_QSCALE_ENV, 8);
cfg->initial_max_frame_bytes = env_as_int(VIDEO_MAX_FRAME_BYTES_ENV, 40960);
cfg->stats_interval_ms = env_as_int(VIDEO_STATS_INTERVAL_ENV, 100);
return 0;
}
static int open_v4l2_device(const char *device) {
int fd = open(device, O_RDWR | O_NONBLOCK);
if (fd < 0) {
perror("open device");
}
return fd;
}
static int init_v4l2_device(int fd, const worker_config_t *cfg) {
struct v4l2_format fmt;
struct v4l2_streamparm parm;
CLEAR(fmt);
fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
fmt.fmt.pix.width = cfg->capture_width;
fmt.fmt.pix.height = cfg->capture_height;
fmt.fmt.pix.pixelformat = V4L2_PIX_FMT_MJPEG;
fmt.fmt.pix.field = V4L2_FIELD_NONE;
if (ioctl(fd, VIDIOC_S_FMT, &fmt) < 0) {
perror("VIDIOC_S_FMT");
return -1;
}
CLEAR(parm);
parm.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
if (cfg->initial_fps > 0 && ioctl(fd, VIDIOC_G_PARM, &parm) == 0) {
if ((parm.parm.capture.capability & V4L2_CAP_TIMEPERFRAME) != 0U) {
parm.parm.capture.timeperframe.numerator = 1U;
parm.parm.capture.timeperframe.denominator = (unsigned int) cfg->initial_fps;
if (ioctl(fd, VIDIOC_S_PARM, &parm) < 0) {
perror("VIDIOC_S_PARM");
}
}
}
return 0;
}
static int init_mmap(int fd, Buffer **buffers, int *num_buffers) {
struct v4l2_requestbuffers req;
int index;
CLEAR(req);
req.count = NUM_BUFFERS;
req.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
req.memory = V4L2_MEMORY_MMAP;
if (ioctl(fd, VIDIOC_REQBUFS, &req) < 0) {
perror("VIDIOC_REQBUFS");
return -1;
}
*num_buffers = (int) req.count;
*buffers = (Buffer *) calloc(req.count, sizeof(Buffer));
if (*buffers == NULL) {
return -1;
}
for (index = 0; index < (int) req.count; ++index) {
struct v4l2_buffer buf;
CLEAR(buf);
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
buf.index = (unsigned int) index;
if (ioctl(fd, VIDIOC_QUERYBUF, &buf) < 0) {
perror("VIDIOC_QUERYBUF");
return -1;
}
(*buffers)[index].length = buf.length;
(*buffers)[index].start = mmap(NULL, buf.length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, buf.m.offset);
if ((*buffers)[index].start == MAP_FAILED) {
perror("mmap");
return -1;
}
}
return 0;
}
static int queue_all_buffers(int fd, int num_buffers) {
int index;
for (index = 0; index < num_buffers; ++index) {
struct v4l2_buffer buf;
CLEAR(buf);
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
buf.index = (unsigned int) index;
if (ioctl(fd, VIDIOC_QBUF, &buf) < 0) {
perror("VIDIOC_QBUF");
return -1;
}
}
return 0;
}
static int dequeue_latest_buffer(int fd, struct v4l2_buffer *latest_buf) {
struct v4l2_buffer latest_local;
bool have_latest = false;
if (latest_buf == NULL) {
errno = EINVAL;
return -1;
}
for (;;) {
struct v4l2_buffer current;
int dq_errno;
CLEAR(current);
current.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
current.memory = V4L2_MEMORY_MMAP;
if (ioctl(fd, VIDIOC_DQBUF, &current) < 0) {
dq_errno = errno;
if (dq_errno == EINTR) {
continue;
}
if (dq_errno == EAGAIN) {
if (!have_latest) {
errno = EAGAIN;
return 1;
}
*latest_buf = latest_local;
return 0;
}
if (have_latest && ioctl(fd, VIDIOC_QBUF, &latest_local) < 0) {
perror("VIDIOC_QBUF");
}
errno = dq_errno;
return -1;
}
if (have_latest && ioctl(fd, VIDIOC_QBUF, &latest_local) < 0) {
int q_errno = errno;
perror("VIDIOC_QBUF");
if (ioctl(fd, VIDIOC_QBUF, &current) < 0) {
perror("VIDIOC_QBUF");
}
errno = q_errno;
return -1;
}
latest_local = current;
have_latest = true;
}
}
static AVCodecContext *create_mjpeg_decoder(const worker_config_t *cfg) {
const AVCodec *decoder = avcodec_find_decoder(AV_CODEC_ID_MJPEG);
AVCodecContext *ctx;
if (decoder == NULL) {
fprintf(stderr, "MJPEG decoder not found\n");
return NULL;
}
ctx = avcodec_alloc_context3(decoder);
if (ctx == NULL) {
return NULL;
}
ctx->width = cfg->capture_width;
ctx->height = cfg->capture_height;
ctx->pix_fmt = AV_PIX_FMT_YUVJ420P;
ctx->color_range = AVCOL_RANGE_JPEG;
ctx->thread_count = 1;
if (avcodec_open2(ctx, decoder, NULL) < 0) {
avcodec_free_context(&ctx);
return NULL;
}
return ctx;
}
static AVCodecContext *create_mjpeg_encoder(const worker_config_t *cfg) {
const AVCodec *encoder = avcodec_find_encoder(AV_CODEC_ID_MJPEG);
AVCodecContext *ctx;
if (encoder == NULL) {
fprintf(stderr, "MJPEG encoder not found\n");
return NULL;
}
ctx = avcodec_alloc_context3(encoder);
if (ctx == NULL) {
return NULL;
}
ctx->width = cfg->output_width;
ctx->height = cfg->output_height;
ctx->pix_fmt = AV_PIX_FMT_YUVJ420P;
ctx->time_base = (AVRational) {1, cfg->initial_fps > 0 ? cfg->initial_fps : 10};
ctx->qmin = 1;
ctx->qmax = 31;
ctx->flags |= AV_CODEC_FLAG_QSCALE;
ctx->global_quality = FF_QP2LAMBDA * cfg->initial_qscale;
if (avcodec_open2(ctx, encoder, NULL) < 0) {
avcodec_free_context(&ctx);
return NULL;
}
return ctx;
}
static int decode_mjpeg_frame(AVCodecContext *decoder, const uint8_t *data, int size, AVFrame **frame) {
AVPacket *pkt;
int ret;
*frame = NULL;
pkt = av_packet_alloc();
if (pkt == NULL) {
return -1;
}
pkt->data = (uint8_t *) data;
pkt->size = size;
ret = avcodec_send_packet(decoder, pkt);
if (ret < 0) {
av_packet_free(&pkt);
return -1;
}
*frame = av_frame_alloc();
if (*frame == NULL) {
av_packet_free(&pkt);
return -1;
}
ret = avcodec_receive_frame(decoder, *frame);
av_packet_free(&pkt);
if (ret < 0) {
av_frame_free(frame);
return -1;
}
return 0;
}
static struct SwsContext *create_scaler(AVFrame *src, const worker_config_t *cfg) {
return sws_getContext(
src->width,
src->height,
src->format,
cfg->output_width,
cfg->output_height,
AV_PIX_FMT_YUVJ420P,
SWS_BILINEAR,
NULL,
NULL,
NULL);
}
static int scale_frame(struct SwsContext *sws_ctx, const worker_config_t *cfg, AVFrame *src, AVFrame **dst) {
int ret;
if (sws_ctx == NULL) {
return -1;
}
*dst = av_frame_alloc();
if (*dst == NULL) {
return -1;
}
(*dst)->width = cfg->output_width;
(*dst)->height = cfg->output_height;
(*dst)->format = AV_PIX_FMT_YUVJ420P;
if (av_frame_get_buffer(*dst, 0) < 0) {
av_frame_free(dst);
return -1;
}
ret = sws_scale(
sws_ctx,
(const uint8_t *const *) src->data,
src->linesize,
0,
src->height,
(*dst)->data,
(*dst)->linesize);
if (ret < 0) {
av_frame_free(dst);
return -1;
}
return 0;
}
static int encode_frame(AVCodecContext *encoder, AVFrame *frame, int qscale, AVPacket **pkt) {
int ret;
*pkt = av_packet_alloc();
if (*pkt == NULL) {
return -1;
}
encoder->global_quality = FF_QP2LAMBDA * qscale;
frame->quality = encoder->global_quality;
ret = avcodec_send_frame(encoder, frame);
if (ret < 0) {
av_packet_free(pkt);
return -1;
}
ret = avcodec_receive_packet(encoder, *pkt);
if (ret < 0) {
av_packet_free(pkt);
return -1;
}
return 0;
}
static int video_sender_init(video_sender_t *sender, const worker_config_t *cfg) {
kcp_conn_options_t options;
if (sender == NULL) {
errno = EINVAL;
return -1;
}
CLEAR(*sender);
snprintf(sender->target_peer, sizeof(sender->target_peer), "%s", cfg->target_peer);
kcp_conn_options_set_video_defaults(&options);
sender->client = kcp_client_dial_with_options(
cfg->server_addr,
cfg->relay_addr,
cfg->peer_id,
cfg->bind_ip,
cfg->bind_device,
&options,
NULL,
NULL,
NULL,
cfg->stats_interval_ms);
if (sender->client == NULL) {
return -1;
}
fprintf(stderr, "B-side video worker connected as %s -> %s\n", kcp_client_id(sender->client), sender->target_peer);
return 0;
}
static int video_sender_send_packet(video_sender_t *sender, const AVPacket *encoded_pkt) {
if (sender == NULL || sender->client == NULL || encoded_pkt == NULL) {
errno = EINVAL;
return -1;
}
return kcp_client_send_binary(sender->client, sender->target_peer, encoded_pkt->data, (size_t) encoded_pkt->size);
}
static void video_sender_close(video_sender_t *sender) {
if (sender == NULL || sender->client == NULL) {
return;
}
kcp_client_close(sender->client);
kcp_client_free(sender->client);
sender->client = NULL;
}
static void *control_thread_main(void *opaque) {
control_thread_args_t *args = (control_thread_args_t *) opaque;
char line[512];
while (fgets(line, sizeof(line), args->control_stream) != NULL) {
cJSON *root = cJSON_Parse(line);
cJSON *type_item;
if (root == NULL) {
continue;
}
type_item = cJSON_GetObjectItemCaseSensitive(root, "type");
if (!cJSON_IsString(type_item) || type_item->valuestring == NULL) {
cJSON_Delete(root);
continue;
}
if (strcmp(type_item->valuestring, "shutdown") == 0) {
runtime_set_shutdown(args->runtime, true);
cJSON_Delete(root);
return NULL;
}
if (strcmp(type_item->valuestring, "set_profile") == 0) {
cJSON *fps = cJSON_GetObjectItemCaseSensitive(root, "fps");
cJSON *qscale = cJSON_GetObjectItemCaseSensitive(root, "jpeg_quality_qscale");
cJSON *max_frame_bytes = cJSON_GetObjectItemCaseSensitive(root, "max_frame_bytes");
runtime_set_profile(
args->runtime,
cJSON_IsNumber(fps) ? fps->valueint : -1,
cJSON_IsNumber(qscale) ? qscale->valueint : -1,
cJSON_IsNumber(max_frame_bytes) ? max_frame_bytes->valueint : -1);
}
cJSON_Delete(root);
}
runtime_set_shutdown(args->runtime, true);
return NULL;
}
int main(void) {
worker_config_t cfg;
runtime_state_t runtime;
video_sender_t sender;
control_thread_args_t control_args;
pthread_t control_thread;
FILE *control_stream = NULL;
FILE *telemetry_stream = NULL;
Buffer *buffers = NULL;
int num_buffers = 0;
int camera_fd = -1;
int control_fd = env_as_int(WORKER_CONTROL_FD_ENV, WORKER_CONTROL_FD);
int telemetry_fd = env_as_int(WORKER_TELEMETRY_FD_ENV, WORKER_TELEMETRY_FD);
enum v4l2_buf_type stream_type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
AVCodecContext *decoder = NULL;
AVCodecContext *encoder = NULL;
struct SwsContext *sws_ctx = NULL;
double next_deadline_ms = 0.0;
double next_metrics_ms = 0.0;
int exit_code = 1;
int index;
int sws_src_width = 0;
int sws_src_height = 0;
int sws_src_format = -1;
bool control_thread_started = false;
av_log_set_level(AV_LOG_ERROR);
signal(SIGINT, handle_signal);
signal(SIGTERM, handle_signal);
CLEAR(sender);
if (load_worker_config(&cfg) != 0) {
return 1;
}
CLEAR(runtime);
pthread_mutex_init(&runtime.lock, NULL);
pthread_mutex_init(&runtime.telemetry_lock, NULL);
runtime.target_fps = cfg.initial_fps;
runtime.jpeg_quality_qscale = cfg.initial_qscale;
runtime.max_frame_bytes = cfg.initial_max_frame_bytes;
control_stream = fdopen(control_fd, "r");
telemetry_stream = fdopen(telemetry_fd, "w");
if (control_stream == NULL || telemetry_stream == NULL) {
perror("fdopen worker control/telemetry");
goto cleanup;
}
setvbuf(telemetry_stream, NULL, _IOLBF, 0);
runtime.telemetry_stream = telemetry_stream;
control_args.control_stream = control_stream;
control_args.runtime = &runtime;
if (pthread_create(&control_thread, NULL, control_thread_main, &control_args) != 0) {
perror("pthread_create");
goto cleanup;
}
control_thread_started = true;
camera_fd = open_v4l2_device(cfg.video_device);
if (camera_fd < 0) {
goto cleanup_join_thread;
}
if (init_v4l2_device(camera_fd, &cfg) != 0) {
goto cleanup_join_thread;
}
if (init_mmap(camera_fd, &buffers, &num_buffers) != 0) {
goto cleanup_join_thread;
}
if (queue_all_buffers(camera_fd, num_buffers) != 0) {
goto cleanup_join_thread;
}
if (ioctl(camera_fd, VIDIOC_STREAMON, &stream_type) < 0) {
perror("VIDIOC_STREAMON");
goto cleanup_join_thread;
}
decoder = create_mjpeg_decoder(&cfg);
encoder = create_mjpeg_encoder(&cfg);
if (decoder == NULL || encoder == NULL) {
fprintf(stderr, "failed to create codecs\n");
goto cleanup_join_thread;
}
if (video_sender_init(&sender, &cfg) != 0) {
perror("video_sender_init");
goto cleanup_join_thread;
}
next_deadline_ms = monotonic_ms();
next_metrics_ms = next_deadline_ms + 100.0;
while (!runtime_should_stop(&runtime)) {
AVFrame *decoded_frame = NULL;
AVFrame *scaled_frame = NULL;
AVPacket *encoded_pkt = NULL;
struct v4l2_buffer buf;
fd_set fds;
struct timeval tv;
int select_result;
int fps;
int qscale;
int max_frame_bytes;
int encode_us = 0;
bool sent = false;
const char *drop_reason = "";
double now_ms = monotonic_ms();
double encode_start_ms;
double encode_end_ms;
runtime_get_profile(&runtime, &fps, &qscale, &max_frame_bytes);
if (fps < 1) {
fps = 1;
}
FD_ZERO(&fds);
FD_SET(camera_fd, &fds);
tv.tv_sec = 2;
tv.tv_usec = 0;
select_result = select(camera_fd + 1, &fds, NULL, NULL, &tv);
if (select_result <= 0) {
if (select_result < 0 && errno == EINTR) {
continue;
}
fprintf(stderr, "select timeout/error on camera\n");
continue;
}
if (dequeue_latest_buffer(camera_fd, &buf) != 0) {
if (errno == EAGAIN) {
continue;
}
perror("VIDIOC_DQBUF");
break;
}
now_ms = monotonic_ms();
if (now_ms < next_deadline_ms) {
drop_reason = "paced_drop";
goto requeue_and_report;
}
next_deadline_ms = now_ms + (1000.0 / (double) fps);
if (decode_mjpeg_frame(decoder, (uint8_t *) buffers[buf.index].start, (int) buf.bytesused, &decoded_frame) != 0) {
drop_reason = "decode_failed";
goto requeue_and_report;
}
if (
sws_ctx == NULL
|| sws_src_width != decoded_frame->width
|| sws_src_height != decoded_frame->height
|| sws_src_format != decoded_frame->format
) {
sws_freeContext(sws_ctx);
sws_ctx = create_scaler(decoded_frame, &cfg);
sws_src_width = decoded_frame->width;
sws_src_height = decoded_frame->height;
sws_src_format = decoded_frame->format;
}
if (scale_frame(sws_ctx, &cfg, decoded_frame, &scaled_frame) != 0) {
drop_reason = "scale_failed";
goto requeue_and_report;
}
encode_start_ms = monotonic_ms();
if (encode_frame(encoder, scaled_frame, qscale, &encoded_pkt) != 0) {
drop_reason = "encode_failed";
goto requeue_and_report;
}
encode_end_ms = monotonic_ms();
encode_us = (int) ((encode_end_ms - encode_start_ms) * 1000.0);
if (encoded_pkt->size > max_frame_bytes) {
drop_reason = "frame_too_large";
goto requeue_and_report;
}
if (video_sender_send_packet(&sender, encoded_pkt) != 0) {
perror("video_sender_send_packet");
drop_reason = "send_failed";
goto requeue_and_report;
}
sent = true;
requeue_and_report:
telemetry_write_frame_stat(
&runtime,
encoded_pkt != NULL ? encoded_pkt->size : 0,
encode_us,
sent,
drop_reason);
if (ioctl(camera_fd, VIDIOC_QBUF, &buf) < 0) {
perror("VIDIOC_QBUF");
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
if (encoded_pkt != NULL) {
av_packet_free(&encoded_pkt);
}
break;
}
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
if (encoded_pkt != NULL) {
av_packet_free(&encoded_pkt);
}
now_ms = monotonic_ms();
if (sender.client != NULL && now_ms >= next_metrics_ms) {
kcp_conn_metrics_t metrics;
if (kcp_client_metrics_snapshot(sender.client, &metrics) == 0) {
telemetry_write_kcp_metrics(&runtime, &metrics);
}
next_metrics_ms = now_ms + 100.0;
}
}
exit_code = 0;
cleanup_join_thread:
runtime_set_shutdown(&runtime, true);
if (control_stream != NULL) {
fclose(control_stream);
control_stream = NULL;
}
if (control_thread_started) {
pthread_join(control_thread, NULL);
}
cleanup:
if (control_stream != NULL) {
fclose(control_stream);
control_stream = NULL;
}
if (camera_fd >= 0) {
ioctl(camera_fd, VIDIOC_STREAMOFF, &stream_type);
}
if (buffers != NULL) {
for (index = 0; index < num_buffers; ++index) {
if (buffers[index].start != NULL && buffers[index].start != MAP_FAILED) {
munmap(buffers[index].start, buffers[index].length);
}
}
free(buffers);
}
video_sender_close(&sender);
if (encoder != NULL) {
avcodec_free_context(&encoder);
}
if (decoder != NULL) {
avcodec_free_context(&decoder);
}
sws_freeContext(sws_ctx);
if (camera_fd >= 0) {
close(camera_fd);
}
if (telemetry_stream != NULL) {
fclose(telemetry_stream);
telemetry_stream = NULL;
}
pthread_mutex_destroy(&runtime.lock);
pthread_mutex_destroy(&runtime.telemetry_lock);
return exit_code;
}

View File

@@ -53,6 +53,15 @@ static void *kcppeer_receive_thread_main(void *arg) {
}
fprintf(stderr, "received file from %s to %s: %s (%lu bytes) -> %s\n", msg.from, msg.to, msg.file_name, (unsigned long) msg.body_len, persisted_path);
break;
case MSG_TYPE_BINARY:
if (kcp_client_persist_message(ctx->client, &msg, ctx->inbox_dir, persisted_path, sizeof(persisted_path)) != 0) {
fprintf(stderr, "kcppeer: persist binary payload from %s to %s failed\n", msg.from, msg.to);
protocol_message_clear(&msg);
ctx->rc = -1;
return NULL;
}
fprintf(stderr, "received binary payload from %s to %s (%lu bytes) -> %s\n", msg.from, msg.to, (unsigned long) msg.body_len, persisted_path);
break;
case MSG_TYPE_ERROR:
fprintf(stderr, "received error from %s to %s: %.*s\n", msg.from, msg.to, (int) msg.body_len, msg.body == NULL ? "" : (const char *) msg.body);
break;

View File

@@ -50,6 +50,15 @@ static void *udppeer_receive_thread_main(void *arg) {
}
fprintf(stderr, "received file from %s to %s: %s (%lu bytes) -> %s\n", msg.from, msg.to, msg.file_name, (unsigned long) msg.body_len, persisted_path);
break;
case MSG_TYPE_BINARY:
if (udp_client_persist_message(ctx->client, &msg, ctx->inbox_dir, persisted_path, sizeof(persisted_path)) != 0) {
fprintf(stderr, "udppeer: persist binary payload from %s to %s failed\n", msg.from, msg.to);
protocol_message_clear(&msg);
ctx->rc = -1;
return NULL;
}
fprintf(stderr, "received binary payload from %s to %s (%lu bytes) -> %s\n", msg.from, msg.to, (unsigned long) msg.body_len, persisted_path);
break;
case MSG_TYPE_ERROR:
fprintf(stderr, "received error from %s to %s: %.*s\n", msg.from, msg.to, (int) msg.body_len, msg.body == NULL ? "" : (const char *) msg.body);
break;

View File

@@ -0,0 +1,652 @@
// camera_pipeline_ifdef_fixed.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <linux/videodev2.h>
#include <time.h>
// FFmpeg头文件 - 使用纯C包含
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavutil/avutil.h>
#include <libavutil/imgutils.h>
#include <libavutil/opt.h>
#include <libswscale/swscale.h>
#include "peer_kcp_client.h"
// ==========================================
// 1. 配置区域:在这里开启或关闭时间打印
// ==========================================
#define DEBUG_TIMING // 注释掉这一行,所有时间打印和计算都会消失
// 定义打印宏
#ifdef DEBUG_TIMING
#define PRINT_TIME(fmt, ...) printf(fmt, ##__VA_ARGS__)
#else
#define PRINT_TIME(fmt, ...) // 什么都不做,编译器会优化掉
#endif
#define WIDTH 1280
#define HEIGHT 720
#define OUTPUT_WIDTH 640
#define OUTPUT_HEIGHT 360
#define NUM_BUFFERS 4
#define CLEAR(x) memset(&(x), 0, sizeof(x))
#define VIDEO_SERVER_ADDR_ENV "OMNI_VIDEO_SERVER_ADDR"
#define VIDEO_RELAY_ADDR_ENV "OMNI_VIDEO_RELAY_VIA"
#define VIDEO_BIND_IP_ENV "OMNI_VIDEO_BIND_IP"
#define VIDEO_BIND_DEVICE_ENV "OMNI_VIDEO_BIND_DEVICE"
#define VIDEO_PEER_ID_ENV "OMNI_VIDEO_PEER_ID"
#define VIDEO_TARGET_PEER_ENV "OMNI_VIDEO_TARGET_PEER"
#define VIDEO_DEFAULT_PEER_ID "peer-b-video"
#define VIDEO_DEFAULT_TARGET_PEER "peer-a-video"
typedef struct
{
kcp_client_t *client;
char target_peer[OMNI_MAX_PEER_ID];
} VideoSender;
static int video_sender_init(VideoSender *sender);
static int video_sender_send_packet(VideoSender *sender, const AVPacket *encoded_pkt);
static void video_sender_close(VideoSender *sender);
typedef struct
{
void *start;
size_t length;
} Buffer;
double get_time_ms()
{
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0;
}
static const char *env_or_default(const char *name, const char *fallback)
{
const char *value = getenv(name);
if (value != NULL && value[0] != '\0')
return value;
return fallback;
}
static int video_sender_init(VideoSender *sender)
{
const char *server_addr = getenv(VIDEO_SERVER_ADDR_ENV);
const char *relay_addr = env_or_default(VIDEO_RELAY_ADDR_ENV, "");
const char *bind_ip = env_or_default(VIDEO_BIND_IP_ENV, "");
const char *bind_device = env_or_default(VIDEO_BIND_DEVICE_ENV, "");
const char *peer_id = env_or_default(VIDEO_PEER_ID_ENV, VIDEO_DEFAULT_PEER_ID);
const char *target_peer = env_or_default(VIDEO_TARGET_PEER_ENV, VIDEO_DEFAULT_TARGET_PEER);
kcp_conn_options_t options;
if (sender == NULL)
{
errno = EINVAL;
return -1;
}
if (server_addr == NULL || server_addr[0] == '\0')
{
errno = EINVAL;
fprintf(stderr, "%s is required\n", VIDEO_SERVER_ADDR_ENV);
return -1;
}
memset(sender, 0, sizeof(*sender));
snprintf(sender->target_peer, sizeof(sender->target_peer), "%s", target_peer);
kcp_conn_options_set_video_defaults(&options);
sender->client = kcp_client_dial_with_options(
server_addr,
relay_addr,
peer_id,
bind_ip,
bind_device,
&options,
NULL,
NULL,
NULL,
KCP_DEFAULT_STATS_INTERVAL_MS);
if (sender->client == NULL)
return -1;
fprintf(stderr, "Video sender connected as %s -> %s\n",
kcp_client_id(sender->client), sender->target_peer);
return 0;
}
static int video_sender_send_packet(VideoSender *sender, const AVPacket *encoded_pkt)
{
if (sender == NULL || sender->client == NULL || encoded_pkt == NULL)
{
errno = EINVAL;
return -1;
}
return kcp_client_send_binary(
sender->client,
sender->target_peer,
encoded_pkt->data,
(size_t)encoded_pkt->size);
}
static void video_sender_close(VideoSender *sender)
{
if (sender == NULL || sender->client == NULL)
return;
kcp_client_close(sender->client);
kcp_client_free(sender->client);
sender->client = NULL;
}
int open_v4l2_device(const char *device)
{
int fd = open(device, O_RDWR | O_NONBLOCK);
if (fd < 0)
{
perror("open device");
return -1;
}
return fd;
}
int init_v4l2_device(int fd)
{
struct v4l2_format fmt = {0};
fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
fmt.fmt.pix.width = WIDTH;
fmt.fmt.pix.height = HEIGHT;
fmt.fmt.pix.pixelformat = V4L2_PIX_FMT_MJPEG;
fmt.fmt.pix.field = V4L2_FIELD_NONE;
if (ioctl(fd, VIDIOC_S_FMT, &fmt) < 0)
{
perror("VIDIOC_S_FMT");
return -1;
}
PRINT_TIME("Set format: %dx%d MJPEG\n", WIDTH, HEIGHT);
return 0;
}
int init_mmap(int fd, Buffer **buffers, int *num_buffers)
{
struct v4l2_requestbuffers req = {0};
req.count = NUM_BUFFERS;
req.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
req.memory = V4L2_MEMORY_MMAP;
if (ioctl(fd, VIDIOC_REQBUFS, &req) < 0)
{
perror("VIDIOC_REQBUFS");
return -1;
}
*num_buffers = req.count;
*buffers = (Buffer *)calloc(req.count, sizeof(Buffer));
for (int i = 0; i < req.count; i++)
{
struct v4l2_buffer buf = {0};
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
buf.index = i;
if (ioctl(fd, VIDIOC_QUERYBUF, &buf) < 0)
{
perror("VIDIOC_QUERYBUF");
return -1;
}
(*buffers)[i].length = buf.length;
(*buffers)[i].start = mmap(NULL, buf.length,
PROT_READ | PROT_WRITE,
MAP_SHARED, fd, buf.m.offset);
if ((*buffers)[i].start == MAP_FAILED)
{
perror("mmap");
return -1;
}
}
return 0;
}
AVCodecContext *create_mjpeg_decoder()
{
const AVCodec *decoder = avcodec_find_decoder(AV_CODEC_ID_MJPEG);
if (!decoder)
{
fprintf(stderr, "MJPEG decoder not found\n");
return NULL;
}
AVCodecContext *ctx = avcodec_alloc_context3(decoder);
ctx->width = WIDTH;
ctx->height = HEIGHT;
ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; // 使用YUVJ420P
ctx->color_range = AVCOL_RANGE_JPEG; // JPEG范围
ctx->thread_count = 1;
AVDictionary *opts = NULL;
av_dict_set(&opts, "flags2", "+fast", 0);
if (avcodec_open2(ctx, decoder, &opts) < 0)
{
avcodec_free_context(&ctx);
av_dict_free(&opts);
return NULL;
}
av_dict_free(&opts);
printf("Decoder created with format: YUVJ420P\n");
return ctx;
}
AVCodecContext *create_mjpeg_encoder()
{
const AVCodec *encoder = avcodec_find_encoder(AV_CODEC_ID_MJPEG);
if (!encoder)
{
fprintf(stderr, "MJPEG encoder not found\n");
return NULL;
}
AVCodecContext *ctx = avcodec_alloc_context3(encoder);
ctx->width = OUTPUT_WIDTH;
ctx->height = OUTPUT_HEIGHT;
ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; // 使用YUVJ420P
ctx->time_base = (AVRational){1, 30};
ctx->qmin = 8;
ctx->qmax = 31;
ctx->flags |= AV_CODEC_FLAG_QSCALE;
ctx->global_quality = FF_QP2LAMBDA * 5;
AVDictionary *opts = NULL;
av_dict_set(&opts, "huffman", "default", 0);
if (avcodec_open2(ctx, encoder, &opts) < 0)
{
avcodec_free_context(&ctx);
av_dict_free(&opts);
return NULL;
}
av_dict_free(&opts);
printf("Encoder created with format: YUVJ420P\n");
return ctx;
}
int decode_mjpeg_frame(AVCodecContext *decoder, const uint8_t *data, int size, AVFrame **frame)
{
if (frame == NULL)
return -1;
*frame = NULL;
AVPacket *pkt = av_packet_alloc();
if (!pkt)
return -1;
pkt->data = (uint8_t *)data;
pkt->size = size;
int ret = avcodec_send_packet(decoder, pkt);
if (ret < 0)
{
av_packet_free(&pkt);
return -1;
}
*frame = av_frame_alloc();
if (!*frame)
{
av_packet_free(&pkt);
return -1;
}
ret = avcodec_receive_frame(decoder, *frame);
av_packet_free(&pkt);
if (ret < 0)
av_frame_free(frame);
return ret;
}
int scale_frame(AVFrame *src, AVFrame **dst)
{
// 简单缩放,不设置复杂的色彩空间参数
struct SwsContext *sws_ctx = sws_getContext(
src->width, src->height, src->format,
OUTPUT_WIDTH, OUTPUT_HEIGHT, AV_PIX_FMT_YUVJ420P,
SWS_BILINEAR, NULL, NULL, NULL);
if (!sws_ctx)
{
fprintf(stderr, "Failed to create sws context\n");
return -1;
}
*dst = av_frame_alloc();
if (!*dst)
{
sws_freeContext(sws_ctx);
return -1;
}
(*dst)->width = OUTPUT_WIDTH;
(*dst)->height = OUTPUT_HEIGHT;
(*dst)->format = AV_PIX_FMT_YUVJ420P;
if (av_frame_get_buffer(*dst, 0) < 0)
{
fprintf(stderr, "Failed to allocate frame buffer\n");
av_frame_free(dst);
sws_freeContext(sws_ctx);
return -1;
}
int ret = sws_scale(sws_ctx, (const uint8_t *const *)src->data, src->linesize,
0, src->height, (*dst)->data, (*dst)->linesize);
sws_freeContext(sws_ctx);
if (ret < 0)
{
fprintf(stderr, "sws_scale failed\n");
av_frame_free(dst);
return -1;
}
return 0;
}
int encode_frame(AVCodecContext *encoder, AVFrame *frame, AVPacket **pkt)
{
if (pkt == NULL)
return -1;
*pkt = NULL;
*pkt = av_packet_alloc();
if (!*pkt)
return -1;
int ret = avcodec_send_frame(encoder, frame);
if (ret < 0)
{
av_packet_free(pkt);
return -1;
}
ret = avcodec_receive_packet(encoder, *pkt);
if (ret < 0)
av_packet_free(pkt);
return ret;
}
int main()
{
VideoSender sender;
memset(&sender, 0, sizeof(sender));
PRINT_TIME("=== V4L2 Direct Capture + FFmpeg Processing ===\n");
// 1. Open V4L2 device
int fd = open_v4l2_device("/dev/video0");
if (fd < 0)
{
fprintf(stderr, "Failed to open camera device\n");
return 1;
}
// 2. Initialize V4L2
if (init_v4l2_device(fd) < 0)
{
close(fd);
return 1;
}
// 3. Setup MMAP buffers
Buffer *buffers = NULL;
int num_buffers = 0;
if (init_mmap(fd, &buffers, &num_buffers) < 0)
{
close(fd);
return 1;
}
// 4. Create FFmpeg codecs
AVCodecContext *decoder = create_mjpeg_decoder();
AVCodecContext *encoder = create_mjpeg_encoder();
if (!decoder || !encoder)
{
fprintf(stderr, "Failed to create codecs\n");
close(fd);
return 1;
}
if (video_sender_init(&sender) < 0)
{
perror("video_sender_init");
video_sender_close(&sender);
avcodec_free_context(&encoder);
avcodec_free_context(&decoder);
close(fd);
return 1;
}
// 5. Queue buffers
for (int i = 0; i < num_buffers; i++)
{
struct v4l2_buffer buf = {0};
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
buf.index = i;
if (ioctl(fd, VIDIOC_QBUF, &buf) < 0)
{
perror("VIDIOC_QBUF");
close(fd);
return 1;
}
}
// 6. Start streaming
enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
if (ioctl(fd, VIDIOC_STREAMON, &type) < 0)
{
perror("VIDIOC_STREAMON");
close(fd);
return 1;
}
// 7. Benchmark
// 使用宏控制打印表头
PRINT_TIME("\nRunning benchmark (100 frames)...\n");
PRINT_TIME("Frame | Capture | Decode | Scale | Encode | Total | Size | Marker\n");
PRINT_TIME("------|---------|--------|-------|--------|-------|------|--------\n");
for (int i = 0; i < 100; i++)
{
// 只有在开启 DEBUG_TIMING 时才声明这些时间变量
#ifdef DEBUG_TIMING
double total_start = get_time_ms();
double capture_start, capture_end;
double decode_start, decode_end;
double scale_start, scale_end;
double encode_start, encode_end;
#endif
// Wait for frame
fd_set fds;
FD_ZERO(&fds);
FD_SET(fd, &fds);
struct timeval tv = {2, 0};
int r = select(fd + 1, &fds, NULL, NULL, &tv);
if (r <= 0)
{
PRINT_TIME("Timeout waiting for frame\n");
break;
}
#ifdef DEBUG_TIMING
capture_start = get_time_ms();
#endif
// Dequeue buffer
struct v4l2_buffer buf = {0};
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
if (ioctl(fd, VIDIOC_DQBUF, &buf) < 0)
{
perror("VIDIOC_DQBUF");
break;
}
#ifdef DEBUG_TIMING
capture_end = get_time_ms();
#endif
// Decode
#ifdef DEBUG_TIMING
decode_start = get_time_ms();
#endif
AVFrame *decoded_frame = NULL;
int ret = decode_mjpeg_frame(decoder,
(uint8_t *)buffers[buf.index].start, buf.bytesused, &decoded_frame);
#ifdef DEBUG_TIMING
decode_end = get_time_ms();
#endif
if (ret < 0 || !decoded_frame)
{
PRINT_TIME("Frame %d: Decode failed\n", i + 1);
ioctl(fd, VIDIOC_QBUF, &buf);
continue;
}
// Scale
#ifdef DEBUG_TIMING
scale_start = get_time_ms();
#endif
AVFrame *scaled_frame = NULL;
if (scale_frame(decoded_frame, &scaled_frame) < 0)
{
PRINT_TIME("Frame %d: Scale failed\n", i + 1);
av_frame_free(&decoded_frame);
ioctl(fd, VIDIOC_QBUF, &buf);
continue;
}
#ifdef DEBUG_TIMING
scale_end = get_time_ms();
#endif
// Encode
#ifdef DEBUG_TIMING
encode_start = get_time_ms();
#endif
AVPacket *encoded_pkt = NULL;
if (encode_frame(encoder, scaled_frame, &encoded_pkt) < 0)
{
PRINT_TIME("Frame %d: Encode failed\n", i + 1);
}
#ifdef DEBUG_TIMING
if (encoded_pkt && i % 50 == 0)
{
char filename[100];
sprintf(filename, "frame_%04d.jpg", i + 1);
FILE *f = fopen(filename, "wb");
if (f)
{
fwrite(encoded_pkt->data, 1, encoded_pkt->size, f);
fclose(f);
PRINT_TIME("Saved as %s\n", filename);
}
}
#endif
#ifdef DEBUG_TIMING
encode_end = get_time_ms();
double total_end = get_time_ms();
#endif
// 打印结果
#ifdef DEBUG_TIMING
PRINT_TIME("%5d | %7.1f | %6.1f | %5.1f | %6.1f | %5.1f | %4d KB | 0x%02x\n",
i + 1,
capture_end - capture_start,
decode_end - decode_start,
scale_end - scale_start,
encode_end - encode_start,
total_end - total_start,
encoded_pkt ? encoded_pkt->size / 1024 : 0,
encoded_pkt && encoded_pkt->size > 1 ? encoded_pkt->data[1] : 0);
#else
// 如果不开启宏,也打印一些基本信息
printf("Frame %d processed\n", i + 1);
#endif
if (encoded_pkt && video_sender_send_packet(&sender, encoded_pkt) != 0)
{
perror("video_sender_send_packet");
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
av_packet_free(&encoded_pkt);
ioctl(fd, VIDIOC_QBUF, &buf);
break;
}
// Cleanup
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
if (encoded_pkt)
av_packet_free(&encoded_pkt);
// Requeue buffer
if (ioctl(fd, VIDIOC_QBUF, &buf) < 0)
{
perror("VIDIOC_QBUF");
break;
}
}
// 8. Stop streaming
ioctl(fd, VIDIOC_STREAMOFF, &type);
// 9. Cleanup
for (int i = 0; i < num_buffers; i++)
{
if (buffers[i].start != MAP_FAILED)
{
munmap(buffers[i].start, buffers[i].length);
}
}
free(buffers);
video_sender_close(&sender);
avcodec_free_context(&encoder);
avcodec_free_context(&decoder);
close(fd);
return 0;
}
// gcc -o v1_camera_pipeline_ifdef v1_camera_pipeline_ifdef.c $(pkg-config --cflags --libs libavformat libavcodec libavutil libswscale)

View File

@@ -0,0 +1,51 @@
transport:
server_addr: "81.70.156.140:10909"
relay_via: "106.55.173.235:10909"
bind_ip: ""
bind_device: ""
control_sender:
peer_id: "peer-a-ctrl"
target_peer: "peer-b-ctrl"
nodelay: 1
interval_ms: 5
resend: 2
nc: 1
sndwnd: 32
rcvwnd: 32
mtu: 1400
stats_interval_ms: 100
video_receiver:
peer_id: "peer-a-video"
buffer_bytes: 1048576
nodelay: 1
interval_ms: 10
resend: 2
nc: 1
sndwnd: 256
rcvwnd: 256
mtu: 1400
stats_interval_ms: 100
daemon:
socket_path: "/tmp/omnisocket-a-side.sock"
reconnect_delay_ms: 2000
telemetry_interval_ms: 100
analog_send_hz: 100
frame_stale_ms: 500
policy:
health_window_ms: 2000
green_srtt_ms: 35
yellow_srtt_ms: 60
retrans_red_threshold: 10
profile_green:
fps: 15
max_frame_kb: 60
profile_yellow:
fps: 10
max_frame_kb: 40
profile_red:
fps: 5
max_frame_kb: 20

View File

@@ -0,0 +1,58 @@
transport:
server_addr: ""
relay_via: "81.70.156.140:10909"
bind_ip: ""
bind_device: ""
control_receiver:
peer_id: "peer-b-ctrl"
nodelay: 1
interval_ms: 5
resend: 2
nc: 1
sndwnd: 32
rcvwnd: 32
mtu: 1400
stats_interval_ms: 100
queue_capacity: 256
video_sender:
enabled: true
peer_id: "peer-b-video"
target_peer: "peer-a-video"
binary_path: "bin/b_side_video_sender"
device: "/dev/video0"
capture_width: 1280
capture_height: 720
output_width: 640
output_height: 360
fps: 10
jpeg_quality_qscale: 8
max_frame_bytes: 40960
stats_interval_ms: 100
daemon:
socket_path: "/tmp/omnisocket-b-side.sock"
ctrl_socket_path: "/tmp/omnisocket-b-ctrl.sock"
reconnect_delay_ms: 2000
telemetry_interval_ms: 100
worker_restart_delay_ms: 2000
policy:
mode: "manual"
health_window_ms: 2000
green_srtt_ms: 30
yellow_srtt_ms: 55
retrans_red_threshold: 8
profile_green:
fps: 10
jpeg_quality_qscale: 8
max_frame_bytes: 40960
profile_yellow:
fps: 7
jpeg_quality_qscale: 12
max_frame_bytes: 28672
profile_red:
fps: 5
jpeg_quality_qscale: 16
max_frame_bytes: 20480

View File

@@ -0,0 +1,41 @@
transport:
server_addr: "127.0.0.1:10909"
relay_via: ""
bind_ip: ""
bind_device: ""
control_sender:
peer_id: "peer-a-ctrl"
target_peer: "peer-b-ctrl"
joy_topic: "/xbox_data"
deadzone: 0.10
analog_epsilon: 0.01
dpad_threshold: 0.50
trigger_pressed_threshold: -0.50
control_receiver:
peer_id: "peer-b-ctrl"
motion:
initial_lift: 0.89
lift_step: 0.05
max_surge: 1.0
max_sway: 0.5
max_spin: 0.5
max_lift: 0.90
min_lift: 0.65
surge_step: 0.1
sway_step: 0.1
spin_step: 0.1
video_sender:
peer_id: "peer-b-video"
target_peer: "peer-a-video"
frame_bytes: 30720
frame_interval_ms: 66
video_receiver:
peer_id: "peer-a-video"
# recv_into() requires a buffer large enough for the whole frame.
# If buffer_bytes is smaller than video_sender.frame_bytes, the oversize frame is dropped.
buffer_bytes: 65536

View File

@@ -8,13 +8,26 @@ extern "C" {
#endif
typedef struct kcp_client kcp_client_t;
typedef struct kcp_client_recv_meta {
message_type_t type;
uint64_t id;
char from[OMNI_MAX_PEER_ID];
char to[OMNI_MAX_PEER_ID];
char file_name[OMNI_MAX_FILE_NAME];
size_t body_len;
} kcp_client_recv_meta_t;
kcp_client_t *kcp_client_dial_with_options(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, const kcp_conn_options_t *options, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms);
kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms);
const char *kcp_client_id(const kcp_client_t *client);
int kcp_client_send_text(kcp_client_t *client, const char *to, const char *text);
int kcp_client_send_binary(kcp_client_t *client, const char *to, const void *data, size_t data_len);
int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char *path);
int kcp_client_receive_timed(kcp_client_t *client, message_t *out_msg, int timeout_ms);
int kcp_client_receive(kcp_client_t *client, message_t *out_msg);
int kcp_client_receive_binary_into(kcp_client_t *client, void *buffer, size_t buffer_len, kcp_client_recv_meta_t *out_meta, int timeout_ms);
int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len);
int kcp_client_metrics_snapshot(kcp_client_t *client, kcp_conn_metrics_t *out_metrics);
int kcp_client_close(kcp_client_t *client);
void kcp_client_free(kcp_client_t *client);

View File

@@ -12,6 +12,7 @@ typedef enum message_type {
MSG_TYPE_FILE = 1,
MSG_TYPE_REGISTER = 2,
MSG_TYPE_ERROR = 3,
MSG_TYPE_BINARY = 4,
MSG_TYPE_INVALID = 255
} message_type_t;

View File

@@ -9,25 +9,92 @@
extern "C" {
#endif
#define KCP_NODELAY 1
#define KCP_INTERVAL 10
#define KCP_RESEND 2
#define KCP_NC 1
#define KCP_WND_SIZE 256
#define KCP_MTU 1400
#define KCP_DEFAULT_NODELAY 1
#define KCP_DEFAULT_INTERVAL_MS 10
#define KCP_DEFAULT_RESEND 2
#define KCP_DEFAULT_NC 1
#define KCP_DEFAULT_SND_WND 256
#define KCP_DEFAULT_RCV_WND 256
#define KCP_DEFAULT_MTU 1400
#define KCP_DEFAULT_STATS_INTERVAL_MS 100
#define KCP_CONTROL_NODELAY 1
#define KCP_CONTROL_INTERVAL_MS 5
#define KCP_CONTROL_RESEND 2
#define KCP_CONTROL_NC 1
#define KCP_CONTROL_SND_WND 32
#define KCP_CONTROL_RCV_WND 32
#define KCP_CONTROL_MTU 1400
#define KCP_VIDEO_NODELAY 1
#define KCP_VIDEO_INTERVAL_MS 10
#define KCP_VIDEO_RESEND 2
#define KCP_VIDEO_NC 1
#define KCP_VIDEO_SND_WND 256
#define KCP_VIDEO_RCV_WND 256
#define KCP_VIDEO_MTU 1400
#define KCP_NODELAY KCP_DEFAULT_NODELAY
#define KCP_INTERVAL KCP_DEFAULT_INTERVAL_MS
#define KCP_RESEND KCP_DEFAULT_RESEND
#define KCP_NC KCP_DEFAULT_NC
#define KCP_WND_SIZE KCP_DEFAULT_SND_WND
#define KCP_MTU KCP_DEFAULT_MTU
typedef struct kcp_conn kcp_conn_t;
typedef struct kcp_listener kcp_listener_t;
typedef struct kcp_conn_metrics {
int connected;
int has_conv;
uint32_t conv;
char local_addr[OMNI_MAX_ADDR_TEXT];
char remote_addr[OMNI_MAX_ADDR_TEXT];
uint32_t rto_ms;
int32_t srtt_ms;
int32_t srttvar_ms;
uint64_t bytes_sent;
uint64_t bytes_received;
uint64_t in_pkts;
uint64_t out_pkts;
uint64_t in_segs;
uint64_t out_segs;
uint64_t retrans_segs;
uint64_t fast_retrans_segs;
uint64_t early_retrans_segs;
uint64_t lost_segs;
uint64_t repeat_segs;
uint64_t in_errs;
uint64_t kcp_in_errs;
uint64_t ring_buffer_snd_queue;
uint64_t ring_buffer_rcv_queue;
uint64_t ring_buffer_snd_buffer;
} kcp_conn_metrics_t;
typedef struct kcp_conn_options {
int nodelay;
int interval_ms;
int resend;
int nc;
int sndwnd;
int rcvwnd;
int mtu;
} kcp_conn_options_t;
void kcp_conn_options_init(kcp_conn_options_t *options);
void kcp_conn_options_set_control_defaults(kcp_conn_options_t *options);
void kcp_conn_options_set_video_defaults(kcp_conn_options_t *options);
kcp_conn_t *kcp_conn_dial_with_options(const char *server_addr, const char *bind_ip, const char *bind_device, const kcp_conn_options_t *options, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms);
kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms);
int kcp_conn_configure_runtime(kcp_conn_t *conn, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms);
int kcp_conn_apply_options(kcp_conn_t *conn, const kcp_conn_options_t *options);
int kcp_conn_send(kcp_conn_t *conn, const message_t *msg);
int kcp_conn_receive_timed(kcp_conn_t *conn, message_t *out_msg, int timeout_ms);
int kcp_conn_receive(kcp_conn_t *conn, message_t *out_msg);
int kcp_conn_close(kcp_conn_t *conn);
void kcp_conn_free(kcp_conn_t *conn);
uint32_t kcp_conn_conv(const kcp_conn_t *conn);
int kcp_conn_local_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, socklen_t *addr_len);
int kcp_conn_metrics_snapshot(kcp_conn_t *conn, kcp_conn_metrics_t *out_metrics);
kcp_listener_t *kcp_listener_listen(const char *listen_addr, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, const char *node_role, const char *node_id);
kcp_conn_t *kcp_listener_accept(kcp_listener_t *listener);

View File

@@ -0,0 +1,44 @@
try:
from ._omnisocket import (
MSG_TYPE_BINARY,
MSG_TYPE_ERROR,
MSG_TYPE_FILE,
MSG_TYPE_REGISTER,
MSG_TYPE_TEXT,
Session,
)
except ImportError as exc:
raise ImportError(
"omnisocket extension is not built; run `make python-ext` on a Linux host first"
) from exc
CONTROL_DEFAULTS = {
"nodelay": 1,
"interval_ms": 5,
"resend": 2,
"nc": 1,
"sndwnd": 32,
"rcvwnd": 32,
"mtu": 1400,
}
VIDEO_DEFAULTS = {
"nodelay": 1,
"interval_ms": 10,
"resend": 2,
"nc": 1,
"sndwnd": 256,
"rcvwnd": 256,
"mtu": 1400,
}
__all__ = [
"CONTROL_DEFAULTS",
"VIDEO_DEFAULTS",
"MSG_TYPE_BINARY",
"MSG_TYPE_ERROR",
"MSG_TYPE_FILE",
"MSG_TYPE_REGISTER",
"MSG_TYPE_TEXT",
"Session",
]

View File

@@ -0,0 +1,409 @@
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include "omnisocket_client.h"
typedef struct PyOmniSession {
PyObject_HEAD
omnisocket_session_t session;
} PyOmniSession;
PyDoc_STRVAR(
PyOmniSession_recv_doc,
"recv(timeout_ms=-1) -> (from_peer, msg_type, payload) | None"
);
PyDoc_STRVAR(
PyOmniSession_recv_into_doc,
"recv_into(buffer, timeout_ms=-1) -> dict | None\n"
"\n"
"The writable buffer must be large enough for the full message body.\n"
"If it is too small, BufferError reports the required size but the\n"
"current frame has already been consumed and is lost."
);
PyDoc_STRVAR(
PyOmniSession_kcp_metrics_doc,
"kcp_metrics() -> dict\n"
"\n"
"Return a snapshot of low-level KCP metrics for the current session."
);
static PyObject *PyOmniSession_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) {
PyOmniSession *self;
(void) args;
(void) kwargs;
self = (PyOmniSession *) type->tp_alloc(type, 0);
if (self == NULL) {
return NULL;
}
if (omnisocket_session_init(&self->session) != 0) {
type->tp_free((PyObject *) self);
return PyErr_SetFromErrno(PyExc_OSError);
}
return (PyObject *) self;
}
static void PyOmniSession_dealloc(PyOmniSession *self) {
omnisocket_session_destroy(&self->session);
Py_TYPE(self)->tp_free((PyObject *) self);
}
static PyObject *PyOmniSession_connect(PyOmniSession *self, PyObject *args, PyObject *kwargs) {
const char *server_addr;
const char *peer_id;
const char *relay_via = "";
const char *bind_ip = "";
const char *bind_device = "";
int nodelay = KCP_DEFAULT_NODELAY;
int interval_ms = KCP_DEFAULT_INTERVAL_MS;
int resend = KCP_DEFAULT_RESEND;
int nc = KCP_DEFAULT_NC;
int sndwnd = KCP_DEFAULT_SND_WND;
int rcvwnd = KCP_DEFAULT_RCV_WND;
int mtu = KCP_DEFAULT_MTU;
int stats_interval_ms = KCP_DEFAULT_STATS_INTERVAL_MS;
kcp_conn_options_t options;
int rc;
static char *kwlist[] = {
"server_addr",
"peer_id",
"relay_via",
"bind_ip",
"bind_device",
"nodelay",
"interval_ms",
"resend",
"nc",
"sndwnd",
"rcvwnd",
"mtu",
"stats_interval_ms",
NULL
};
if (!PyArg_ParseTupleAndKeywords(
args,
kwargs,
"ss|sssiiiiiiii",
kwlist,
&server_addr,
&peer_id,
&relay_via,
&bind_ip,
&bind_device,
&nodelay,
&interval_ms,
&resend,
&nc,
&sndwnd,
&rcvwnd,
&mtu,
&stats_interval_ms)) {
return NULL;
}
kcp_conn_options_init(&options);
options.nodelay = nodelay;
options.interval_ms = interval_ms;
options.resend = resend;
options.nc = nc;
options.sndwnd = sndwnd;
options.rcvwnd = rcvwnd;
options.mtu = mtu;
Py_BEGIN_ALLOW_THREADS
rc = omnisocket_session_connect(
&self->session,
server_addr,
relay_via,
peer_id,
bind_ip,
bind_device,
&options,
stats_interval_ms
);
Py_END_ALLOW_THREADS
if (rc != 0) {
return PyErr_SetFromErrno(PyExc_OSError);
}
Py_RETURN_NONE;
}
static PyObject *PyOmniSession_close(PyOmniSession *self, PyObject *Py_UNUSED(ignored)) {
int rc;
Py_BEGIN_ALLOW_THREADS
rc = omnisocket_session_close(&self->session);
Py_END_ALLOW_THREADS
if (rc != 0) {
return PyErr_SetFromErrno(PyExc_OSError);
}
Py_RETURN_NONE;
}
static PyObject *PyOmniSession_send(PyOmniSession *self, PyObject *args, PyObject *kwargs) {
const char *to;
Py_buffer payload;
int rc;
static char *kwlist[] = {"to", "data", NULL};
memset(&payload, 0, sizeof(payload));
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sy*", kwlist, &to, &payload)) {
return NULL;
}
Py_BEGIN_ALLOW_THREADS
rc = omnisocket_session_send(&self->session, to, payload.buf, (size_t) payload.len);
Py_END_ALLOW_THREADS
PyBuffer_Release(&payload);
if (rc != 0) {
return PyErr_SetFromErrno(PyExc_OSError);
}
Py_RETURN_NONE;
}
static PyObject *PyOmniSession_recv(PyOmniSession *self, PyObject *args, PyObject *kwargs) {
int timeout_ms = -1;
int rc;
message_t msg;
PyObject *body = NULL;
PyObject *result = NULL;
static char *kwlist[] = {"timeout_ms", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist, &timeout_ms)) {
return NULL;
}
protocol_message_init(&msg);
Py_BEGIN_ALLOW_THREADS
rc = omnisocket_session_recv(&self->session, &msg, timeout_ms);
Py_END_ALLOW_THREADS
if (rc == 1) {
protocol_message_clear(&msg);
Py_RETURN_NONE;
}
if (rc != 0) {
protocol_message_clear(&msg);
return PyErr_SetFromErrno(PyExc_OSError);
}
body = PyBytes_FromStringAndSize((const char *) msg.body, (Py_ssize_t) msg.body_len);
if (body == NULL) {
protocol_message_clear(&msg);
return NULL;
}
result = Py_BuildValue("(siO)", msg.from, (int) msg.type, body);
Py_DECREF(body);
protocol_message_clear(&msg);
return result;
}
static PyObject *PyOmniSession_recv_into(PyOmniSession *self, PyObject *args, PyObject *kwargs) {
PyObject *buffer_obj;
Py_buffer view;
int timeout_ms = -1;
int rc;
kcp_client_recv_meta_t meta;
PyObject *result = NULL;
static char *kwlist[] = {"buffer", "timeout_ms", NULL};
memset(&view, 0, sizeof(view));
memset(&meta, 0, sizeof(meta));
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|i", kwlist, &buffer_obj, &timeout_ms)) {
return NULL;
}
if (PyObject_GetBuffer(buffer_obj, &view, PyBUF_WRITABLE) != 0) {
return NULL;
}
Py_BEGIN_ALLOW_THREADS
rc = omnisocket_session_recv_into(&self->session, view.buf, (size_t) view.len, &meta, timeout_ms);
Py_END_ALLOW_THREADS
PyBuffer_Release(&view);
if (rc == 1) {
Py_RETURN_NONE;
}
if (rc == 2) {
PyErr_Format(
PyExc_BufferError,
"buffer too small: need %zu bytes; current frame was already consumed and dropped",
meta.body_len
);
return NULL;
}
if (rc != 0) {
return PyErr_SetFromErrno(PyExc_OSError);
}
result = Py_BuildValue(
"{s:s,s:s,s:s,s:i,s:K,s:K}",
"from",
meta.from,
"to",
meta.to,
"file_name",
meta.file_name,
"msg_type",
(int) meta.type,
"message_id",
(unsigned long long) meta.id,
"body_len",
(unsigned long long) meta.body_len
);
return result;
}
static PyObject *PyOmniSession_stats(PyOmniSession *self, PyObject *Py_UNUSED(ignored)) {
omnisocket_session_stats_t stats;
memset(&stats, 0, sizeof(stats));
omnisocket_session_stats_snapshot(&self->session, &stats);
return Py_BuildValue(
"{s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:i}",
"send_calls",
(unsigned long long) stats.send_calls,
"send_bytes",
(unsigned long long) stats.send_bytes,
"send_errors",
(unsigned long long) stats.send_errors,
"recv_calls",
(unsigned long long) stats.recv_calls,
"recv_bytes",
(unsigned long long) stats.recv_bytes,
"recv_timeouts",
(unsigned long long) stats.recv_timeouts,
"recv_errors",
(unsigned long long) stats.recv_errors,
"connected",
stats.connected
);
}
static PyObject *PyOmniSession_kcp_metrics(PyOmniSession *self, PyObject *Py_UNUSED(ignored)) {
omnisocket_session_kcp_metrics_t metrics;
memset(&metrics, 0, sizeof(metrics));
if (omnisocket_session_kcp_metrics_snapshot(&self->session, &metrics) != 0) {
return PyErr_SetFromErrno(PyExc_OSError);
}
return Py_BuildValue(
"{s:i,s:i,s:I,s:s,s:s,s:I,s:i,s:i,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K}",
"connected",
metrics.connected,
"has_conv",
metrics.has_conv,
"conv",
metrics.conv,
"local_addr",
metrics.local_addr,
"remote_addr",
metrics.remote_addr,
"rto_ms",
metrics.rto_ms,
"srtt_ms",
metrics.srtt_ms,
"srttvar_ms",
metrics.srttvar_ms,
"bytes_sent",
(unsigned long long) metrics.bytes_sent,
"bytes_received",
(unsigned long long) metrics.bytes_received,
"in_pkts",
(unsigned long long) metrics.in_pkts,
"out_pkts",
(unsigned long long) metrics.out_pkts,
"in_segs",
(unsigned long long) metrics.in_segs,
"out_segs",
(unsigned long long) metrics.out_segs,
"retrans_segs",
(unsigned long long) metrics.retrans_segs,
"fast_retrans_segs",
(unsigned long long) metrics.fast_retrans_segs,
"early_retrans_segs",
(unsigned long long) metrics.early_retrans_segs,
"lost_segs",
(unsigned long long) metrics.lost_segs,
"repeat_segs",
(unsigned long long) metrics.repeat_segs,
"in_errs",
(unsigned long long) metrics.in_errs,
"kcp_in_errs",
(unsigned long long) metrics.kcp_in_errs,
"ring_buffer_snd_queue",
(unsigned long long) metrics.ring_buffer_snd_queue,
"ring_buffer_rcv_queue",
(unsigned long long) metrics.ring_buffer_rcv_queue,
"ring_buffer_snd_buffer",
(unsigned long long) metrics.ring_buffer_snd_buffer
);
}
static PyMethodDef PyOmniSession_methods[] = {
{"connect", (PyCFunction) PyOmniSession_connect, METH_VARARGS | METH_KEYWORDS, NULL},
{"close", (PyCFunction) PyOmniSession_close, METH_NOARGS, NULL},
{"send", (PyCFunction) PyOmniSession_send, METH_VARARGS | METH_KEYWORDS, NULL},
{"recv", (PyCFunction) PyOmniSession_recv, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_doc},
{"recv_into", (PyCFunction) PyOmniSession_recv_into, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_into_doc},
{"stats", (PyCFunction) PyOmniSession_stats, METH_NOARGS, NULL},
{"kcp_metrics", (PyCFunction) PyOmniSession_kcp_metrics, METH_NOARGS, PyOmniSession_kcp_metrics_doc},
{NULL, NULL, 0, NULL}
};
static PyTypeObject PyOmniSessionType = {
PyVarObject_HEAD_INIT(NULL, 0)
};
static PyModuleDef omnisocket_module = {
PyModuleDef_HEAD_INIT,
.m_name = "_omnisocket",
.m_size = -1,
};
PyMODINIT_FUNC PyInit__omnisocket(void) {
PyObject *module;
PyOmniSessionType.tp_name = "omnisocket.Session";
PyOmniSessionType.tp_basicsize = sizeof(PyOmniSession);
PyOmniSessionType.tp_flags = Py_TPFLAGS_DEFAULT;
PyOmniSessionType.tp_new = PyOmniSession_new;
PyOmniSessionType.tp_dealloc = (destructor) PyOmniSession_dealloc;
PyOmniSessionType.tp_methods = PyOmniSession_methods;
if (PyType_Ready(&PyOmniSessionType) < 0) {
return NULL;
}
module = PyModule_Create(&omnisocket_module);
if (module == NULL) {
return NULL;
}
Py_INCREF(&PyOmniSessionType);
if (PyModule_AddObject(module, "Session", (PyObject *) &PyOmniSessionType) != 0) {
Py_DECREF(&PyOmniSessionType);
Py_DECREF(module);
return NULL;
}
if (PyModule_AddIntConstant(module, "MSG_TYPE_TEXT", MSG_TYPE_TEXT) != 0 ||
PyModule_AddIntConstant(module, "MSG_TYPE_FILE", MSG_TYPE_FILE) != 0 ||
PyModule_AddIntConstant(module, "MSG_TYPE_REGISTER", MSG_TYPE_REGISTER) != 0 ||
PyModule_AddIntConstant(module, "MSG_TYPE_ERROR", MSG_TYPE_ERROR) != 0 ||
PyModule_AddIntConstant(module, "MSG_TYPE_BINARY", MSG_TYPE_BINARY) != 0) {
Py_DECREF(module);
return NULL;
}
return module;
}

View File

@@ -0,0 +1,317 @@
#include "omnisocket_client.h"
int omnisocket_session_init(omnisocket_session_t *session) {
int rc;
if (session == NULL) {
errno = EINVAL;
return -1;
}
memset(session, 0, sizeof(*session));
rc = pthread_mutex_init(&session->mutex, NULL);
if (rc != 0) {
errno = rc;
return -1;
}
rc = pthread_cond_init(&session->idle_cond, NULL);
if (rc != 0) {
pthread_mutex_destroy(&session->mutex);
errno = rc;
return -1;
}
return 0;
}
void omnisocket_session_destroy(omnisocket_session_t *session) {
if (session == NULL) {
return;
}
(void) omnisocket_session_close(session);
pthread_cond_destroy(&session->idle_cond);
pthread_mutex_destroy(&session->mutex);
}
static int omnisocket_session_begin_client_op(omnisocket_session_t *session, kcp_client_t **out_client) {
if (session == NULL || out_client == NULL) {
errno = EINVAL;
return -1;
}
pthread_mutex_lock(&session->mutex);
if (session->closing) {
pthread_mutex_unlock(&session->mutex);
errno = ECANCELED;
return -1;
}
if (session->client == NULL) {
pthread_mutex_unlock(&session->mutex);
errno = ENOTCONN;
return -1;
}
*out_client = session->client;
session->active_ops += 1;
pthread_mutex_unlock(&session->mutex);
return 0;
}
int omnisocket_session_connect(
omnisocket_session_t *session,
const char *server_addr,
const char *relay_via,
const char *peer_id,
const char *bind_ip,
const char *bind_device,
const kcp_conn_options_t *options,
int stats_interval_ms
) {
kcp_client_t *client;
if (session == NULL || server_addr == NULL || peer_id == NULL) {
errno = EINVAL;
return -1;
}
pthread_mutex_lock(&session->mutex);
while (session->closing) {
pthread_cond_wait(&session->idle_cond, &session->mutex);
}
if (session->client != NULL) {
pthread_mutex_unlock(&session->mutex);
errno = EISCONN;
return -1;
}
client = kcp_client_dial_with_options(
server_addr,
relay_via,
peer_id,
bind_ip,
bind_device,
options,
NULL,
NULL,
NULL,
stats_interval_ms
);
if (client == NULL) {
pthread_mutex_unlock(&session->mutex);
return -1;
}
session->client = client;
session->stats.connected = 1;
pthread_mutex_unlock(&session->mutex);
return 0;
}
int omnisocket_session_close(omnisocket_session_t *session) {
kcp_client_t *client;
if (session == NULL) {
errno = EINVAL;
return -1;
}
pthread_mutex_lock(&session->mutex);
while (session->closing) {
pthread_cond_wait(&session->idle_cond, &session->mutex);
}
client = session->client;
if (client != NULL) {
session->closing = 1;
session->client = NULL;
}
session->stats.connected = 0;
pthread_mutex_unlock(&session->mutex);
if (client != NULL) {
kcp_client_close(client);
pthread_mutex_lock(&session->mutex);
while (session->active_ops > 0) {
pthread_cond_wait(&session->idle_cond, &session->mutex);
}
pthread_mutex_unlock(&session->mutex);
kcp_client_free(client);
pthread_mutex_lock(&session->mutex);
session->closing = 0;
pthread_cond_broadcast(&session->idle_cond);
pthread_mutex_unlock(&session->mutex);
}
return 0;
}
int omnisocket_session_send(omnisocket_session_t *session, const char *to, const void *data, size_t data_len) {
kcp_client_t *client;
int rc;
if (session == NULL || to == NULL || (data == NULL && data_len > 0)) {
errno = EINVAL;
return -1;
}
if (omnisocket_session_begin_client_op(session, &client) != 0) {
return -1;
}
rc = kcp_client_send_binary(client, to, data, data_len);
pthread_mutex_lock(&session->mutex);
if (rc == 0) {
session->stats.send_calls += 1;
session->stats.send_bytes += (uint64_t) data_len;
} else {
session->stats.send_errors += 1;
}
if (session->active_ops > 0) {
session->active_ops -= 1;
}
if (session->closing && session->active_ops == 0) {
pthread_cond_broadcast(&session->idle_cond);
}
pthread_mutex_unlock(&session->mutex);
return rc;
}
int omnisocket_session_recv(omnisocket_session_t *session, message_t *out_msg, int timeout_ms) {
kcp_client_t *client;
int rc;
if (session == NULL || out_msg == NULL) {
errno = EINVAL;
return -1;
}
if (omnisocket_session_begin_client_op(session, &client) != 0) {
return -1;
}
rc = kcp_client_receive_timed(client, out_msg, timeout_ms);
pthread_mutex_lock(&session->mutex);
if (rc == 0) {
session->stats.recv_calls += 1;
session->stats.recv_bytes += (uint64_t) out_msg->body_len;
} else if (rc == 1) {
session->stats.recv_timeouts += 1;
} else {
session->stats.recv_errors += 1;
}
if (session->active_ops > 0) {
session->active_ops -= 1;
}
if (session->closing && session->active_ops == 0) {
pthread_cond_broadcast(&session->idle_cond);
}
pthread_mutex_unlock(&session->mutex);
return rc;
}
int omnisocket_session_recv_into(
omnisocket_session_t *session,
void *buffer,
size_t buffer_len,
kcp_client_recv_meta_t *out_meta,
int timeout_ms
) {
kcp_client_t *client;
int rc;
if (session == NULL || out_meta == NULL || (buffer == NULL && buffer_len > 0)) {
errno = EINVAL;
return -1;
}
if (omnisocket_session_begin_client_op(session, &client) != 0) {
return -1;
}
rc = kcp_client_receive_binary_into(client, buffer, buffer_len, out_meta, timeout_ms);
pthread_mutex_lock(&session->mutex);
if (rc == 0) {
session->stats.recv_calls += 1;
session->stats.recv_bytes += (uint64_t) out_meta->body_len;
} else if (rc == 1) {
session->stats.recv_timeouts += 1;
} else {
session->stats.recv_errors += 1;
}
if (session->active_ops > 0) {
session->active_ops -= 1;
}
if (session->closing && session->active_ops == 0) {
pthread_cond_broadcast(&session->idle_cond);
}
pthread_mutex_unlock(&session->mutex);
return rc;
}
void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket_session_stats_t *out_stats) {
if (session == NULL || out_stats == NULL) {
return;
}
pthread_mutex_lock(&session->mutex);
*out_stats = session->stats;
pthread_mutex_unlock(&session->mutex);
}
int omnisocket_session_kcp_metrics_snapshot(
omnisocket_session_t *session,
omnisocket_session_kcp_metrics_t *out_metrics
) {
kcp_client_t *client = NULL;
kcp_conn_metrics_t metrics;
int rc = 0;
if (session == NULL || out_metrics == NULL) {
errno = EINVAL;
return -1;
}
memset(out_metrics, 0, sizeof(*out_metrics));
pthread_mutex_lock(&session->mutex);
if (session->client != NULL && !session->closing) {
client = session->client;
session->active_ops += 1;
}
pthread_mutex_unlock(&session->mutex);
if (client == NULL) {
return 0;
}
memset(&metrics, 0, sizeof(metrics));
rc = kcp_client_metrics_snapshot(client, &metrics);
pthread_mutex_lock(&session->mutex);
if (session->active_ops > 0) {
session->active_ops -= 1;
}
if (session->closing && session->active_ops == 0) {
pthread_cond_broadcast(&session->idle_cond);
}
pthread_mutex_unlock(&session->mutex);
if (rc != 0) {
return rc;
}
out_metrics->connected = metrics.connected;
out_metrics->has_conv = metrics.has_conv;
out_metrics->conv = metrics.conv;
snprintf(out_metrics->local_addr, sizeof(out_metrics->local_addr), "%s", metrics.local_addr);
snprintf(out_metrics->remote_addr, sizeof(out_metrics->remote_addr), "%s", metrics.remote_addr);
out_metrics->rto_ms = metrics.rto_ms;
out_metrics->srtt_ms = metrics.srtt_ms;
out_metrics->srttvar_ms = metrics.srttvar_ms;
out_metrics->bytes_sent = metrics.bytes_sent;
out_metrics->bytes_received = metrics.bytes_received;
out_metrics->in_pkts = metrics.in_pkts;
out_metrics->out_pkts = metrics.out_pkts;
out_metrics->in_segs = metrics.in_segs;
out_metrics->out_segs = metrics.out_segs;
out_metrics->retrans_segs = metrics.retrans_segs;
out_metrics->fast_retrans_segs = metrics.fast_retrans_segs;
out_metrics->early_retrans_segs = metrics.early_retrans_segs;
out_metrics->lost_segs = metrics.lost_segs;
out_metrics->repeat_segs = metrics.repeat_segs;
out_metrics->in_errs = metrics.in_errs;
out_metrics->kcp_in_errs = metrics.kcp_in_errs;
out_metrics->ring_buffer_snd_queue = metrics.ring_buffer_snd_queue;
out_metrics->ring_buffer_rcv_queue = metrics.ring_buffer_rcv_queue;
out_metrics->ring_buffer_snd_buffer = metrics.ring_buffer_snd_buffer;
return 0;
}

View File

@@ -0,0 +1,82 @@
#ifndef OMNISOCKET_PY_CLIENT_H
#define OMNISOCKET_PY_CLIENT_H
#include "peer_kcp_client.h"
typedef struct omnisocket_session_stats {
uint64_t send_calls;
uint64_t send_bytes;
uint64_t send_errors;
uint64_t recv_calls;
uint64_t recv_bytes;
uint64_t recv_timeouts;
uint64_t recv_errors;
int connected;
} omnisocket_session_stats_t;
typedef struct omnisocket_session_kcp_metrics {
int connected;
int has_conv;
uint32_t conv;
char local_addr[OMNI_MAX_ADDR_TEXT];
char remote_addr[OMNI_MAX_ADDR_TEXT];
uint32_t rto_ms;
int32_t srtt_ms;
int32_t srttvar_ms;
uint64_t bytes_sent;
uint64_t bytes_received;
uint64_t in_pkts;
uint64_t out_pkts;
uint64_t in_segs;
uint64_t out_segs;
uint64_t retrans_segs;
uint64_t fast_retrans_segs;
uint64_t early_retrans_segs;
uint64_t lost_segs;
uint64_t repeat_segs;
uint64_t in_errs;
uint64_t kcp_in_errs;
uint64_t ring_buffer_snd_queue;
uint64_t ring_buffer_rcv_queue;
uint64_t ring_buffer_snd_buffer;
} omnisocket_session_kcp_metrics_t;
typedef struct omnisocket_session {
pthread_mutex_t mutex;
pthread_cond_t idle_cond;
kcp_client_t *client;
size_t active_ops;
int closing;
omnisocket_session_stats_t stats;
} omnisocket_session_t;
int omnisocket_session_init(omnisocket_session_t *session);
void omnisocket_session_destroy(omnisocket_session_t *session);
int omnisocket_session_connect(
omnisocket_session_t *session,
const char *server_addr,
const char *relay_via,
const char *peer_id,
const char *bind_ip,
const char *bind_device,
const kcp_conn_options_t *options,
int stats_interval_ms
);
int omnisocket_session_close(omnisocket_session_t *session);
int omnisocket_session_send(omnisocket_session_t *session, const char *to, const void *data, size_t data_len);
int omnisocket_session_recv(omnisocket_session_t *session, message_t *out_msg, int timeout_ms);
int omnisocket_session_recv_into(
omnisocket_session_t *session,
void *buffer,
size_t buffer_len,
kcp_client_recv_meta_t *out_meta,
int timeout_ms
);
void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket_session_stats_t *out_stats);
int omnisocket_session_kcp_metrics_snapshot(
omnisocket_session_t *session,
omnisocket_session_kcp_metrics_t *out_metrics
);
#endif

View File

@@ -0,0 +1,9 @@
from pathlib import Path
PACKAGE_ROOT = Path(__file__).resolve().parent
PYTHON_ROOT = PACKAGE_ROOT.parent
REPO_ROOT = PYTHON_ROOT.parent
DEFAULT_SOCKET_PATH = "/tmp/omnisocket-a-side.sock"
DEFAULT_CONFIG_PATH = REPO_ROOT / "config" / "a_side_omnidaemon.yaml"
VERSION = "0.1.0"

View File

@@ -0,0 +1,5 @@
from .daemon import main
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,149 @@
"""Local Unix-domain HTTP client for the A-side OmniDaemon."""
from __future__ import annotations
import http.client
import json
import os
import socket
import threading
from typing import Any
from . import DEFAULT_SOCKET_PATH
class OmniDaemonError(RuntimeError):
def __init__(self, message: str, status_code: int | None = None) -> None:
super().__init__(message)
self.status_code = status_code
class UnixHTTPConnection(http.client.HTTPConnection):
def __init__(self, socket_path: str, timeout: float = 2.0) -> None:
super().__init__("localhost", timeout=timeout)
self.socket_path = socket_path
def connect(self) -> None: # pragma: no cover - runtime depends on Linux socket support
if not hasattr(socket, "AF_UNIX"):
raise OSError("AF_UNIX sockets are not available on this platform")
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.settimeout(self.timeout)
self.sock.connect(self.socket_path)
class OmniDaemonClient:
def __init__(self, socket_path: str | None = None, timeout: float = 2.0) -> None:
self.socket_path = socket_path or os.getenv("OMNIDAEMON_SOCKET", DEFAULT_SOCKET_PATH)
self.timeout = timeout
self._local = threading.local()
def get_health(self) -> dict[str, Any]:
return self._request_json("GET", "/v1/health")
def get_state(self) -> dict[str, Any]:
return self._request_json("GET", "/v1/state")
def get_video_frame(self) -> bytes:
return self._request_bytes("GET", "/v1/video/frame")
def get_control_status(self) -> dict[str, Any]:
return self._request_json("GET", "/v1/control/status")
def send_control_event(
self,
*,
source: str,
event_code: str,
drive_value: float = 1.0,
client_time_ms: int | None = None,
) -> dict[str, Any]:
payload = {
"source": source,
"event_code": event_code,
"drive_value": float(drive_value),
"client_time_ms": client_time_ms,
}
return self._request_json("POST", "/v1/control/event", payload)
def close(self) -> None:
self._reset_connection()
def _request_json(
self,
method: str,
path: str,
payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
raw = self._request_bytes(method, path, payload)
if not raw:
return {}
try:
return json.loads(raw.decode("utf-8"))
except json.JSONDecodeError as error:
raise OmniDaemonError(f"invalid daemon JSON response: {error}") from error
def _request_bytes(
self,
method: str,
path: str,
payload: dict[str, Any] | None = None,
) -> bytes:
body = b""
headers: dict[str, str] = {}
if payload is not None:
body = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
headers["Content-Length"] = str(len(body))
headers.setdefault("Connection", "keep-alive")
for attempt in range(2):
connection = self._get_connection()
try:
connection.request(method, path, body=body, headers=headers)
response = connection.getresponse()
raw = response.read()
except FileNotFoundError as error:
self._reset_connection()
raise OmniDaemonError(
f"daemon socket not found: {self.socket_path}"
) from error
except (OSError, http.client.HTTPException) as error:
self._reset_connection()
if attempt == 0:
continue
raise OmniDaemonError(
f"daemon request failed via {self.socket_path}: {error}"
) from error
if getattr(response, "will_close", False):
self._reset_connection()
if response.status >= 400:
message = raw.decode("utf-8", errors="replace").strip() or response.reason
try:
parsed = json.loads(message)
if isinstance(parsed, dict) and "error" in parsed:
message = str(parsed["error"])
except json.JSONDecodeError:
pass
raise OmniDaemonError(message, status_code=response.status)
return raw
raise OmniDaemonError(f"daemon request failed via {self.socket_path}: retry exhausted")
def _get_connection(self) -> UnixHTTPConnection:
connection = getattr(self._local, "connection", None)
if connection is None:
connection = UnixHTTPConnection(self.socket_path, timeout=self.timeout)
self._local.connection = connection
return connection
def _reset_connection(self) -> None:
connection = getattr(self._local, "connection", None)
if connection is None:
return
try:
connection.close()
except OSError:
pass
self._local.connection = None

View File

@@ -0,0 +1,90 @@
"""Binary control packet codec shared by the daemon and local clients."""
from __future__ import annotations
from dataclasses import dataclass
import struct
import time
CONTROL_PACKET_VERSION = 1
CONTROL_PACKET_STRUCT = struct.Struct("!BBHIfQ")
EVENT_NAME_TO_ID = {
"pose_home": 1,
"pose_hold": 2,
"mode_stride": 3,
"surge_up": 6,
"surge_down": 7,
"sway_left": 8,
"sway_right": 9,
"spin_left": 10,
"spin_right": 11,
"set_surge": 12,
"set_sway": 13,
"set_spin": 14,
"set_lift": 15,
"lift_up": 16,
"lift_down": 17,
"trim_reset": 18,
"session_quit": 19,
}
EVENT_ID_TO_NAME = {value: key for key, value in EVENT_NAME_TO_ID.items()}
ANALOG_EVENT_CODES = {"set_surge", "set_sway", "set_spin"}
@dataclass(slots=True)
class ControlPacket:
seq_id: int
event_id: int
drive_value: float = 1.0
sent_at_ns: int = 0
@property
def event_name(self) -> str:
return EVENT_ID_TO_NAME.get(self.event_id, f"unknown_{self.event_id}")
def encode(self) -> bytes:
sent_at_ns = self.sent_at_ns or time.time_ns()
return CONTROL_PACKET_STRUCT.pack(
CONTROL_PACKET_VERSION,
self.event_id,
0,
int(self.seq_id),
float(self.drive_value),
int(sent_at_ns),
)
@classmethod
def decode(cls, payload: bytes) -> "ControlPacket":
if len(payload) != CONTROL_PACKET_STRUCT.size:
raise ValueError(
f"invalid control packet length {len(payload)}; "
f"want {CONTROL_PACKET_STRUCT.size}"
)
version, event_id, _reserved, seq_id, drive_value, sent_at_ns = (
CONTROL_PACKET_STRUCT.unpack(payload)
)
if version != CONTROL_PACKET_VERSION:
raise ValueError(f"unsupported control packet version {version}")
return cls(
seq_id=int(seq_id),
event_id=int(event_id),
drive_value=float(drive_value),
sent_at_ns=int(sent_at_ns),
)
def make_control_packet(
seq_id: int,
event_name: str,
drive_value: float = 1.0,
sent_at_ns: int | None = None,
) -> ControlPacket:
event_id = EVENT_NAME_TO_ID[event_name]
return ControlPacket(
seq_id=seq_id,
event_id=event_id,
drive_value=drive_value,
sent_at_ns=time.time_ns() if sent_at_ns is None else sent_at_ns,
)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,10 @@
from pathlib import Path
PACKAGE_ROOT = Path(__file__).resolve().parent
PYTHON_ROOT = PACKAGE_ROOT.parent
REPO_ROOT = PYTHON_ROOT.parent
DEFAULT_SOCKET_PATH = "/tmp/omnisocket-b-side.sock"
DEFAULT_CTRL_SOCKET_PATH = "/tmp/omnisocket-b-ctrl.sock"
DEFAULT_CONFIG_PATH = REPO_ROOT / "config" / "b_side_omnidaemon.yaml"
VERSION = "0.1.0"

View File

@@ -0,0 +1,5 @@
from .daemon import main
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,54 @@
"""Local AF_UNIX SOCK_SEQPACKET client for B-side control delivery."""
from __future__ import annotations
import os
import socket
from omnisocket_a_side.control_codec import CONTROL_PACKET_STRUCT
from . import DEFAULT_CTRL_SOCKET_PATH
class BSideControlClient:
def __init__(self, socket_path: str | None = None) -> None:
self.socket_path = socket_path or os.getenv(
"OMNIBDAEMON_CTRL_SOCKET",
DEFAULT_CTRL_SOCKET_PATH,
)
self._sock: socket.socket | None = None
def connect(self) -> None:
if not hasattr(socket, "AF_UNIX"):
raise OSError("AF_UNIX sockets are not available on this platform")
if not hasattr(socket, "SOCK_SEQPACKET"):
raise OSError("SOCK_SEQPACKET is not available on this platform")
sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
sock.connect(self.socket_path)
self._sock = sock
def recv_control_packet(self, timeout_ms: int = 100) -> bytes | None:
if self._sock is None:
raise RuntimeError("B-side control client is not connected")
self._sock.settimeout(max(0.001, timeout_ms / 1000.0))
try:
payload = self._sock.recv(CONTROL_PACKET_STRUCT.size)
except socket.timeout:
return None
except BlockingIOError:
return None
if payload == b"":
raise ConnectionResetError("daemon control socket closed")
if len(payload) != CONTROL_PACKET_STRUCT.size:
return None
return payload
def close(self) -> None:
if self._sock is None:
return
try:
self._sock.close()
finally:
self._sock = None

File diff suppressed because it is too large Load Diff

67
python/setup.py Normal file
View File

@@ -0,0 +1,67 @@
from pathlib import Path
import sys
from setuptools import Extension, setup
ROOT = Path(__file__).resolve().parent.parent
PY_ROOT = Path(__file__).resolve().parent
if sys.platform != "linux":
raise RuntimeError("omnisocket Python extension can only be built on Linux")
COMMON_SOURCES = [
ROOT / "src" / "omni_common.c",
ROOT / "src" / "protocol.c",
ROOT / "src" / "latencylog.c",
ROOT / "src" / "tx_timestamp_debug.c",
ROOT / "src" / "kcp_packet_debug.c",
ROOT / "src" / "kcp_session_stats.c",
ROOT / "src" / "linux_timestamping.c",
ROOT / "src" / "interactive.c",
ROOT / "src" / "transport_udp.c",
ROOT / "src" / "transport_kcp.c",
ROOT / "src" / "server_udp_relay.c",
ROOT / "src" / "server_udp_hub.c",
ROOT / "src" / "server_kcp_hub.c",
ROOT / "src" / "peer_udp_client.c",
ROOT / "src" / "peer_kcp_client.c",
ROOT / "third_party" / "cjson" / "cJSON.c",
ROOT / "third_party" / "kcp" / "ikcp.c",
]
setup(
name="omnisocket",
version="0.1.0",
packages=["omnisocket", "omnisocket_a_side", "omnisocket_b_side"],
install_requires=[
"PyYAML>=6.0",
],
entry_points={
"console_scripts": [
"omnisocket-a-side-daemon=omnisocket_a_side.daemon:main",
"omnisocket-b-side-daemon=omnisocket_b_side.daemon:main",
]
},
ext_modules=[
Extension(
"omnisocket._omnisocket",
sources=[
str(PY_ROOT / "omnisocket" / "_omnisocket.c"),
str(PY_ROOT / "omnisocket" / "omnisocket_client.c"),
*[str(path) for path in COMMON_SOURCES],
],
include_dirs=[
str(ROOT / "include"),
str(ROOT / "third_party" / "cjson"),
str(ROOT / "third_party" / "kcp"),
str(PY_ROOT / "omnisocket"),
],
define_macros=[("_GNU_SOURCE", None)],
extra_compile_args=["-std=c11", "-O2", "-pthread"],
extra_link_args=["-pthread"],
)
],
)

View File

@@ -0,0 +1,76 @@
"""Send high-rate control packets to benchmark the KCP control session."""
from __future__ import annotations
import argparse
from pathlib import Path
import sys
import time
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
import yaml
from omnisocket_control import make_control_packet
try:
from omnisocket import CONTROL_DEFAULTS, Session
except ImportError:
sys.path.insert(0, str(ROOT / "python"))
from omnisocket import CONTROL_DEFAULTS, Session
def load_config() -> dict:
config_path = ROOT / "config" / "omnisocket_demo.yaml"
if not config_path.exists():
return {}
with config_path.open("r", encoding="utf-8") as file:
return yaml.safe_load(file) or {}
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--rate", type=float, default=200.0, help="send rate in Hz")
parser.add_argument("--count", type=int, default=1000, help="packets to send")
args = parser.parse_args()
config = load_config()
transport_cfg = config.get("transport", {})
sender_cfg = config.get("control_sender", {})
session = Session()
session.connect(
server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")),
peer_id=str(sender_cfg.get("peer_id", "peer-a-ctrl")),
relay_via=str(transport_cfg.get("relay_via", "")),
bind_ip=str(transport_cfg.get("bind_ip", "")),
bind_device=str(transport_cfg.get("bind_device", "")),
**CONTROL_DEFAULTS,
)
target_peer = str(sender_cfg.get("target_peer", "peer-b-ctrl"))
spacing = 1.0 / args.rate if args.rate > 0 else 0.0
start = time.perf_counter()
try:
for seq_id in range(args.count):
packet = make_control_packet(seq_id, "set_surge", drive_value=0.25)
session.send(to=target_peer, data=packet.encode())
if spacing > 0:
target = start + (seq_id + 1) * spacing
remaining = target - time.perf_counter()
if remaining > 0:
time.sleep(remaining)
finally:
elapsed = time.perf_counter() - start
print(
f"sent {args.count} control packets in {elapsed:.3f}s "
f"({(args.count / elapsed) if elapsed > 0 else 0.0:.1f} pkt/s)"
)
print(f"stats={session.stats()}")
session.close()
if __name__ == "__main__":
main()

11
scripts/start_b_side.sh Executable file
View File

@@ -0,0 +1,11 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
CONFIG_PATH="${1:-$ROOT/config/b_side_omnidaemon.yaml}"
export OMNIBDAEMON_CONFIG="$CONFIG_PATH"
export PYTHONPATH="$ROOT/python${PYTHONPATH:+:$PYTHONPATH}"
cd "$ROOT"
exec python3 -m omnisocket_b_side.daemon --config "$CONFIG_PATH"

View File

@@ -113,7 +113,7 @@ int latencylog_is_business_message(const message_t *msg) {
if (msg == NULL) {
return 0;
}
return msg->type == MSG_TYPE_TEXT || msg->type == MSG_TYPE_FILE;
return msg->type == MSG_TYPE_TEXT || msg->type == MSG_TYPE_FILE || msg->type == MSG_TYPE_BINARY;
}
void latencylog_log_message_event(latency_logger_t *logger, const char *node_role, const char *node_id, const char *event_name, const message_t *msg) {

View File

@@ -66,6 +66,11 @@ static int kcp_client_persist_message_to_disk(const message_t *msg, const char *
if (omni_write_file(path, msg->body, msg->body_len) != 0) {
return -1;
}
} else if (msg->type == MSG_TYPE_BINARY) {
snprintf(path, sizeof(path), "%s/%s-%" PRIu64 ".bin", inbox_dir, msg->from, msg->id);
if (omni_write_file(path, msg->body, msg->body_len) != 0) {
return -1;
}
} else {
errno = EINVAL;
return -1;
@@ -77,7 +82,20 @@ static int kcp_client_persist_message_to_disk(const message_t *msg, const char *
return 0;
}
kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
static void kcp_client_fill_recv_meta(kcp_client_recv_meta_t *meta, const message_t *msg) {
if (meta == NULL || msg == NULL) {
return;
}
memset(meta, 0, sizeof(*meta));
meta->type = msg->type;
meta->id = msg->id;
meta->body_len = msg->body_len;
snprintf(meta->from, sizeof(meta->from), "%s", msg->from);
snprintf(meta->to, sizeof(meta->to), "%s", msg->to);
snprintf(meta->file_name, sizeof(meta->file_name), "%s", msg->file_name);
}
kcp_client_t *kcp_client_dial_with_options(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, const kcp_conn_options_t *options, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
kcp_client_t *client;
const char *actual_dial_addr = (dial_addr != NULL && dial_addr[0] != '\0') ? dial_addr : server_addr;
message_t register_msg;
@@ -91,7 +109,7 @@ kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, co
snprintf(client->server_addr, sizeof(client->server_addr), "%s", server_addr == NULL ? "" : server_addr);
pthread_mutex_init(&client->id_mu, NULL);
client->logger = logger;
client->conn = kcp_conn_dial(actual_dial_addr, bind_ip, bind_device, packet_logger, logger, OMNI_NODE_ROLE_PEER, peer_id, stats_logger, stats_interval_ms);
client->conn = kcp_conn_dial_with_options(actual_dial_addr, bind_ip, bind_device, options, packet_logger, logger, OMNI_NODE_ROLE_PEER, peer_id, stats_logger, stats_interval_ms);
if (client->conn == NULL) {
saved_errno = errno;
kcp_client_free(client);
@@ -113,6 +131,10 @@ kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, co
return client;
}
kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
return kcp_client_dial_with_options(server_addr, dial_addr, peer_id, bind_ip, bind_device, NULL, logger, packet_logger, stats_logger, stats_interval_ms);
}
const char *kcp_client_id(const kcp_client_t *client) {
return client == NULL ? "" : client->id;
}
@@ -121,6 +143,10 @@ int kcp_client_send_text(kcp_client_t *client, const char *to, const char *text)
message_t msg;
uint64_t id;
if (client == NULL || to == NULL || text == NULL) {
errno = EINVAL;
return -1;
}
protocol_message_init(&msg);
kcp_client_next_message_id(client, &id);
msg.type = MSG_TYPE_TEXT;
@@ -141,6 +167,37 @@ int kcp_client_send_text(kcp_client_t *client, const char *to, const char *text)
return 0;
}
int kcp_client_send_binary(kcp_client_t *client, const char *to, const void *data, size_t data_len) {
message_t msg;
uint64_t id;
if (client == NULL || to == NULL || (data == NULL && data_len > 0)) {
errno = EINVAL;
return -1;
}
protocol_message_init(&msg);
kcp_client_next_message_id(client, &id);
msg.type = MSG_TYPE_BINARY;
msg.id = id;
snprintf(msg.from, sizeof(msg.from), "%s", client->id);
snprintf(msg.to, sizeof(msg.to), "%s", to);
if (data_len > 0) {
msg.body = (uint8_t *) malloc(data_len);
if (msg.body == NULL) {
return -1;
}
memcpy(msg.body, data, data_len);
}
msg.body_len = data_len;
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_A_APP_PREP_BEGIN, &msg);
if (kcp_conn_send(client->conn, &msg) != 0) {
protocol_message_clear(&msg);
return -1;
}
protocol_message_clear(&msg);
return 0;
}
int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char *path) {
message_t msg;
uint64_t id;
@@ -148,6 +205,10 @@ int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char *
size_t body_len = 0;
const char *base_name = strrchr(path, '/');
if (client == NULL || to == NULL || path == NULL) {
errno = EINVAL;
return -1;
}
if (omni_read_file(path, &body, &body_len) != 0) {
return -1;
}
@@ -169,14 +230,57 @@ int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char *
return 0;
}
int kcp_client_receive(kcp_client_t *client, message_t *out_msg) {
if (kcp_conn_receive(client->conn, out_msg) != 0) {
int kcp_client_receive_timed(kcp_client_t *client, message_t *out_msg, int timeout_ms) {
int rc;
if (client == NULL || out_msg == NULL) {
errno = EINVAL;
return -1;
}
rc = kcp_conn_receive_timed(client->conn, out_msg, timeout_ms);
if (rc != 0) {
return rc;
}
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_B_APP_RECV, out_msg);
return 0;
}
int kcp_client_receive(kcp_client_t *client, message_t *out_msg) {
if (kcp_client_receive_timed(client, out_msg, -1) != 0) {
return -1;
}
return 0;
}
int kcp_client_receive_binary_into(kcp_client_t *client, void *buffer, size_t buffer_len, kcp_client_recv_meta_t *out_meta, int timeout_ms) {
message_t msg;
int rc;
if (client == NULL || (buffer == NULL && buffer_len > 0) || out_meta == NULL) {
errno = EINVAL;
return -1;
}
protocol_message_init(&msg);
rc = kcp_client_receive_timed(client, &msg, timeout_ms);
if (rc != 0) {
return rc;
}
kcp_client_fill_recv_meta(out_meta, &msg);
if (msg.body_len > buffer_len) {
protocol_message_clear(&msg);
errno = EMSGSIZE;
return 2;
}
if (msg.body_len > 0) {
memcpy(buffer, msg.body, msg.body_len);
}
protocol_message_clear(&msg);
return 0;
}
int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len) {
if (!latencylog_is_business_message(msg)) {
errno = EINVAL;
@@ -190,6 +294,14 @@ int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const
return 0;
}
int kcp_client_metrics_snapshot(kcp_client_t *client, kcp_conn_metrics_t *out_metrics) {
if (client == NULL || out_metrics == NULL) {
errno = EINVAL;
return -1;
}
return kcp_conn_metrics_snapshot(client->conn, out_metrics);
}
int kcp_client_close(kcp_client_t *client) {
return client == NULL ? 0 : kcp_conn_close(client->conn);
}

View File

@@ -55,6 +55,11 @@ static int client_persist_message_to_disk(const message_t *msg, const char *inbo
if (omni_write_file(path, msg->body, msg->body_len) != 0) {
return -1;
}
} else if (msg->type == MSG_TYPE_BINARY) {
snprintf(path, sizeof(path), "%s/%s-%" PRIu64 ".bin", inbox_dir, msg->from, msg->id);
if (omni_write_file(path, msg->body, msg->body_len) != 0) {
return -1;
}
} else {
errno = EINVAL;
return -1;

View File

@@ -8,11 +8,12 @@ static const char *protocol_message_type_table[] = {
"text",
"file",
"register",
"error"
"error",
"binary"
};
const char *protocol_message_type_name(message_type_t type) {
if ((int) type < 0 || type >= MSG_TYPE_INVALID) {
if ((int) type < 0 || (size_t) type >= OMNI_ARRAY_LEN(protocol_message_type_table)) {
return "invalid";
}
return protocol_message_type_table[type];
@@ -102,6 +103,11 @@ int protocol_validate_message(const message_t *msg, char *err, size_t err_len) {
return protocol_set_err(err, err_len, "protocol: missing file name");
}
break;
case MSG_TYPE_BINARY:
if (msg->file_name[0] != '\0') {
return protocol_set_err(err, err_len, "protocol: unexpected file name");
}
break;
case MSG_TYPE_REGISTER:
if (strcmp(msg->to, SERVER_PEER_ID) != 0) {
return protocol_set_err(err, err_len, "protocol: invalid register target");

View File

@@ -63,6 +63,36 @@ static kcp_peer_entry_t *kcp_hub_find_peer(kcp_hub_t *hub, const char *peer_id)
return NULL;
}
static int kcp_hub_peer_id_has_suffix(const char *peer_id, const char *suffix) {
size_t peer_len;
size_t suffix_len;
if (peer_id == NULL || suffix == NULL) {
return 0;
}
peer_len = strlen(peer_id);
suffix_len = strlen(suffix);
return peer_len >= suffix_len && strcmp(peer_id + peer_len - suffix_len, suffix) == 0;
}
static int kcp_hub_configure_peer_transport(kcp_conn_t *conn, const char *peer_id) {
kcp_conn_options_t options;
if (conn == NULL || peer_id == NULL) {
errno = EINVAL;
return -1;
}
if (kcp_hub_peer_id_has_suffix(peer_id, "-ctrl")) {
kcp_conn_options_set_control_defaults(&options);
return kcp_conn_apply_options(conn, &options);
}
if (kcp_hub_peer_id_has_suffix(peer_id, "-video")) {
kcp_conn_options_set_video_defaults(&options);
return kcp_conn_apply_options(conn, &options);
}
return 0;
}
static int kcp_hub_send_server_error(kcp_conn_t *conn, const char *to, const char *message) {
message_t msg;
protocol_message_init(&msg);
@@ -255,6 +285,7 @@ static int kcp_hub_handle_peer_message(kcp_hub_t *hub, const char *peer_id, kcp_
switch (msg->type) {
case MSG_TYPE_TEXT:
case MSG_TYPE_FILE:
case MSG_TYPE_BINARY:
snprintf(msg->from, sizeof(msg->from), "%s", peer_id);
if (kcp_hub_deliver_to_local_peer(hub, msg) == 0) {
return 0;
@@ -294,7 +325,7 @@ static int kcp_hub_handle_peer_message(kcp_hub_t *hub, const char *peer_id, kcp_
return 0;
case MSG_TYPE_REGISTER:
case MSG_TYPE_ERROR:
if (kcp_hub_send_server_error(conn, peer_id, "registered peers can only send text or file messages") != 0) {
if (kcp_hub_send_server_error(conn, peer_id, "registered peers can only send text, file, or binary messages") != 0) {
return -1;
}
errno = EPROTO;
@@ -358,6 +389,11 @@ static int kcp_hub_register_conn(kcp_hub_t *hub, kcp_conn_t *conn, char *peer_id
pthread_rwlock_unlock(&hub->lock);
snprintf(peer_id, peer_id_len, "%s", msg.from);
if (kcp_hub_configure_peer_transport(conn, peer_id) != 0) {
kcp_hub_unregister(hub, peer_id, conn);
protocol_message_clear(&msg);
return -1;
}
protocol_message_clear(&msg);
return 0;
}
@@ -518,7 +554,7 @@ int kcp_hub_serve_relay(kcp_hub_t *hub) {
protocol_message_clear(&msg);
continue;
}
if (msg.type != MSG_TYPE_TEXT && msg.type != MSG_TYPE_FILE && msg.type != MSG_TYPE_ERROR) {
if (msg.type != MSG_TYPE_TEXT && msg.type != MSG_TYPE_FILE && msg.type != MSG_TYPE_BINARY && msg.type != MSG_TYPE_ERROR) {
protocol_message_clear(&msg);
continue;
}

View File

@@ -112,7 +112,7 @@ int udp_hub_serve(udp_hub_t *hub) {
pthread_rwlock_unlock(&hub->lock);
continue;
}
if (msg.type != MSG_TYPE_TEXT && msg.type != MSG_TYPE_FILE) {
if (msg.type != MSG_TYPE_TEXT && msg.type != MSG_TYPE_FILE && msg.type != MSG_TYPE_BINARY) {
if (msg.type == MSG_TYPE_ERROR) {
udp_hub_send_error(hub, &addr, addr_len, msg.from, "peers cannot send error messages");
} else {

View File

@@ -5,6 +5,13 @@
#define UDP_RELAY_BUF_SIZE (64U * 1024U)
typedef struct udp_relay_client_entry {
struct udp_relay_client_entry *next;
uint32_t conv;
struct sockaddr_storage addr;
socklen_t addr_len;
} udp_relay_client_entry_t;
struct udp_relay {
int downstream_fd;
int upstream_fd;
@@ -12,9 +19,10 @@ struct udp_relay {
socklen_t upstream_addr_len;
char downstream_local_addr[OMNI_MAX_ADDR_TEXT];
char upstream_local_addr[OMNI_MAX_ADDR_TEXT];
struct sockaddr_storage client_addr;
socklen_t client_addr_len;
int has_client;
struct sockaddr_storage last_client_addr;
socklen_t last_client_addr_len;
int has_last_client;
udp_relay_client_entry_t *clients;
pthread_mutex_t lock;
pthread_mutex_t log_mu;
pthread_mutex_t state_mu;
@@ -131,22 +139,55 @@ static void udp_relay_note_result(udp_relay_t *relay, int rc, int errnum) {
pthread_mutex_unlock(&relay->state_mu);
}
static void udp_relay_record_client(udp_relay_t *relay, const struct sockaddr_storage *addr, socklen_t addr_len) {
static udp_relay_client_entry_t *udp_relay_find_client_locked(udp_relay_t *relay, uint32_t conv) {
udp_relay_client_entry_t *entry;
for (entry = relay->clients; entry != NULL; entry = entry->next) {
if (entry->conv == conv) {
return entry;
}
}
return NULL;
}
static void udp_relay_record_client(udp_relay_t *relay, int has_conv, uint32_t conv, const struct sockaddr_storage *addr, socklen_t addr_len) {
pthread_mutex_lock(&relay->lock);
memcpy(&relay->client_addr, addr, sizeof(*addr));
relay->client_addr_len = addr_len;
relay->has_client = 1;
memcpy(&relay->last_client_addr, addr, sizeof(*addr));
relay->last_client_addr_len = addr_len;
relay->has_last_client = 1;
if (has_conv) {
udp_relay_client_entry_t *entry = udp_relay_find_client_locked(relay, conv);
if (entry == NULL) {
entry = (udp_relay_client_entry_t *) calloc(1, sizeof(*entry));
if (entry != NULL) {
entry->conv = conv;
entry->next = relay->clients;
relay->clients = entry;
}
}
if (entry != NULL) {
memcpy(&entry->addr, addr, sizeof(*addr));
entry->addr_len = addr_len;
}
}
pthread_mutex_unlock(&relay->lock);
}
static int udp_relay_copy_client(udp_relay_t *relay, struct sockaddr_storage *addr, socklen_t *addr_len) {
int has_client;
static int udp_relay_copy_client(udp_relay_t *relay, int has_conv, uint32_t conv, struct sockaddr_storage *addr, socklen_t *addr_len) {
int has_client = 0;
pthread_mutex_lock(&relay->lock);
has_client = relay->has_client;
if (has_client) {
memcpy(addr, &relay->client_addr, sizeof(*addr));
*addr_len = relay->client_addr_len;
if (has_conv) {
udp_relay_client_entry_t *entry = udp_relay_find_client_locked(relay, conv);
if (entry != NULL) {
memcpy(addr, &entry->addr, sizeof(*addr));
*addr_len = entry->addr_len;
has_client = 1;
}
} else if (relay->has_last_client) {
memcpy(addr, &relay->last_client_addr, sizeof(*addr));
*addr_len = relay->last_client_addr_len;
has_client = 1;
}
pthread_mutex_unlock(&relay->lock);
return has_client;
@@ -160,6 +201,8 @@ static void *udp_relay_forward_downstream_to_upstream(void *arg) {
struct sockaddr_storage source;
socklen_t source_len = sizeof(source);
ssize_t n = recvfrom(relay->downstream_fd, buffer, sizeof(buffer), 0, (struct sockaddr *) &source, &source_len);
int has_conv = 0;
uint32_t conv = 0;
if (n < 0) {
int errnum = errno;
@@ -174,7 +217,8 @@ static void *udp_relay_forward_downstream_to_upstream(void *arg) {
return NULL;
}
udp_relay_record_client(relay, &source, source_len);
udp_relay_parse_kcp_summary(buffer, (size_t) n, &has_conv, &conv, NULL);
udp_relay_record_client(relay, has_conv, conv, &source, source_len);
udp_relay_print_packet(relay, "relay_downstream_rx", relay->downstream_local_addr, &source, source_len, buffer, (size_t) n);
for (;;) {
if (send(relay->upstream_fd, buffer, (size_t) n, 0) >= 0) {
@@ -205,6 +249,8 @@ static void *udp_relay_forward_upstream_to_downstream(void *arg) {
struct sockaddr_storage client_addr;
socklen_t client_addr_len = 0;
ssize_t n = recv(relay->upstream_fd, buffer, sizeof(buffer), 0);
int has_conv = 0;
uint32_t conv = 0;
if (n < 0) {
int errnum = errno;
@@ -220,7 +266,8 @@ static void *udp_relay_forward_upstream_to_downstream(void *arg) {
}
udp_relay_print_packet(relay, "relay_upstream_rx", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n);
if (!udp_relay_copy_client(relay, &client_addr, &client_addr_len)) {
udp_relay_parse_kcp_summary(buffer, (size_t) n, &has_conv, &conv, NULL);
if (!udp_relay_copy_client(relay, has_conv, conv, &client_addr, &client_addr_len)) {
udp_relay_print_packet(relay, "relay_upstream_drop_no_client", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n);
continue;
}
@@ -404,11 +451,18 @@ int udp_relay_close(udp_relay_t *relay) {
}
void udp_relay_free(udp_relay_t *relay) {
udp_relay_client_entry_t *entry;
udp_relay_client_entry_t *next;
if (relay == NULL) {
return;
}
udp_relay_close(relay);
udp_relay_join_threads(relay);
for (entry = relay->clients; entry != NULL; entry = next) {
next = entry->next;
free(entry);
}
pthread_mutex_destroy(&relay->lock);
pthread_mutex_destroy(&relay->log_mu);
pthread_cond_destroy(&relay->state_cond);

View File

@@ -60,6 +60,8 @@ struct kcp_conn {
int update_thread_started;
pthread_t stats_thread;
int stats_thread_started;
kcp_conn_options_t options;
int update_interval_ms;
uint64_t pending_bytes_sent;
uint64_t pending_bytes_received;
uint64_t pending_in_pkts;
@@ -141,6 +143,82 @@ struct kcp_process_sampler {
static pthread_mutex_t g_kcp_process_sampler_mu = PTHREAD_MUTEX_INITIALIZER;
static kcp_process_sampler_t *g_kcp_process_samplers = NULL;
void kcp_conn_options_init(kcp_conn_options_t *options) {
if (options == NULL) {
return;
}
memset(options, 0, sizeof(*options));
options->nodelay = KCP_DEFAULT_NODELAY;
options->interval_ms = KCP_DEFAULT_INTERVAL_MS;
options->resend = KCP_DEFAULT_RESEND;
options->nc = KCP_DEFAULT_NC;
options->sndwnd = KCP_DEFAULT_SND_WND;
options->rcvwnd = KCP_DEFAULT_RCV_WND;
options->mtu = KCP_DEFAULT_MTU;
}
void kcp_conn_options_set_control_defaults(kcp_conn_options_t *options) {
if (options == NULL) {
return;
}
memset(options, 0, sizeof(*options));
options->nodelay = KCP_CONTROL_NODELAY;
options->interval_ms = KCP_CONTROL_INTERVAL_MS;
options->resend = KCP_CONTROL_RESEND;
options->nc = KCP_CONTROL_NC;
options->sndwnd = KCP_CONTROL_SND_WND;
options->rcvwnd = KCP_CONTROL_RCV_WND;
options->mtu = KCP_CONTROL_MTU;
}
void kcp_conn_options_set_video_defaults(kcp_conn_options_t *options) {
if (options == NULL) {
return;
}
memset(options, 0, sizeof(*options));
options->nodelay = KCP_VIDEO_NODELAY;
options->interval_ms = KCP_VIDEO_INTERVAL_MS;
options->resend = KCP_VIDEO_RESEND;
options->nc = KCP_VIDEO_NC;
options->sndwnd = KCP_VIDEO_SND_WND;
options->rcvwnd = KCP_VIDEO_RCV_WND;
options->mtu = KCP_VIDEO_MTU;
}
static int kcp_conn_validate_options(const kcp_conn_options_t *options) {
if (options == NULL) {
errno = EINVAL;
return -1;
}
if (options->interval_ms <= 0 || options->sndwnd <= 0 || options->rcvwnd <= 0 || options->mtu <= 0) {
errno = EINVAL;
return -1;
}
return 0;
}
static int kcp_conn_apply_options_locked(kcp_conn_t *conn, const kcp_conn_options_t *options) {
if (conn == NULL || conn->kcp == NULL || kcp_conn_validate_options(options) != 0) {
return -1;
}
if (ikcp_wndsize(conn->kcp, options->sndwnd, options->rcvwnd) != 0) {
errno = EINVAL;
return -1;
}
if (ikcp_setmtu(conn->kcp, options->mtu) != 0) {
errno = EINVAL;
return -1;
}
if (ikcp_nodelay(conn->kcp, options->nodelay, options->interval_ms, options->resend, options->nc) != 0) {
errno = EINVAL;
return -1;
}
conn->kcp->stream = 1;
conn->options = *options;
conn->update_interval_ms = options->interval_ms;
return 0;
}
static void kcp_parse_packet_segments(const uint8_t *packet, size_t len, uint32_t *conv, kcp_packet_debug_segment_t **segments, size_t *segment_count) {
size_t offset = 0;
size_t count = 0;
@@ -1107,10 +1185,12 @@ static void *kcp_client_recv_thread_main(void *arg) {
static void *kcp_update_thread_main(void *arg) {
kcp_conn_t *conn = (kcp_conn_t *) arg;
while (!atomic_load(&conn->closed)) {
int interval_ms;
pthread_mutex_lock(&conn->kcp_mu);
ikcp_update(conn->kcp, omni_now_millis32());
interval_ms = conn->update_interval_ms > 0 ? conn->update_interval_ms : KCP_DEFAULT_INTERVAL_MS;
pthread_mutex_unlock(&conn->kcp_mu);
usleep(10000);
usleep((useconds_t) interval_ms * 1000U);
}
return NULL;
}
@@ -1129,10 +1209,11 @@ static int kcp_conn_start_stats_thread(kcp_conn_t *conn) {
return 0;
}
static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *remote_addr, socklen_t remote_addr_len, kcp_socket_debug_state_t *sock_state, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *remote_addr, socklen_t remote_addr_len, const kcp_conn_options_t *options, kcp_socket_debug_state_t *sock_state, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
kcp_conn_t *conn = (kcp_conn_t *) calloc(1, sizeof(*conn));
uint32_t conv;
int thread_rc;
kcp_conn_options_t effective_options;
if (conn == NULL) {
errno = ENOMEM;
@@ -1150,6 +1231,12 @@ static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *
snprintf(conn->node_id, sizeof(conn->node_id), "%s", node_id == NULL ? "" : node_id);
conn->stats_logger = stats_logger;
conn->stats_interval_ms = stats_interval_ms > 0 ? stats_interval_ms : KCP_DEFAULT_STATS_INTERVAL_MS;
kcp_conn_options_init(&effective_options);
if (options != NULL) {
effective_options = *options;
}
conn->options = effective_options;
conn->update_interval_ms = effective_options.interval_ms;
conn->sock_state = sock_state;
if (omni_random_u32(&conv) != 0) {
protocol_frame_decoder_destroy(&conn->decoder);
@@ -1170,10 +1257,15 @@ static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *
return NULL;
}
ikcp_setoutput(conn->kcp, kcp_output_callback_impl);
ikcp_wndsize(conn->kcp, KCP_WND_SIZE, KCP_WND_SIZE);
ikcp_setmtu(conn->kcp, KCP_MTU);
ikcp_nodelay(conn->kcp, KCP_NODELAY, KCP_INTERVAL, KCP_RESEND, KCP_NC);
conn->kcp->stream = 1;
if (kcp_conn_apply_options_locked(conn, &effective_options) != 0) {
ikcp_release(conn->kcp);
protocol_frame_decoder_destroy(&conn->decoder);
pthread_cond_destroy(&conn->rx_cond);
pthread_mutex_destroy(&conn->kcp_mu);
pthread_mutex_destroy(&conn->close_mu);
free(conn);
return NULL;
}
if (kcp_conn_attach_process_sampler(conn) != 0) {
ikcp_release(conn->kcp);
protocol_frame_decoder_destroy(&conn->decoder);
@@ -1213,7 +1305,7 @@ static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *
return conn;
}
kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
kcp_conn_t *kcp_conn_dial_with_options(const char *server_addr, const char *bind_ip, const char *bind_device, const kcp_conn_options_t *options, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
struct sockaddr_storage remote_addr;
socklen_t remote_len;
int family;
@@ -1236,7 +1328,7 @@ kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const ch
close(fd);
return NULL;
}
conn = kcp_conn_alloc_common(fd, &remote_addr, remote_len, sock_state, logger, node_role, node_id, stats_logger, stats_interval_ms);
conn = kcp_conn_alloc_common(fd, &remote_addr, remote_len, options, sock_state, logger, node_role, node_id, stats_logger, stats_interval_ms);
if (conn == NULL) {
kcp_socket_debug_destroy(sock_state);
free(sock_state);
@@ -1255,6 +1347,10 @@ kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const ch
return conn;
}
kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
return kcp_conn_dial_with_options(server_addr, bind_ip, bind_device, NULL, packet_logger, logger, node_role, node_id, stats_logger, stats_interval_ms);
}
static void kcp_listener_enqueue_accept(kcp_listener_t *listener, kcp_conn_t *conn) {
pthread_mutex_lock(&listener->accept_mu);
if (listener->accept_tail == NULL) {
@@ -1381,6 +1477,7 @@ static void *kcp_listener_recv_thread_main(void *arg) {
if (conn == NULL) {
conn = (kcp_conn_t *) calloc(1, sizeof(*conn));
if (conn != NULL) {
kcp_conn_options_t accepted_options;
conn->fd = listener->fd;
memcpy(&conn->remote_addr, &source, sizeof(source));
conn->remote_addr_len = msg.msg_namelen;
@@ -1393,15 +1490,15 @@ static void *kcp_listener_recv_thread_main(void *arg) {
conn->stats_interval_ms = KCP_DEFAULT_STATS_INTERVAL_MS;
conn->sock_state = &listener->sock_state;
conn->listener = listener;
kcp_conn_options_init(&accepted_options);
conn->options = accepted_options;
conn->update_interval_ms = accepted_options.interval_ms;
conn->kcp = ikcp_create(conv, conn);
if (conn->kcp != NULL) {
int update_started = 0;
ikcp_setoutput(conn->kcp, kcp_output_callback_impl);
ikcp_wndsize(conn->kcp, KCP_WND_SIZE, KCP_WND_SIZE);
ikcp_setmtu(conn->kcp, KCP_MTU);
ikcp_nodelay(conn->kcp, KCP_NODELAY, KCP_INTERVAL, KCP_RESEND, KCP_NC);
conn->kcp->stream = 1;
if (pthread_create(&conn->update_thread, NULL, kcp_update_thread_main, conn) == 0) {
if (kcp_conn_apply_options_locked(conn, &accepted_options) == 0 &&
pthread_create(&conn->update_thread, NULL, kcp_update_thread_main, conn) == 0) {
update_started = 1;
}
if (update_started && kcp_listener_add_session(listener, conv, conn) == 0) {
@@ -1537,6 +1634,19 @@ int kcp_conn_configure_runtime(kcp_conn_t *conn, latency_logger_t *logger, const
return 0;
}
int kcp_conn_apply_options(kcp_conn_t *conn, const kcp_conn_options_t *options) {
int rc;
if (conn == NULL || options == NULL) {
errno = EINVAL;
return -1;
}
pthread_mutex_lock(&conn->kcp_mu);
rc = kcp_conn_apply_options_locked(conn, options);
pthread_mutex_unlock(&conn->kcp_mu);
return rc;
}
int kcp_conn_send(kcp_conn_t *conn, const message_t *msg) {
uint8_t *frame = NULL;
size_t frame_len = 0;
@@ -1577,15 +1687,31 @@ int kcp_conn_send(kcp_conn_t *conn, const message_t *msg) {
return 0;
}
int kcp_conn_receive(kcp_conn_t *conn, message_t *out_msg) {
static void kcp_timespec_deadline_after_ms(struct timespec *deadline, int timeout_ms) {
clock_gettime(CLOCK_REALTIME, deadline);
deadline->tv_sec += timeout_ms / 1000;
deadline->tv_nsec += (long) (timeout_ms % 1000) * 1000000L;
if (deadline->tv_nsec >= 1000000000L) {
deadline->tv_sec += 1;
deadline->tv_nsec -= 1000000000L;
}
}
int kcp_conn_receive_timed(kcp_conn_t *conn, message_t *out_msg, int timeout_ms) {
uint8_t *frame = NULL;
size_t frame_len = 0;
char err[128];
int next_rc;
struct timespec deadline;
int use_deadline = timeout_ms > 0;
if (conn == NULL || out_msg == NULL) {
errno = EINVAL;
return -1;
}
if (use_deadline) {
kcp_timespec_deadline_after_ms(&deadline, timeout_ms);
}
for (;;) {
next_rc = protocol_frame_decoder_next(&conn->decoder, &frame, &frame_len);
if (next_rc < 0) {
@@ -1618,12 +1744,33 @@ int kcp_conn_receive(kcp_conn_t *conn, message_t *out_msg) {
errno = ECANCELED;
return -1;
}
pthread_cond_wait(&conn->rx_cond, &conn->kcp_mu);
if (timeout_ms == 0) {
pthread_mutex_unlock(&conn->kcp_mu);
return 1;
}
if (timeout_ms < 0) {
pthread_cond_wait(&conn->rx_cond, &conn->kcp_mu);
} else {
int wait_rc = pthread_cond_timedwait(&conn->rx_cond, &conn->kcp_mu, &deadline);
if (wait_rc == ETIMEDOUT) {
pthread_mutex_unlock(&conn->kcp_mu);
return 1;
}
if (wait_rc != 0) {
pthread_mutex_unlock(&conn->kcp_mu);
errno = wait_rc;
return -1;
}
}
}
pthread_mutex_unlock(&conn->kcp_mu);
}
}
int kcp_conn_receive(kcp_conn_t *conn, message_t *out_msg) {
return kcp_conn_receive_timed(conn, out_msg, -1);
}
uint32_t kcp_conn_conv(const kcp_conn_t *conn) {
return conn == NULL || conn->kcp == NULL ? 0 : conn->kcp->conv;
}
@@ -1641,6 +1788,83 @@ int kcp_conn_local_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, s
return 0;
}
int kcp_conn_metrics_snapshot(kcp_conn_t *conn, kcp_conn_metrics_t *out_metrics) {
struct sockaddr_storage local_addr;
socklen_t local_len = sizeof(local_addr);
if (conn == NULL || out_metrics == NULL) {
errno = EINVAL;
return -1;
}
memset(out_metrics, 0, sizeof(*out_metrics));
out_metrics->connected = !atomic_load(&conn->closed);
if (conn->sock_state != NULL && !conn->socket_closed &&
getsockname(conn->sock_state->fd, (struct sockaddr *) &local_addr, &local_len) == 0) {
omni_sockaddr_to_string(
(const struct sockaddr *) &local_addr,
local_len,
out_metrics->local_addr,
sizeof(out_metrics->local_addr)
);
}
if (conn->remote_addr_len > 0) {
omni_sockaddr_to_string(
(const struct sockaddr *) &conn->remote_addr,
conn->remote_addr_len,
out_metrics->remote_addr,
sizeof(out_metrics->remote_addr)
);
}
if (conn->process_sampler != NULL) {
out_metrics->bytes_sent = atomic_load_explicit(&conn->process_sampler->bytes_sent, memory_order_relaxed);
out_metrics->bytes_received = atomic_load_explicit(&conn->process_sampler->bytes_received, memory_order_relaxed);
out_metrics->in_pkts = atomic_load_explicit(&conn->process_sampler->in_pkts, memory_order_relaxed);
out_metrics->out_pkts = atomic_load_explicit(&conn->process_sampler->out_pkts, memory_order_relaxed);
out_metrics->in_segs = atomic_load_explicit(&conn->process_sampler->in_segs, memory_order_relaxed);
out_metrics->out_segs = atomic_load_explicit(&conn->process_sampler->out_segs, memory_order_relaxed);
out_metrics->in_errs = atomic_load_explicit(&conn->process_sampler->in_errs, memory_order_relaxed);
out_metrics->kcp_in_errs = atomic_load_explicit(&conn->process_sampler->kcp_in_errs, memory_order_relaxed);
} else {
out_metrics->bytes_sent = conn->pending_bytes_sent;
out_metrics->bytes_received = conn->pending_bytes_received;
out_metrics->in_pkts = conn->pending_in_pkts;
out_metrics->out_pkts = conn->pending_out_pkts;
out_metrics->in_segs = conn->pending_in_segs;
out_metrics->out_segs = conn->pending_out_segs;
out_metrics->in_errs = conn->pending_in_errs;
out_metrics->kcp_in_errs = conn->pending_kcp_in_errs;
}
pthread_mutex_lock(&conn->kcp_mu);
if (conn->kcp != NULL) {
out_metrics->has_conv = 1;
out_metrics->conv = conn->kcp->conv;
out_metrics->rto_ms = conn->kcp->rx_rto;
out_metrics->srtt_ms = conn->kcp->rx_srtt;
out_metrics->srttvar_ms = conn->kcp->rx_rttval;
out_metrics->ring_buffer_snd_queue = conn->kcp->nsnd_que;
out_metrics->ring_buffer_rcv_queue = conn->kcp->nrcv_que;
out_metrics->ring_buffer_snd_buffer = conn->kcp->nsnd_buf;
out_metrics->fast_retrans_segs = conn->kcp->fast_retrans_xmit;
/* This KCP fork does not implement early retransmit, so the counter stays zero. */
out_metrics->early_retrans_segs = conn->kcp->early_retrans_xmit;
out_metrics->lost_segs = conn->kcp->lost_xmit;
out_metrics->repeat_segs = conn->kcp->repeat_xmit;
out_metrics->retrans_segs =
out_metrics->fast_retrans_segs +
out_metrics->early_retrans_segs +
out_metrics->lost_segs;
} else {
out_metrics->connected = 0;
}
pthread_mutex_unlock(&conn->kcp_mu);
return 0;
}
int kcp_conn_close(kcp_conn_t *conn) {
if (conn == NULL) {
return 0;

View File

@@ -298,6 +298,10 @@ ikcpcb *ikcp_create(IUINT32 conv, void *user)
kcp->fastlimit = IKCP_FASTACK_LIMIT;
kcp->nocwnd = 0;
kcp->xmit = 0;
kcp->fast_retrans_xmit = 0;
kcp->early_retrans_xmit = 0;
kcp->lost_xmit = 0;
kcp->repeat_xmit = 0;
kcp->dead_link = IKCP_DEADLINK;
kcp->output = NULL;
kcp->writelog = NULL;
@@ -788,6 +792,7 @@ void ikcp_parse_data(ikcpcb *kcp, IKCPSEG *newseg)
}
else
{
kcp->repeat_xmit++;
ikcp_segment_delete(kcp, newseg);
}
@@ -1192,6 +1197,7 @@ void ikcp_flush(ikcpcb *kcp)
needsend = 1;
segment->xmit++;
kcp->xmit++;
kcp->lost_xmit++;
if (kcp->nodelay == 0)
{
segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);
@@ -1211,6 +1217,7 @@ void ikcp_flush(ikcpcb *kcp)
{
needsend = 1;
segment->xmit++;
kcp->fast_retrans_xmit++;
segment->fastack = 0;
segment->resendts = current + segment->rto;
change++;

View File

@@ -300,6 +300,10 @@ struct IKCPCB
IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;
IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;
IUINT32 current, interval, ts_flush, xmit;
IUINT32 fast_retrans_xmit;
IUINT32 early_retrans_xmit;
IUINT32 lost_xmit;
IUINT32 repeat_xmit;
IUINT32 nrcv_buf, nsnd_buf;
IUINT32 nrcv_que, nsnd_que;
IUINT32 nodelay, updated;