Compare commits

..

6 Commits

30 changed files with 75 additions and 4258 deletions

2
.gitignore vendored
View File

@@ -18,5 +18,3 @@ c/bin
/python/omnisocket.egg-info
*.so*
/.venv

View File

@@ -4,6 +4,10 @@ CPPFLAGS ?= -Iinclude -Ithird_party/cjson -Ithird_party/kcp
LDFLAGS ?= -pthread
PYTHON ?= python3
ifeq ($(QUIET_FFMPEG_LOGS),1)
CFLAGS += -DQUIET_FFMPEG_LOGS
endif
BIN_DIR := bin
SRC_DIR := src
CMD_DIR := cmd
@@ -50,20 +54,6 @@ CAMERA_VIDEO_SENDER_SRCS := \
third_party/cjson/cJSON.c \
third_party/kcp/ikcp.c
B_SIDE_VIDEO_SENDER := $(BIN_DIR)/b_side_video_sender
B_SIDE_VIDEO_SENDER_SRCS := \
$(CMD_DIR)/b_side_video_sender.c \
$(SRC_DIR)/omni_common.c \
$(SRC_DIR)/protocol.c \
$(SRC_DIR)/latencylog.c \
$(SRC_DIR)/kcp_packet_debug.c \
$(SRC_DIR)/kcp_session_stats.c \
$(SRC_DIR)/linux_timestamping.c \
$(SRC_DIR)/transport_kcp.c \
$(SRC_DIR)/peer_kcp_client.c \
third_party/cjson/cJSON.c \
third_party/kcp/ikcp.c
all: $(TARGETS)
$(BIN_DIR):
@@ -95,11 +85,6 @@ $(CAMERA_VIDEO_SENDER): $(CAMERA_VIDEO_SENDER_SRCS) | $(BIN_DIR)
camera_video_sender: $(CAMERA_VIDEO_SENDER)
$(B_SIDE_VIDEO_SENDER): $(B_SIDE_VIDEO_SENDER_SRCS) | $(BIN_DIR)
$(CC) $(CFLAGS) $(CPPFLAGS) $$(pkg-config --cflags libavformat libavcodec libavutil libswscale) -o $@ $^ $(LDFLAGS) $$(pkg-config --libs libavformat libavcodec libavutil libswscale)
b_side_video_sender: $(B_SIDE_VIDEO_SENDER)
clean:
rm -rf $(BIN_DIR)
@@ -109,4 +94,4 @@ python-ext:
python-install:
cd python && $(PYTHON) -m pip install -e .
.PHONY: all clean python-ext python-install camera_video_sender b_side_video_sender
.PHONY: all clean python-ext python-install camera_video_sender

View File

@@ -27,30 +27,6 @@ make python-ext
make python-install
```
## A-Side OmniDaemon
The A-side daemon is configured from `config/a_side_omnidaemon.yaml` in this repo. The safest way to start it is to pass that file explicitly, because the installed Python package does not bundle the YAML config.
Run from a source checkout:
```bash
python -m omnisocket_a_side.daemon --config "$(pwd)/config/a_side_omnidaemon.yaml"
```
Or, if you installed the console script:
```bash
OMNIDAEMON_CONFIG="$(pwd)/config/a_side_omnidaemon.yaml" \
omnisocket-a-side-daemon
```
Optional overrides:
- `OMNIDAEMON_SOCKET=/tmp/omnisocket-a-side.sock` selects the local UDS path.
- `OMNIDAEMON_CONFIG=/abs/path/to/a_side_omnidaemon.yaml` overrides `--config`.
For `robot-command-center` and the A-side senders, keep the daemon and its clients on the same Linux machine so they can share the Unix-domain socket.
## Run On Different Machines
Server `D` runs the KCP hub on `0.0.0.0:10909`:

View File

@@ -1,933 +0,0 @@
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <linux/videodev2.h>
#include <pthread.h>
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/select.h>
#include <time.h>
#include <unistd.h>
#include <libavcodec/avcodec.h>
#include <libavutil/avutil.h>
#include <libavutil/imgutils.h>
#include <libavutil/opt.h>
#include <libswscale/swscale.h>
#include "cJSON.h"
#include "peer_kcp_client.h"
#define WORKER_CONTROL_FD 3
#define WORKER_TELEMETRY_FD 4
#define WORKER_CONTROL_FD_ENV "OMNI_WORKER_CONTROL_FD"
#define WORKER_TELEMETRY_FD_ENV "OMNI_WORKER_TELEMETRY_FD"
#define NUM_BUFFERS 4
#define CLEAR(x) memset(&(x), 0, sizeof(x))
#define VIDEO_SERVER_ADDR_ENV "OMNI_VIDEO_SERVER_ADDR"
#define VIDEO_RELAY_ADDR_ENV "OMNI_VIDEO_RELAY_VIA"
#define VIDEO_BIND_IP_ENV "OMNI_VIDEO_BIND_IP"
#define VIDEO_BIND_DEVICE_ENV "OMNI_VIDEO_BIND_DEVICE"
#define VIDEO_PEER_ID_ENV "OMNI_VIDEO_PEER_ID"
#define VIDEO_TARGET_PEER_ENV "OMNI_VIDEO_TARGET_PEER"
#define VIDEO_DEVICE_ENV "OMNI_VIDEO_DEVICE"
#define VIDEO_CAPTURE_WIDTH_ENV "OMNI_VIDEO_CAPTURE_WIDTH"
#define VIDEO_CAPTURE_HEIGHT_ENV "OMNI_VIDEO_CAPTURE_HEIGHT"
#define VIDEO_OUTPUT_WIDTH_ENV "OMNI_VIDEO_OUTPUT_WIDTH"
#define VIDEO_OUTPUT_HEIGHT_ENV "OMNI_VIDEO_OUTPUT_HEIGHT"
#define VIDEO_FPS_ENV "OMNI_VIDEO_FPS"
#define VIDEO_JPEG_QSCALE_ENV "OMNI_VIDEO_JPEG_QSCALE"
#define VIDEO_MAX_FRAME_BYTES_ENV "OMNI_VIDEO_MAX_FRAME_BYTES"
#define VIDEO_STATS_INTERVAL_ENV "OMNI_VIDEO_STATS_INTERVAL_MS"
typedef struct {
void *start;
size_t length;
} Buffer;
typedef struct {
char server_addr[OMNI_MAX_ADDR_TEXT];
char relay_addr[OMNI_MAX_ADDR_TEXT];
char bind_ip[OMNI_MAX_ADDR_TEXT];
char bind_device[128];
char peer_id[OMNI_MAX_PEER_ID];
char target_peer[OMNI_MAX_PEER_ID];
char video_device[256];
int capture_width;
int capture_height;
int output_width;
int output_height;
int initial_fps;
int initial_qscale;
int initial_max_frame_bytes;
int stats_interval_ms;
} worker_config_t;
typedef struct {
pthread_mutex_t lock;
pthread_mutex_t telemetry_lock;
FILE *telemetry_stream;
int target_fps;
int jpeg_quality_qscale;
int max_frame_bytes;
bool shutdown_requested;
} runtime_state_t;
typedef struct {
kcp_client_t *client;
char target_peer[OMNI_MAX_PEER_ID];
} video_sender_t;
typedef struct {
FILE *control_stream;
runtime_state_t *runtime;
} control_thread_args_t;
static volatile sig_atomic_t g_signal_stop = 0;
static double monotonic_ms(void) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (double) ts.tv_sec * 1000.0 + (double) ts.tv_nsec / 1000000.0;
}
static int64_t realtime_ms(void) {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return (int64_t) ts.tv_sec * 1000 + (int64_t) ts.tv_nsec / 1000000;
}
static int env_as_int(const char *name, int fallback) {
const char *raw = getenv(name);
char *endptr = NULL;
long value;
if (raw == NULL || raw[0] == '\0') {
return fallback;
}
errno = 0;
value = strtol(raw, &endptr, 10);
if (errno != 0 || endptr == raw || (endptr != NULL && *endptr != '\0')) {
return fallback;
}
return (int) value;
}
static const char *env_or_default(const char *name, const char *fallback) {
const char *value = getenv(name);
if (value != NULL && value[0] != '\0') {
return value;
}
return fallback;
}
static void handle_signal(int signum) {
(void) signum;
g_signal_stop = 1;
}
static void runtime_set_shutdown(runtime_state_t *runtime, bool shutdown_requested) {
pthread_mutex_lock(&runtime->lock);
runtime->shutdown_requested = shutdown_requested;
pthread_mutex_unlock(&runtime->lock);
}
static bool runtime_should_stop(runtime_state_t *runtime) {
bool shutdown_requested;
pthread_mutex_lock(&runtime->lock);
shutdown_requested = runtime->shutdown_requested;
pthread_mutex_unlock(&runtime->lock);
return g_signal_stop != 0 || shutdown_requested;
}
static void runtime_get_profile(runtime_state_t *runtime, int *fps, int *qscale, int *max_frame_bytes) {
pthread_mutex_lock(&runtime->lock);
*fps = runtime->target_fps;
*qscale = runtime->jpeg_quality_qscale;
*max_frame_bytes = runtime->max_frame_bytes;
pthread_mutex_unlock(&runtime->lock);
}
static void runtime_set_profile(runtime_state_t *runtime, int fps, int qscale, int max_frame_bytes) {
pthread_mutex_lock(&runtime->lock);
if (fps > 0) {
runtime->target_fps = fps;
}
if (qscale > 0) {
runtime->jpeg_quality_qscale = qscale;
}
if (max_frame_bytes > 0) {
runtime->max_frame_bytes = max_frame_bytes;
}
pthread_mutex_unlock(&runtime->lock);
}
static void telemetry_write_line(runtime_state_t *runtime, const char *line) {
pthread_mutex_lock(&runtime->telemetry_lock);
if (runtime->telemetry_stream != NULL) {
fputs(line, runtime->telemetry_stream);
fputc('\n', runtime->telemetry_stream);
fflush(runtime->telemetry_stream);
}
pthread_mutex_unlock(&runtime->telemetry_lock);
}
static void telemetry_write_frame_stat(
runtime_state_t *runtime,
int frame_bytes,
int encode_us,
bool sent,
const char *drop_reason
) {
char line[512];
int written;
written = snprintf(
line,
sizeof(line),
"{\"type\":\"frame_stat\",\"frame_bytes\":%d,\"encode_us\":%d,"
"\"sent\":%s,\"drop_reason\":\"%s\",\"ts_unix_ms\":%" PRId64 "}",
frame_bytes,
encode_us,
sent ? "true" : "false",
drop_reason != NULL ? drop_reason : "",
realtime_ms());
if (written > 0 && written < (int) sizeof(line)) {
telemetry_write_line(runtime, line);
}
}
static void telemetry_write_kcp_metrics(runtime_state_t *runtime, const kcp_conn_metrics_t *metrics) {
char line[1024];
int written;
written = snprintf(
line,
sizeof(line),
"{\"type\":\"kcp_metrics\",\"connected\":%d,\"srtt_ms\":%d,\"srttvar_ms\":%d,"
"\"rto_ms\":%u,\"bytes_sent\":%" PRIu64 ",\"bytes_received\":%" PRIu64 ","
"\"out_pkts\":%" PRIu64 ",\"out_segs\":%" PRIu64 ",\"retrans_segs\":%" PRIu64 ","
"\"fast_retrans_segs\":%" PRIu64 ",\"early_retrans_segs\":%" PRIu64 ","
"\"lost_segs\":%" PRIu64 ",\"ring_buffer_snd_queue\":%" PRIu64 ","
"\"ring_buffer_snd_buffer\":%" PRIu64 ",\"ts_unix_ms\":%" PRId64 "}",
metrics->connected,
metrics->srtt_ms,
metrics->srttvar_ms,
metrics->rto_ms,
metrics->bytes_sent,
metrics->bytes_received,
metrics->out_pkts,
metrics->out_segs,
metrics->retrans_segs,
metrics->fast_retrans_segs,
metrics->early_retrans_segs,
metrics->lost_segs,
metrics->ring_buffer_snd_queue,
metrics->ring_buffer_snd_buffer,
realtime_ms());
if (written > 0 && written < (int) sizeof(line)) {
telemetry_write_line(runtime, line);
}
}
static int load_worker_config(worker_config_t *cfg) {
const char *server_addr = getenv(VIDEO_SERVER_ADDR_ENV);
const char *relay_addr = env_or_default(VIDEO_RELAY_ADDR_ENV, "");
if (cfg == NULL) {
errno = EINVAL;
return -1;
}
if ((server_addr == NULL || server_addr[0] == '\0') && relay_addr[0] != '\0') {
server_addr = relay_addr;
}
if (server_addr == NULL || server_addr[0] == '\0') {
fprintf(stderr, "%s is required\n", VIDEO_SERVER_ADDR_ENV);
errno = EINVAL;
return -1;
}
CLEAR(*cfg);
snprintf(cfg->server_addr, sizeof(cfg->server_addr), "%s", server_addr);
snprintf(cfg->relay_addr, sizeof(cfg->relay_addr), "%s", relay_addr);
snprintf(cfg->bind_ip, sizeof(cfg->bind_ip), "%s", env_or_default(VIDEO_BIND_IP_ENV, ""));
snprintf(cfg->bind_device, sizeof(cfg->bind_device), "%s", env_or_default(VIDEO_BIND_DEVICE_ENV, ""));
snprintf(cfg->peer_id, sizeof(cfg->peer_id), "%s", env_or_default(VIDEO_PEER_ID_ENV, "peer-b-video"));
snprintf(cfg->target_peer, sizeof(cfg->target_peer), "%s", env_or_default(VIDEO_TARGET_PEER_ENV, "peer-a-video"));
snprintf(cfg->video_device, sizeof(cfg->video_device), "%s", env_or_default(VIDEO_DEVICE_ENV, "/dev/video0"));
cfg->capture_width = env_as_int(VIDEO_CAPTURE_WIDTH_ENV, 1280);
cfg->capture_height = env_as_int(VIDEO_CAPTURE_HEIGHT_ENV, 720);
cfg->output_width = env_as_int(VIDEO_OUTPUT_WIDTH_ENV, 640);
cfg->output_height = env_as_int(VIDEO_OUTPUT_HEIGHT_ENV, 360);
cfg->initial_fps = env_as_int(VIDEO_FPS_ENV, 10);
cfg->initial_qscale = env_as_int(VIDEO_JPEG_QSCALE_ENV, 8);
cfg->initial_max_frame_bytes = env_as_int(VIDEO_MAX_FRAME_BYTES_ENV, 40960);
cfg->stats_interval_ms = env_as_int(VIDEO_STATS_INTERVAL_ENV, 100);
return 0;
}
static int open_v4l2_device(const char *device) {
int fd = open(device, O_RDWR | O_NONBLOCK);
if (fd < 0) {
perror("open device");
}
return fd;
}
static int init_v4l2_device(int fd, const worker_config_t *cfg) {
struct v4l2_format fmt;
struct v4l2_streamparm parm;
CLEAR(fmt);
fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
fmt.fmt.pix.width = cfg->capture_width;
fmt.fmt.pix.height = cfg->capture_height;
fmt.fmt.pix.pixelformat = V4L2_PIX_FMT_MJPEG;
fmt.fmt.pix.field = V4L2_FIELD_NONE;
if (ioctl(fd, VIDIOC_S_FMT, &fmt) < 0) {
perror("VIDIOC_S_FMT");
return -1;
}
CLEAR(parm);
parm.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
if (cfg->initial_fps > 0 && ioctl(fd, VIDIOC_G_PARM, &parm) == 0) {
if ((parm.parm.capture.capability & V4L2_CAP_TIMEPERFRAME) != 0U) {
parm.parm.capture.timeperframe.numerator = 1U;
parm.parm.capture.timeperframe.denominator = (unsigned int) cfg->initial_fps;
if (ioctl(fd, VIDIOC_S_PARM, &parm) < 0) {
perror("VIDIOC_S_PARM");
}
}
}
return 0;
}
static int init_mmap(int fd, Buffer **buffers, int *num_buffers) {
struct v4l2_requestbuffers req;
int index;
CLEAR(req);
req.count = NUM_BUFFERS;
req.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
req.memory = V4L2_MEMORY_MMAP;
if (ioctl(fd, VIDIOC_REQBUFS, &req) < 0) {
perror("VIDIOC_REQBUFS");
return -1;
}
*num_buffers = (int) req.count;
*buffers = (Buffer *) calloc(req.count, sizeof(Buffer));
if (*buffers == NULL) {
return -1;
}
for (index = 0; index < (int) req.count; ++index) {
struct v4l2_buffer buf;
CLEAR(buf);
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
buf.index = (unsigned int) index;
if (ioctl(fd, VIDIOC_QUERYBUF, &buf) < 0) {
perror("VIDIOC_QUERYBUF");
return -1;
}
(*buffers)[index].length = buf.length;
(*buffers)[index].start = mmap(NULL, buf.length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, buf.m.offset);
if ((*buffers)[index].start == MAP_FAILED) {
perror("mmap");
return -1;
}
}
return 0;
}
static int queue_all_buffers(int fd, int num_buffers) {
int index;
for (index = 0; index < num_buffers; ++index) {
struct v4l2_buffer buf;
CLEAR(buf);
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
buf.index = (unsigned int) index;
if (ioctl(fd, VIDIOC_QBUF, &buf) < 0) {
perror("VIDIOC_QBUF");
return -1;
}
}
return 0;
}
static int dequeue_latest_buffer(int fd, struct v4l2_buffer *latest_buf) {
struct v4l2_buffer latest_local;
bool have_latest = false;
if (latest_buf == NULL) {
errno = EINVAL;
return -1;
}
for (;;) {
struct v4l2_buffer current;
int dq_errno;
CLEAR(current);
current.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
current.memory = V4L2_MEMORY_MMAP;
if (ioctl(fd, VIDIOC_DQBUF, &current) < 0) {
dq_errno = errno;
if (dq_errno == EINTR) {
continue;
}
if (dq_errno == EAGAIN) {
if (!have_latest) {
errno = EAGAIN;
return 1;
}
*latest_buf = latest_local;
return 0;
}
if (have_latest && ioctl(fd, VIDIOC_QBUF, &latest_local) < 0) {
perror("VIDIOC_QBUF");
}
errno = dq_errno;
return -1;
}
if (have_latest && ioctl(fd, VIDIOC_QBUF, &latest_local) < 0) {
int q_errno = errno;
perror("VIDIOC_QBUF");
if (ioctl(fd, VIDIOC_QBUF, &current) < 0) {
perror("VIDIOC_QBUF");
}
errno = q_errno;
return -1;
}
latest_local = current;
have_latest = true;
}
}
static AVCodecContext *create_mjpeg_decoder(const worker_config_t *cfg) {
const AVCodec *decoder = avcodec_find_decoder(AV_CODEC_ID_MJPEG);
AVCodecContext *ctx;
if (decoder == NULL) {
fprintf(stderr, "MJPEG decoder not found\n");
return NULL;
}
ctx = avcodec_alloc_context3(decoder);
if (ctx == NULL) {
return NULL;
}
ctx->width = cfg->capture_width;
ctx->height = cfg->capture_height;
ctx->pix_fmt = AV_PIX_FMT_YUVJ420P;
ctx->color_range = AVCOL_RANGE_JPEG;
ctx->thread_count = 1;
if (avcodec_open2(ctx, decoder, NULL) < 0) {
avcodec_free_context(&ctx);
return NULL;
}
return ctx;
}
static AVCodecContext *create_mjpeg_encoder(const worker_config_t *cfg) {
const AVCodec *encoder = avcodec_find_encoder(AV_CODEC_ID_MJPEG);
AVCodecContext *ctx;
if (encoder == NULL) {
fprintf(stderr, "MJPEG encoder not found\n");
return NULL;
}
ctx = avcodec_alloc_context3(encoder);
if (ctx == NULL) {
return NULL;
}
ctx->width = cfg->output_width;
ctx->height = cfg->output_height;
ctx->pix_fmt = AV_PIX_FMT_YUVJ420P;
ctx->time_base = (AVRational) {1, cfg->initial_fps > 0 ? cfg->initial_fps : 10};
ctx->qmin = 1;
ctx->qmax = 31;
ctx->flags |= AV_CODEC_FLAG_QSCALE;
ctx->global_quality = FF_QP2LAMBDA * cfg->initial_qscale;
if (avcodec_open2(ctx, encoder, NULL) < 0) {
avcodec_free_context(&ctx);
return NULL;
}
return ctx;
}
static int decode_mjpeg_frame(AVCodecContext *decoder, const uint8_t *data, int size, AVFrame **frame) {
AVPacket *pkt;
int ret;
*frame = NULL;
pkt = av_packet_alloc();
if (pkt == NULL) {
return -1;
}
pkt->data = (uint8_t *) data;
pkt->size = size;
ret = avcodec_send_packet(decoder, pkt);
if (ret < 0) {
av_packet_free(&pkt);
return -1;
}
*frame = av_frame_alloc();
if (*frame == NULL) {
av_packet_free(&pkt);
return -1;
}
ret = avcodec_receive_frame(decoder, *frame);
av_packet_free(&pkt);
if (ret < 0) {
av_frame_free(frame);
return -1;
}
return 0;
}
static struct SwsContext *create_scaler(AVFrame *src, const worker_config_t *cfg) {
return sws_getContext(
src->width,
src->height,
src->format,
cfg->output_width,
cfg->output_height,
AV_PIX_FMT_YUVJ420P,
SWS_BILINEAR,
NULL,
NULL,
NULL);
}
static int scale_frame(struct SwsContext *sws_ctx, const worker_config_t *cfg, AVFrame *src, AVFrame **dst) {
int ret;
if (sws_ctx == NULL) {
return -1;
}
*dst = av_frame_alloc();
if (*dst == NULL) {
return -1;
}
(*dst)->width = cfg->output_width;
(*dst)->height = cfg->output_height;
(*dst)->format = AV_PIX_FMT_YUVJ420P;
if (av_frame_get_buffer(*dst, 0) < 0) {
av_frame_free(dst);
return -1;
}
ret = sws_scale(
sws_ctx,
(const uint8_t *const *) src->data,
src->linesize,
0,
src->height,
(*dst)->data,
(*dst)->linesize);
if (ret < 0) {
av_frame_free(dst);
return -1;
}
return 0;
}
static int encode_frame(AVCodecContext *encoder, AVFrame *frame, int qscale, AVPacket **pkt) {
int ret;
*pkt = av_packet_alloc();
if (*pkt == NULL) {
return -1;
}
encoder->global_quality = FF_QP2LAMBDA * qscale;
frame->quality = encoder->global_quality;
ret = avcodec_send_frame(encoder, frame);
if (ret < 0) {
av_packet_free(pkt);
return -1;
}
ret = avcodec_receive_packet(encoder, *pkt);
if (ret < 0) {
av_packet_free(pkt);
return -1;
}
return 0;
}
static int video_sender_init(video_sender_t *sender, const worker_config_t *cfg) {
kcp_conn_options_t options;
if (sender == NULL) {
errno = EINVAL;
return -1;
}
CLEAR(*sender);
snprintf(sender->target_peer, sizeof(sender->target_peer), "%s", cfg->target_peer);
kcp_conn_options_set_video_defaults(&options);
sender->client = kcp_client_dial_with_options(
cfg->server_addr,
cfg->relay_addr,
cfg->peer_id,
cfg->bind_ip,
cfg->bind_device,
&options,
NULL,
NULL,
NULL,
cfg->stats_interval_ms);
if (sender->client == NULL) {
return -1;
}
fprintf(stderr, "B-side video worker connected as %s -> %s\n", kcp_client_id(sender->client), sender->target_peer);
return 0;
}
static int video_sender_send_packet(video_sender_t *sender, const AVPacket *encoded_pkt) {
if (sender == NULL || sender->client == NULL || encoded_pkt == NULL) {
errno = EINVAL;
return -1;
}
return kcp_client_send_binary(sender->client, sender->target_peer, encoded_pkt->data, (size_t) encoded_pkt->size);
}
static void video_sender_close(video_sender_t *sender) {
if (sender == NULL || sender->client == NULL) {
return;
}
kcp_client_close(sender->client);
kcp_client_free(sender->client);
sender->client = NULL;
}
static void *control_thread_main(void *opaque) {
control_thread_args_t *args = (control_thread_args_t *) opaque;
char line[512];
while (fgets(line, sizeof(line), args->control_stream) != NULL) {
cJSON *root = cJSON_Parse(line);
cJSON *type_item;
if (root == NULL) {
continue;
}
type_item = cJSON_GetObjectItemCaseSensitive(root, "type");
if (!cJSON_IsString(type_item) || type_item->valuestring == NULL) {
cJSON_Delete(root);
continue;
}
if (strcmp(type_item->valuestring, "shutdown") == 0) {
runtime_set_shutdown(args->runtime, true);
cJSON_Delete(root);
return NULL;
}
if (strcmp(type_item->valuestring, "set_profile") == 0) {
cJSON *fps = cJSON_GetObjectItemCaseSensitive(root, "fps");
cJSON *qscale = cJSON_GetObjectItemCaseSensitive(root, "jpeg_quality_qscale");
cJSON *max_frame_bytes = cJSON_GetObjectItemCaseSensitive(root, "max_frame_bytes");
runtime_set_profile(
args->runtime,
cJSON_IsNumber(fps) ? fps->valueint : -1,
cJSON_IsNumber(qscale) ? qscale->valueint : -1,
cJSON_IsNumber(max_frame_bytes) ? max_frame_bytes->valueint : -1);
}
cJSON_Delete(root);
}
runtime_set_shutdown(args->runtime, true);
return NULL;
}
int main(void) {
worker_config_t cfg;
runtime_state_t runtime;
video_sender_t sender;
control_thread_args_t control_args;
pthread_t control_thread;
FILE *control_stream = NULL;
FILE *telemetry_stream = NULL;
Buffer *buffers = NULL;
int num_buffers = 0;
int camera_fd = -1;
int control_fd = env_as_int(WORKER_CONTROL_FD_ENV, WORKER_CONTROL_FD);
int telemetry_fd = env_as_int(WORKER_TELEMETRY_FD_ENV, WORKER_TELEMETRY_FD);
enum v4l2_buf_type stream_type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
AVCodecContext *decoder = NULL;
AVCodecContext *encoder = NULL;
struct SwsContext *sws_ctx = NULL;
double next_deadline_ms = 0.0;
double next_metrics_ms = 0.0;
int exit_code = 1;
int index;
int sws_src_width = 0;
int sws_src_height = 0;
int sws_src_format = -1;
bool control_thread_started = false;
av_log_set_level(AV_LOG_ERROR);
signal(SIGINT, handle_signal);
signal(SIGTERM, handle_signal);
CLEAR(sender);
if (load_worker_config(&cfg) != 0) {
return 1;
}
CLEAR(runtime);
pthread_mutex_init(&runtime.lock, NULL);
pthread_mutex_init(&runtime.telemetry_lock, NULL);
runtime.target_fps = cfg.initial_fps;
runtime.jpeg_quality_qscale = cfg.initial_qscale;
runtime.max_frame_bytes = cfg.initial_max_frame_bytes;
control_stream = fdopen(control_fd, "r");
telemetry_stream = fdopen(telemetry_fd, "w");
if (control_stream == NULL || telemetry_stream == NULL) {
perror("fdopen worker control/telemetry");
goto cleanup;
}
setvbuf(telemetry_stream, NULL, _IOLBF, 0);
runtime.telemetry_stream = telemetry_stream;
control_args.control_stream = control_stream;
control_args.runtime = &runtime;
if (pthread_create(&control_thread, NULL, control_thread_main, &control_args) != 0) {
perror("pthread_create");
goto cleanup;
}
control_thread_started = true;
camera_fd = open_v4l2_device(cfg.video_device);
if (camera_fd < 0) {
goto cleanup_join_thread;
}
if (init_v4l2_device(camera_fd, &cfg) != 0) {
goto cleanup_join_thread;
}
if (init_mmap(camera_fd, &buffers, &num_buffers) != 0) {
goto cleanup_join_thread;
}
if (queue_all_buffers(camera_fd, num_buffers) != 0) {
goto cleanup_join_thread;
}
if (ioctl(camera_fd, VIDIOC_STREAMON, &stream_type) < 0) {
perror("VIDIOC_STREAMON");
goto cleanup_join_thread;
}
decoder = create_mjpeg_decoder(&cfg);
encoder = create_mjpeg_encoder(&cfg);
if (decoder == NULL || encoder == NULL) {
fprintf(stderr, "failed to create codecs\n");
goto cleanup_join_thread;
}
if (video_sender_init(&sender, &cfg) != 0) {
perror("video_sender_init");
goto cleanup_join_thread;
}
next_deadline_ms = monotonic_ms();
next_metrics_ms = next_deadline_ms + 100.0;
while (!runtime_should_stop(&runtime)) {
AVFrame *decoded_frame = NULL;
AVFrame *scaled_frame = NULL;
AVPacket *encoded_pkt = NULL;
struct v4l2_buffer buf;
fd_set fds;
struct timeval tv;
int select_result;
int fps;
int qscale;
int max_frame_bytes;
int encode_us = 0;
bool sent = false;
const char *drop_reason = "";
double now_ms = monotonic_ms();
double encode_start_ms;
double encode_end_ms;
runtime_get_profile(&runtime, &fps, &qscale, &max_frame_bytes);
if (fps < 1) {
fps = 1;
}
FD_ZERO(&fds);
FD_SET(camera_fd, &fds);
tv.tv_sec = 2;
tv.tv_usec = 0;
select_result = select(camera_fd + 1, &fds, NULL, NULL, &tv);
if (select_result <= 0) {
if (select_result < 0 && errno == EINTR) {
continue;
}
fprintf(stderr, "select timeout/error on camera\n");
continue;
}
if (dequeue_latest_buffer(camera_fd, &buf) != 0) {
if (errno == EAGAIN) {
continue;
}
perror("VIDIOC_DQBUF");
break;
}
now_ms = monotonic_ms();
if (now_ms < next_deadline_ms) {
drop_reason = "paced_drop";
goto requeue_and_report;
}
next_deadline_ms = now_ms + (1000.0 / (double) fps);
if (decode_mjpeg_frame(decoder, (uint8_t *) buffers[buf.index].start, (int) buf.bytesused, &decoded_frame) != 0) {
drop_reason = "decode_failed";
goto requeue_and_report;
}
if (
sws_ctx == NULL
|| sws_src_width != decoded_frame->width
|| sws_src_height != decoded_frame->height
|| sws_src_format != decoded_frame->format
) {
sws_freeContext(sws_ctx);
sws_ctx = create_scaler(decoded_frame, &cfg);
sws_src_width = decoded_frame->width;
sws_src_height = decoded_frame->height;
sws_src_format = decoded_frame->format;
}
if (scale_frame(sws_ctx, &cfg, decoded_frame, &scaled_frame) != 0) {
drop_reason = "scale_failed";
goto requeue_and_report;
}
encode_start_ms = monotonic_ms();
if (encode_frame(encoder, scaled_frame, qscale, &encoded_pkt) != 0) {
drop_reason = "encode_failed";
goto requeue_and_report;
}
encode_end_ms = monotonic_ms();
encode_us = (int) ((encode_end_ms - encode_start_ms) * 1000.0);
if (encoded_pkt->size > max_frame_bytes) {
drop_reason = "frame_too_large";
goto requeue_and_report;
}
if (video_sender_send_packet(&sender, encoded_pkt) != 0) {
perror("video_sender_send_packet");
drop_reason = "send_failed";
goto requeue_and_report;
}
sent = true;
requeue_and_report:
telemetry_write_frame_stat(
&runtime,
encoded_pkt != NULL ? encoded_pkt->size : 0,
encode_us,
sent,
drop_reason);
if (ioctl(camera_fd, VIDIOC_QBUF, &buf) < 0) {
perror("VIDIOC_QBUF");
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
if (encoded_pkt != NULL) {
av_packet_free(&encoded_pkt);
}
break;
}
av_frame_free(&decoded_frame);
av_frame_free(&scaled_frame);
if (encoded_pkt != NULL) {
av_packet_free(&encoded_pkt);
}
now_ms = monotonic_ms();
if (sender.client != NULL && now_ms >= next_metrics_ms) {
kcp_conn_metrics_t metrics;
if (kcp_client_metrics_snapshot(sender.client, &metrics) == 0) {
telemetry_write_kcp_metrics(&runtime, &metrics);
}
next_metrics_ms = now_ms + 100.0;
}
}
exit_code = 0;
cleanup_join_thread:
runtime_set_shutdown(&runtime, true);
if (control_stream != NULL) {
fclose(control_stream);
control_stream = NULL;
}
if (control_thread_started) {
pthread_join(control_thread, NULL);
}
cleanup:
if (control_stream != NULL) {
fclose(control_stream);
control_stream = NULL;
}
if (camera_fd >= 0) {
ioctl(camera_fd, VIDIOC_STREAMOFF, &stream_type);
}
if (buffers != NULL) {
for (index = 0; index < num_buffers; ++index) {
if (buffers[index].start != NULL && buffers[index].start != MAP_FAILED) {
munmap(buffers[index].start, buffers[index].length);
}
}
free(buffers);
}
video_sender_close(&sender);
if (encoder != NULL) {
avcodec_free_context(&encoder);
}
if (decoder != NULL) {
avcodec_free_context(&decoder);
}
sws_freeContext(sws_ctx);
if (camera_fd >= 0) {
close(camera_fd);
}
if (telemetry_stream != NULL) {
fclose(telemetry_stream);
telemetry_stream = NULL;
}
pthread_mutex_destroy(&runtime.lock);
pthread_mutex_destroy(&runtime.telemetry_lock);
return exit_code;
}

View File

@@ -2,6 +2,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
@@ -56,7 +57,7 @@ typedef struct
} VideoSender;
static int video_sender_init(VideoSender *sender);
static int video_sender_send_packet(VideoSender *sender, const AVPacket *encoded_pkt);
static int video_sender_send_packet(VideoSender *sender, const AVPacket *encoded_pkt, uint64_t timestamp);
static void video_sender_close(VideoSender *sender);
typedef struct
@@ -72,6 +73,13 @@ double get_time_ms()
return ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0;
}
int64_t get_realtime_ms()
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return (int64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}
static const char *env_or_default(const char *name, const char *fallback)
{
const char *value = getenv(name);
@@ -127,18 +135,41 @@ static int video_sender_init(VideoSender *sender)
return 0;
}
static int video_sender_send_packet(VideoSender *sender, const AVPacket *encoded_pkt)
static int video_sender_send_packet(VideoSender *sender, const AVPacket *encoded_pkt, uint64_t timestamp)
{
uint8_t *senddata;
size_t payload_len;
int rc;
if (sender == NULL || sender->client == NULL || encoded_pkt == NULL)
{
errno = EINVAL;
return -1;
}
return kcp_client_send_binary(
if (encoded_pkt->size < 0)
{
errno = EINVAL;
return -1;
}
payload_len = (size_t)encoded_pkt->size + sizeof(timestamp);
senddata = (uint8_t *)malloc(payload_len);
if (senddata == NULL)
{
errno = ENOMEM;
return -1;
}
memcpy(senddata, encoded_pkt->data, (size_t)encoded_pkt->size);
memcpy(senddata + encoded_pkt->size, &timestamp, sizeof(timestamp));
rc = kcp_client_send_binary(
sender->client,
sender->target_peer,
encoded_pkt->data,
(size_t)encoded_pkt->size);
senddata,
payload_len);
free(senddata);
return rc;
}
static void video_sender_close(VideoSender *sender)
@@ -237,7 +268,7 @@ AVCodecContext *create_mjpeg_decoder()
AVCodecContext *ctx = avcodec_alloc_context3(decoder);
ctx->width = WIDTH;
ctx->height = HEIGHT;
ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; // 使用YUVJ420P
ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; // Use YUVJ420P for MJPEG compatibility.
ctx->color_range = AVCOL_RANGE_JPEG; // JPEG范围
ctx->thread_count = 1;
@@ -268,7 +299,7 @@ AVCodecContext *create_mjpeg_encoder()
AVCodecContext *ctx = avcodec_alloc_context3(encoder);
ctx->width = OUTPUT_WIDTH;
ctx->height = OUTPUT_HEIGHT;
ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; // 使用YUVJ420P
ctx->pix_fmt = AV_PIX_FMT_YUVJ420P; // Use YUVJ420P for MJPEG compatibility.
ctx->time_base = (AVRational){1, 30};
ctx->qmin = 8;
ctx->qmax = 31;
@@ -401,6 +432,10 @@ int main()
memset(&sender, 0, sizeof(sender));
#ifdef QUIET_FFMPEG_LOGS
av_log_set_level(AV_LOG_ERROR);
#endif
PRINT_TIME("=== V4L2 Direct Capture + FFmpeg Processing ===\n");
// 1. Open V4L2 device
@@ -478,7 +513,7 @@ int main()
PRINT_TIME("Frame | Capture | Decode | Scale | Encode | Total | Size | Marker\n");
PRINT_TIME("------|---------|--------|-------|--------|-------|------|--------\n");
for (int i = 0; i < 100; i++)
for (int i = 0; i < 10000; i++)
{
// 只有在开启 DEBUG_TIMING 时才声明这些时间变量
#ifdef DEBUG_TIMING
@@ -605,8 +640,12 @@ int main()
// 如果不开启宏,也打印一些基本信息
printf("Frame %d processed\n", i + 1);
#endif
if (encoded_pkt && video_sender_send_packet(&sender, encoded_pkt) != 0)
int64_t unixtime_ms;
if (encoded_pkt)
{
unixtime_ms = get_realtime_ms();
}
if (encoded_pkt && video_sender_send_packet(&sender, encoded_pkt, unixtime_ms) != 0)
{
perror("video_sender_send_packet");
av_frame_free(&decoded_frame);

View File

@@ -1,51 +0,0 @@
transport:
server_addr: "81.70.156.140:10909"
relay_via: "106.55.173.235:10909"
bind_ip: ""
bind_device: ""
control_sender:
peer_id: "peer-a-ctrl"
target_peer: "peer-b-ctrl"
nodelay: 1
interval_ms: 5
resend: 2
nc: 1
sndwnd: 32
rcvwnd: 32
mtu: 1400
stats_interval_ms: 100
video_receiver:
peer_id: "peer-a-video"
buffer_bytes: 1048576
nodelay: 1
interval_ms: 10
resend: 2
nc: 1
sndwnd: 256
rcvwnd: 256
mtu: 1400
stats_interval_ms: 100
daemon:
socket_path: "/tmp/omnisocket-a-side.sock"
reconnect_delay_ms: 2000
telemetry_interval_ms: 100
analog_send_hz: 100
frame_stale_ms: 500
policy:
health_window_ms: 2000
green_srtt_ms: 35
yellow_srtt_ms: 60
retrans_red_threshold: 10
profile_green:
fps: 15
max_frame_kb: 60
profile_yellow:
fps: 10
max_frame_kb: 40
profile_red:
fps: 5
max_frame_kb: 20

View File

@@ -1,58 +0,0 @@
transport:
server_addr: ""
relay_via: "81.70.156.140:10909"
bind_ip: ""
bind_device: ""
control_receiver:
peer_id: "peer-b-ctrl"
nodelay: 1
interval_ms: 5
resend: 2
nc: 1
sndwnd: 32
rcvwnd: 32
mtu: 1400
stats_interval_ms: 100
queue_capacity: 256
video_sender:
enabled: true
peer_id: "peer-b-video"
target_peer: "peer-a-video"
binary_path: "bin/b_side_video_sender"
device: "/dev/video0"
capture_width: 1280
capture_height: 720
output_width: 640
output_height: 360
fps: 10
jpeg_quality_qscale: 8
max_frame_bytes: 40960
stats_interval_ms: 100
daemon:
socket_path: "/tmp/omnisocket-b-side.sock"
ctrl_socket_path: "/tmp/omnisocket-b-ctrl.sock"
reconnect_delay_ms: 2000
telemetry_interval_ms: 100
worker_restart_delay_ms: 2000
policy:
mode: "manual"
health_window_ms: 2000
green_srtt_ms: 30
yellow_srtt_ms: 55
retrans_red_threshold: 8
profile_green:
fps: 10
jpeg_quality_qscale: 8
max_frame_bytes: 40960
profile_yellow:
fps: 7
jpeg_quality_qscale: 12
max_frame_bytes: 28672
profile_red:
fps: 5
jpeg_quality_qscale: 16
max_frame_bytes: 20480

View File

@@ -27,7 +27,6 @@ int kcp_client_receive_timed(kcp_client_t *client, message_t *out_msg, int timeo
int kcp_client_receive(kcp_client_t *client, message_t *out_msg);
int kcp_client_receive_binary_into(kcp_client_t *client, void *buffer, size_t buffer_len, kcp_client_recv_meta_t *out_meta, int timeout_ms);
int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len);
int kcp_client_metrics_snapshot(kcp_client_t *client, kcp_conn_metrics_t *out_metrics);
int kcp_client_close(kcp_client_t *client);
void kcp_client_free(kcp_client_t *client);

View File

@@ -43,32 +43,6 @@ extern "C" {
typedef struct kcp_conn kcp_conn_t;
typedef struct kcp_listener kcp_listener_t;
typedef struct kcp_conn_metrics {
int connected;
int has_conv;
uint32_t conv;
char local_addr[OMNI_MAX_ADDR_TEXT];
char remote_addr[OMNI_MAX_ADDR_TEXT];
uint32_t rto_ms;
int32_t srtt_ms;
int32_t srttvar_ms;
uint64_t bytes_sent;
uint64_t bytes_received;
uint64_t in_pkts;
uint64_t out_pkts;
uint64_t in_segs;
uint64_t out_segs;
uint64_t retrans_segs;
uint64_t fast_retrans_segs;
uint64_t early_retrans_segs;
uint64_t lost_segs;
uint64_t repeat_segs;
uint64_t in_errs;
uint64_t kcp_in_errs;
uint64_t ring_buffer_snd_queue;
uint64_t ring_buffer_rcv_queue;
uint64_t ring_buffer_snd_buffer;
} kcp_conn_metrics_t;
typedef struct kcp_conn_options {
int nodelay;
int interval_ms;
@@ -94,7 +68,6 @@ int kcp_conn_close(kcp_conn_t *conn);
void kcp_conn_free(kcp_conn_t *conn);
uint32_t kcp_conn_conv(const kcp_conn_t *conn);
int kcp_conn_local_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, socklen_t *addr_len);
int kcp_conn_metrics_snapshot(kcp_conn_t *conn, kcp_conn_metrics_t *out_metrics);
kcp_listener_t *kcp_listener_listen(const char *listen_addr, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, const char *node_role, const char *node_id);
kcp_conn_t *kcp_listener_accept(kcp_listener_t *listener);

View File

@@ -22,13 +22,6 @@ PyDoc_STRVAR(
"current frame has already been consumed and is lost."
);
PyDoc_STRVAR(
PyOmniSession_kcp_metrics_doc,
"kcp_metrics() -> dict\n"
"\n"
"Return a snapshot of low-level KCP metrics for the current session."
);
static PyObject *PyOmniSession_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) {
PyOmniSession *self;
(void) args;
@@ -288,67 +281,6 @@ static PyObject *PyOmniSession_stats(PyOmniSession *self, PyObject *Py_UNUSED(ig
);
}
static PyObject *PyOmniSession_kcp_metrics(PyOmniSession *self, PyObject *Py_UNUSED(ignored)) {
omnisocket_session_kcp_metrics_t metrics;
memset(&metrics, 0, sizeof(metrics));
if (omnisocket_session_kcp_metrics_snapshot(&self->session, &metrics) != 0) {
return PyErr_SetFromErrno(PyExc_OSError);
}
return Py_BuildValue(
"{s:i,s:i,s:I,s:s,s:s,s:I,s:i,s:i,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:K}",
"connected",
metrics.connected,
"has_conv",
metrics.has_conv,
"conv",
metrics.conv,
"local_addr",
metrics.local_addr,
"remote_addr",
metrics.remote_addr,
"rto_ms",
metrics.rto_ms,
"srtt_ms",
metrics.srtt_ms,
"srttvar_ms",
metrics.srttvar_ms,
"bytes_sent",
(unsigned long long) metrics.bytes_sent,
"bytes_received",
(unsigned long long) metrics.bytes_received,
"in_pkts",
(unsigned long long) metrics.in_pkts,
"out_pkts",
(unsigned long long) metrics.out_pkts,
"in_segs",
(unsigned long long) metrics.in_segs,
"out_segs",
(unsigned long long) metrics.out_segs,
"retrans_segs",
(unsigned long long) metrics.retrans_segs,
"fast_retrans_segs",
(unsigned long long) metrics.fast_retrans_segs,
"early_retrans_segs",
(unsigned long long) metrics.early_retrans_segs,
"lost_segs",
(unsigned long long) metrics.lost_segs,
"repeat_segs",
(unsigned long long) metrics.repeat_segs,
"in_errs",
(unsigned long long) metrics.in_errs,
"kcp_in_errs",
(unsigned long long) metrics.kcp_in_errs,
"ring_buffer_snd_queue",
(unsigned long long) metrics.ring_buffer_snd_queue,
"ring_buffer_rcv_queue",
(unsigned long long) metrics.ring_buffer_rcv_queue,
"ring_buffer_snd_buffer",
(unsigned long long) metrics.ring_buffer_snd_buffer
);
}
static PyMethodDef PyOmniSession_methods[] = {
{"connect", (PyCFunction) PyOmniSession_connect, METH_VARARGS | METH_KEYWORDS, NULL},
{"close", (PyCFunction) PyOmniSession_close, METH_NOARGS, NULL},
@@ -356,7 +288,6 @@ static PyMethodDef PyOmniSession_methods[] = {
{"recv", (PyCFunction) PyOmniSession_recv, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_doc},
{"recv_into", (PyCFunction) PyOmniSession_recv_into, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_into_doc},
{"stats", (PyCFunction) PyOmniSession_stats, METH_NOARGS, NULL},
{"kcp_metrics", (PyCFunction) PyOmniSession_kcp_metrics, METH_NOARGS, PyOmniSession_kcp_metrics_doc},
{NULL, NULL, 0, NULL}
};

View File

@@ -246,72 +246,3 @@ void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket
*out_stats = session->stats;
pthread_mutex_unlock(&session->mutex);
}
int omnisocket_session_kcp_metrics_snapshot(
omnisocket_session_t *session,
omnisocket_session_kcp_metrics_t *out_metrics
) {
kcp_client_t *client = NULL;
kcp_conn_metrics_t metrics;
int rc = 0;
if (session == NULL || out_metrics == NULL) {
errno = EINVAL;
return -1;
}
memset(out_metrics, 0, sizeof(*out_metrics));
pthread_mutex_lock(&session->mutex);
if (session->client != NULL && !session->closing) {
client = session->client;
session->active_ops += 1;
}
pthread_mutex_unlock(&session->mutex);
if (client == NULL) {
return 0;
}
memset(&metrics, 0, sizeof(metrics));
rc = kcp_client_metrics_snapshot(client, &metrics);
pthread_mutex_lock(&session->mutex);
if (session->active_ops > 0) {
session->active_ops -= 1;
}
if (session->closing && session->active_ops == 0) {
pthread_cond_broadcast(&session->idle_cond);
}
pthread_mutex_unlock(&session->mutex);
if (rc != 0) {
return rc;
}
out_metrics->connected = metrics.connected;
out_metrics->has_conv = metrics.has_conv;
out_metrics->conv = metrics.conv;
snprintf(out_metrics->local_addr, sizeof(out_metrics->local_addr), "%s", metrics.local_addr);
snprintf(out_metrics->remote_addr, sizeof(out_metrics->remote_addr), "%s", metrics.remote_addr);
out_metrics->rto_ms = metrics.rto_ms;
out_metrics->srtt_ms = metrics.srtt_ms;
out_metrics->srttvar_ms = metrics.srttvar_ms;
out_metrics->bytes_sent = metrics.bytes_sent;
out_metrics->bytes_received = metrics.bytes_received;
out_metrics->in_pkts = metrics.in_pkts;
out_metrics->out_pkts = metrics.out_pkts;
out_metrics->in_segs = metrics.in_segs;
out_metrics->out_segs = metrics.out_segs;
out_metrics->retrans_segs = metrics.retrans_segs;
out_metrics->fast_retrans_segs = metrics.fast_retrans_segs;
out_metrics->early_retrans_segs = metrics.early_retrans_segs;
out_metrics->lost_segs = metrics.lost_segs;
out_metrics->repeat_segs = metrics.repeat_segs;
out_metrics->in_errs = metrics.in_errs;
out_metrics->kcp_in_errs = metrics.kcp_in_errs;
out_metrics->ring_buffer_snd_queue = metrics.ring_buffer_snd_queue;
out_metrics->ring_buffer_rcv_queue = metrics.ring_buffer_rcv_queue;
out_metrics->ring_buffer_snd_buffer = metrics.ring_buffer_snd_buffer;
return 0;
}

View File

@@ -14,33 +14,6 @@ typedef struct omnisocket_session_stats {
int connected;
} omnisocket_session_stats_t;
typedef struct omnisocket_session_kcp_metrics {
int connected;
int has_conv;
uint32_t conv;
char local_addr[OMNI_MAX_ADDR_TEXT];
char remote_addr[OMNI_MAX_ADDR_TEXT];
uint32_t rto_ms;
int32_t srtt_ms;
int32_t srttvar_ms;
uint64_t bytes_sent;
uint64_t bytes_received;
uint64_t in_pkts;
uint64_t out_pkts;
uint64_t in_segs;
uint64_t out_segs;
uint64_t retrans_segs;
uint64_t fast_retrans_segs;
uint64_t early_retrans_segs;
uint64_t lost_segs;
uint64_t repeat_segs;
uint64_t in_errs;
uint64_t kcp_in_errs;
uint64_t ring_buffer_snd_queue;
uint64_t ring_buffer_rcv_queue;
uint64_t ring_buffer_snd_buffer;
} omnisocket_session_kcp_metrics_t;
typedef struct omnisocket_session {
pthread_mutex_t mutex;
pthread_cond_t idle_cond;
@@ -74,9 +47,5 @@ int omnisocket_session_recv_into(
int timeout_ms
);
void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket_session_stats_t *out_stats);
int omnisocket_session_kcp_metrics_snapshot(
omnisocket_session_t *session,
omnisocket_session_kcp_metrics_t *out_metrics
);
#endif

View File

@@ -1,9 +0,0 @@
from pathlib import Path
PACKAGE_ROOT = Path(__file__).resolve().parent
PYTHON_ROOT = PACKAGE_ROOT.parent
REPO_ROOT = PYTHON_ROOT.parent
DEFAULT_SOCKET_PATH = "/tmp/omnisocket-a-side.sock"
DEFAULT_CONFIG_PATH = REPO_ROOT / "config" / "a_side_omnidaemon.yaml"
VERSION = "0.1.0"

View File

@@ -1,5 +0,0 @@
from .daemon import main
if __name__ == "__main__":
main()

View File

@@ -1,149 +0,0 @@
"""Local Unix-domain HTTP client for the A-side OmniDaemon."""
from __future__ import annotations
import http.client
import json
import os
import socket
import threading
from typing import Any
from . import DEFAULT_SOCKET_PATH
class OmniDaemonError(RuntimeError):
def __init__(self, message: str, status_code: int | None = None) -> None:
super().__init__(message)
self.status_code = status_code
class UnixHTTPConnection(http.client.HTTPConnection):
def __init__(self, socket_path: str, timeout: float = 2.0) -> None:
super().__init__("localhost", timeout=timeout)
self.socket_path = socket_path
def connect(self) -> None: # pragma: no cover - runtime depends on Linux socket support
if not hasattr(socket, "AF_UNIX"):
raise OSError("AF_UNIX sockets are not available on this platform")
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.settimeout(self.timeout)
self.sock.connect(self.socket_path)
class OmniDaemonClient:
def __init__(self, socket_path: str | None = None, timeout: float = 2.0) -> None:
self.socket_path = socket_path or os.getenv("OMNIDAEMON_SOCKET", DEFAULT_SOCKET_PATH)
self.timeout = timeout
self._local = threading.local()
def get_health(self) -> dict[str, Any]:
return self._request_json("GET", "/v1/health")
def get_state(self) -> dict[str, Any]:
return self._request_json("GET", "/v1/state")
def get_video_frame(self) -> bytes:
return self._request_bytes("GET", "/v1/video/frame")
def get_control_status(self) -> dict[str, Any]:
return self._request_json("GET", "/v1/control/status")
def send_control_event(
self,
*,
source: str,
event_code: str,
drive_value: float = 1.0,
client_time_ms: int | None = None,
) -> dict[str, Any]:
payload = {
"source": source,
"event_code": event_code,
"drive_value": float(drive_value),
"client_time_ms": client_time_ms,
}
return self._request_json("POST", "/v1/control/event", payload)
def close(self) -> None:
self._reset_connection()
def _request_json(
self,
method: str,
path: str,
payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
raw = self._request_bytes(method, path, payload)
if not raw:
return {}
try:
return json.loads(raw.decode("utf-8"))
except json.JSONDecodeError as error:
raise OmniDaemonError(f"invalid daemon JSON response: {error}") from error
def _request_bytes(
self,
method: str,
path: str,
payload: dict[str, Any] | None = None,
) -> bytes:
body = b""
headers: dict[str, str] = {}
if payload is not None:
body = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
headers["Content-Length"] = str(len(body))
headers.setdefault("Connection", "keep-alive")
for attempt in range(2):
connection = self._get_connection()
try:
connection.request(method, path, body=body, headers=headers)
response = connection.getresponse()
raw = response.read()
except FileNotFoundError as error:
self._reset_connection()
raise OmniDaemonError(
f"daemon socket not found: {self.socket_path}"
) from error
except (OSError, http.client.HTTPException) as error:
self._reset_connection()
if attempt == 0:
continue
raise OmniDaemonError(
f"daemon request failed via {self.socket_path}: {error}"
) from error
if getattr(response, "will_close", False):
self._reset_connection()
if response.status >= 400:
message = raw.decode("utf-8", errors="replace").strip() or response.reason
try:
parsed = json.loads(message)
if isinstance(parsed, dict) and "error" in parsed:
message = str(parsed["error"])
except json.JSONDecodeError:
pass
raise OmniDaemonError(message, status_code=response.status)
return raw
raise OmniDaemonError(f"daemon request failed via {self.socket_path}: retry exhausted")
def _get_connection(self) -> UnixHTTPConnection:
connection = getattr(self._local, "connection", None)
if connection is None:
connection = UnixHTTPConnection(self.socket_path, timeout=self.timeout)
self._local.connection = connection
return connection
def _reset_connection(self) -> None:
connection = getattr(self._local, "connection", None)
if connection is None:
return
try:
connection.close()
except OSError:
pass
self._local.connection = None

View File

@@ -1,90 +0,0 @@
"""Binary control packet codec shared by the daemon and local clients."""
from __future__ import annotations
from dataclasses import dataclass
import struct
import time
CONTROL_PACKET_VERSION = 1
CONTROL_PACKET_STRUCT = struct.Struct("!BBHIfQ")
EVENT_NAME_TO_ID = {
"pose_home": 1,
"pose_hold": 2,
"mode_stride": 3,
"surge_up": 6,
"surge_down": 7,
"sway_left": 8,
"sway_right": 9,
"spin_left": 10,
"spin_right": 11,
"set_surge": 12,
"set_sway": 13,
"set_spin": 14,
"set_lift": 15,
"lift_up": 16,
"lift_down": 17,
"trim_reset": 18,
"session_quit": 19,
}
EVENT_ID_TO_NAME = {value: key for key, value in EVENT_NAME_TO_ID.items()}
ANALOG_EVENT_CODES = {"set_surge", "set_sway", "set_spin"}
@dataclass(slots=True)
class ControlPacket:
seq_id: int
event_id: int
drive_value: float = 1.0
sent_at_ns: int = 0
@property
def event_name(self) -> str:
return EVENT_ID_TO_NAME.get(self.event_id, f"unknown_{self.event_id}")
def encode(self) -> bytes:
sent_at_ns = self.sent_at_ns or time.time_ns()
return CONTROL_PACKET_STRUCT.pack(
CONTROL_PACKET_VERSION,
self.event_id,
0,
int(self.seq_id),
float(self.drive_value),
int(sent_at_ns),
)
@classmethod
def decode(cls, payload: bytes) -> "ControlPacket":
if len(payload) != CONTROL_PACKET_STRUCT.size:
raise ValueError(
f"invalid control packet length {len(payload)}; "
f"want {CONTROL_PACKET_STRUCT.size}"
)
version, event_id, _reserved, seq_id, drive_value, sent_at_ns = (
CONTROL_PACKET_STRUCT.unpack(payload)
)
if version != CONTROL_PACKET_VERSION:
raise ValueError(f"unsupported control packet version {version}")
return cls(
seq_id=int(seq_id),
event_id=int(event_id),
drive_value=float(drive_value),
sent_at_ns=int(sent_at_ns),
)
def make_control_packet(
seq_id: int,
event_name: str,
drive_value: float = 1.0,
sent_at_ns: int | None = None,
) -> ControlPacket:
event_id = EVENT_NAME_TO_ID[event_name]
return ControlPacket(
seq_id=seq_id,
event_id=event_id,
drive_value=drive_value,
sent_at_ns=time.time_ns() if sent_at_ns is None else sent_at_ns,
)

File diff suppressed because it is too large Load Diff

View File

@@ -1,10 +0,0 @@
from pathlib import Path
PACKAGE_ROOT = Path(__file__).resolve().parent
PYTHON_ROOT = PACKAGE_ROOT.parent
REPO_ROOT = PYTHON_ROOT.parent
DEFAULT_SOCKET_PATH = "/tmp/omnisocket-b-side.sock"
DEFAULT_CTRL_SOCKET_PATH = "/tmp/omnisocket-b-ctrl.sock"
DEFAULT_CONFIG_PATH = REPO_ROOT / "config" / "b_side_omnidaemon.yaml"
VERSION = "0.1.0"

View File

@@ -1,5 +0,0 @@
from .daemon import main
if __name__ == "__main__":
main()

View File

@@ -1,54 +0,0 @@
"""Local AF_UNIX SOCK_SEQPACKET client for B-side control delivery."""
from __future__ import annotations
import os
import socket
from omnisocket_a_side.control_codec import CONTROL_PACKET_STRUCT
from . import DEFAULT_CTRL_SOCKET_PATH
class BSideControlClient:
def __init__(self, socket_path: str | None = None) -> None:
self.socket_path = socket_path or os.getenv(
"OMNIBDAEMON_CTRL_SOCKET",
DEFAULT_CTRL_SOCKET_PATH,
)
self._sock: socket.socket | None = None
def connect(self) -> None:
if not hasattr(socket, "AF_UNIX"):
raise OSError("AF_UNIX sockets are not available on this platform")
if not hasattr(socket, "SOCK_SEQPACKET"):
raise OSError("SOCK_SEQPACKET is not available on this platform")
sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
sock.connect(self.socket_path)
self._sock = sock
def recv_control_packet(self, timeout_ms: int = 100) -> bytes | None:
if self._sock is None:
raise RuntimeError("B-side control client is not connected")
self._sock.settimeout(max(0.001, timeout_ms / 1000.0))
try:
payload = self._sock.recv(CONTROL_PACKET_STRUCT.size)
except socket.timeout:
return None
except BlockingIOError:
return None
if payload == b"":
raise ConnectionResetError("daemon control socket closed")
if len(payload) != CONTROL_PACKET_STRUCT.size:
return None
return payload
def close(self) -> None:
if self._sock is None:
return
try:
self._sock.close()
finally:
self._sock = None

File diff suppressed because it is too large Load Diff

View File

@@ -35,16 +35,7 @@ COMMON_SOURCES = [
setup(
name="omnisocket",
version="0.1.0",
packages=["omnisocket", "omnisocket_a_side", "omnisocket_b_side"],
install_requires=[
"PyYAML>=6.0",
],
entry_points={
"console_scripts": [
"omnisocket-a-side-daemon=omnisocket_a_side.daemon:main",
"omnisocket-b-side-daemon=omnisocket_b_side.daemon:main",
]
},
packages=["omnisocket"],
ext_modules=[
Extension(
"omnisocket._omnisocket",

View File

@@ -193,9 +193,9 @@ done
# 50 轮发送
for i in $(seq 1 50); do
echo "file peer-a /tmp/test30k.bin" >&3
echo "file peer-a /home/boll/test30.bin" >&3
sleep 1
echo "file peer-a /tmp/test5.bin" >&3
echo "file peer-a /home/boll/test5.bin" >&3
sleep 1
done

View File

@@ -155,9 +155,9 @@ done
# 50 轮发送
for i in $(seq 1 50); do
echo "file peer-a /tmp/test30k.bin" >&3
echo "file peer-a /home/boll/test30.bin" >&3
sleep 1
echo "file peer-a /tmp/test5.bin" >&3
echo "file peer-a /home/boll/test5.bin" >&3
sleep 1
done

View File

@@ -1,11 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
CONFIG_PATH="${1:-$ROOT/config/b_side_omnidaemon.yaml}"
export OMNIBDAEMON_CONFIG="$CONFIG_PATH"
export PYTHONPATH="$ROOT/python${PYTHONPATH:+:$PYTHONPATH}"
cd "$ROOT"
exec python3 -m omnisocket_b_side.daemon --config "$CONFIG_PATH"

View File

@@ -294,14 +294,6 @@ int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const
return 0;
}
int kcp_client_metrics_snapshot(kcp_client_t *client, kcp_conn_metrics_t *out_metrics) {
if (client == NULL || out_metrics == NULL) {
errno = EINVAL;
return -1;
}
return kcp_conn_metrics_snapshot(client->conn, out_metrics);
}
int kcp_client_close(kcp_client_t *client) {
return client == NULL ? 0 : kcp_conn_close(client->conn);
}

View File

@@ -5,13 +5,6 @@
#define UDP_RELAY_BUF_SIZE (64U * 1024U)
typedef struct udp_relay_client_entry {
struct udp_relay_client_entry *next;
uint32_t conv;
struct sockaddr_storage addr;
socklen_t addr_len;
} udp_relay_client_entry_t;
struct udp_relay {
int downstream_fd;
int upstream_fd;
@@ -19,10 +12,9 @@ struct udp_relay {
socklen_t upstream_addr_len;
char downstream_local_addr[OMNI_MAX_ADDR_TEXT];
char upstream_local_addr[OMNI_MAX_ADDR_TEXT];
struct sockaddr_storage last_client_addr;
socklen_t last_client_addr_len;
int has_last_client;
udp_relay_client_entry_t *clients;
struct sockaddr_storage client_addr;
socklen_t client_addr_len;
int has_client;
pthread_mutex_t lock;
pthread_mutex_t log_mu;
pthread_mutex_t state_mu;
@@ -139,55 +131,22 @@ static void udp_relay_note_result(udp_relay_t *relay, int rc, int errnum) {
pthread_mutex_unlock(&relay->state_mu);
}
static udp_relay_client_entry_t *udp_relay_find_client_locked(udp_relay_t *relay, uint32_t conv) {
udp_relay_client_entry_t *entry;
for (entry = relay->clients; entry != NULL; entry = entry->next) {
if (entry->conv == conv) {
return entry;
}
}
return NULL;
}
static void udp_relay_record_client(udp_relay_t *relay, int has_conv, uint32_t conv, const struct sockaddr_storage *addr, socklen_t addr_len) {
static void udp_relay_record_client(udp_relay_t *relay, const struct sockaddr_storage *addr, socklen_t addr_len) {
pthread_mutex_lock(&relay->lock);
memcpy(&relay->last_client_addr, addr, sizeof(*addr));
relay->last_client_addr_len = addr_len;
relay->has_last_client = 1;
if (has_conv) {
udp_relay_client_entry_t *entry = udp_relay_find_client_locked(relay, conv);
if (entry == NULL) {
entry = (udp_relay_client_entry_t *) calloc(1, sizeof(*entry));
if (entry != NULL) {
entry->conv = conv;
entry->next = relay->clients;
relay->clients = entry;
}
}
if (entry != NULL) {
memcpy(&entry->addr, addr, sizeof(*addr));
entry->addr_len = addr_len;
}
}
memcpy(&relay->client_addr, addr, sizeof(*addr));
relay->client_addr_len = addr_len;
relay->has_client = 1;
pthread_mutex_unlock(&relay->lock);
}
static int udp_relay_copy_client(udp_relay_t *relay, int has_conv, uint32_t conv, struct sockaddr_storage *addr, socklen_t *addr_len) {
int has_client = 0;
static int udp_relay_copy_client(udp_relay_t *relay, struct sockaddr_storage *addr, socklen_t *addr_len) {
int has_client;
pthread_mutex_lock(&relay->lock);
if (has_conv) {
udp_relay_client_entry_t *entry = udp_relay_find_client_locked(relay, conv);
if (entry != NULL) {
memcpy(addr, &entry->addr, sizeof(*addr));
*addr_len = entry->addr_len;
has_client = 1;
}
} else if (relay->has_last_client) {
memcpy(addr, &relay->last_client_addr, sizeof(*addr));
*addr_len = relay->last_client_addr_len;
has_client = 1;
has_client = relay->has_client;
if (has_client) {
memcpy(addr, &relay->client_addr, sizeof(*addr));
*addr_len = relay->client_addr_len;
}
pthread_mutex_unlock(&relay->lock);
return has_client;
@@ -201,8 +160,6 @@ static void *udp_relay_forward_downstream_to_upstream(void *arg) {
struct sockaddr_storage source;
socklen_t source_len = sizeof(source);
ssize_t n = recvfrom(relay->downstream_fd, buffer, sizeof(buffer), 0, (struct sockaddr *) &source, &source_len);
int has_conv = 0;
uint32_t conv = 0;
if (n < 0) {
int errnum = errno;
@@ -217,8 +174,7 @@ static void *udp_relay_forward_downstream_to_upstream(void *arg) {
return NULL;
}
udp_relay_parse_kcp_summary(buffer, (size_t) n, &has_conv, &conv, NULL);
udp_relay_record_client(relay, has_conv, conv, &source, source_len);
udp_relay_record_client(relay, &source, source_len);
udp_relay_print_packet(relay, "relay_downstream_rx", relay->downstream_local_addr, &source, source_len, buffer, (size_t) n);
for (;;) {
if (send(relay->upstream_fd, buffer, (size_t) n, 0) >= 0) {
@@ -249,8 +205,6 @@ static void *udp_relay_forward_upstream_to_downstream(void *arg) {
struct sockaddr_storage client_addr;
socklen_t client_addr_len = 0;
ssize_t n = recv(relay->upstream_fd, buffer, sizeof(buffer), 0);
int has_conv = 0;
uint32_t conv = 0;
if (n < 0) {
int errnum = errno;
@@ -266,8 +220,7 @@ static void *udp_relay_forward_upstream_to_downstream(void *arg) {
}
udp_relay_print_packet(relay, "relay_upstream_rx", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n);
udp_relay_parse_kcp_summary(buffer, (size_t) n, &has_conv, &conv, NULL);
if (!udp_relay_copy_client(relay, has_conv, conv, &client_addr, &client_addr_len)) {
if (!udp_relay_copy_client(relay, &client_addr, &client_addr_len)) {
udp_relay_print_packet(relay, "relay_upstream_drop_no_client", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n);
continue;
}
@@ -451,18 +404,11 @@ int udp_relay_close(udp_relay_t *relay) {
}
void udp_relay_free(udp_relay_t *relay) {
udp_relay_client_entry_t *entry;
udp_relay_client_entry_t *next;
if (relay == NULL) {
return;
}
udp_relay_close(relay);
udp_relay_join_threads(relay);
for (entry = relay->clients; entry != NULL; entry = next) {
next = entry->next;
free(entry);
}
pthread_mutex_destroy(&relay->lock);
pthread_mutex_destroy(&relay->log_mu);
pthread_cond_destroy(&relay->state_cond);

View File

@@ -1788,83 +1788,6 @@ int kcp_conn_local_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, s
return 0;
}
int kcp_conn_metrics_snapshot(kcp_conn_t *conn, kcp_conn_metrics_t *out_metrics) {
struct sockaddr_storage local_addr;
socklen_t local_len = sizeof(local_addr);
if (conn == NULL || out_metrics == NULL) {
errno = EINVAL;
return -1;
}
memset(out_metrics, 0, sizeof(*out_metrics));
out_metrics->connected = !atomic_load(&conn->closed);
if (conn->sock_state != NULL && !conn->socket_closed &&
getsockname(conn->sock_state->fd, (struct sockaddr *) &local_addr, &local_len) == 0) {
omni_sockaddr_to_string(
(const struct sockaddr *) &local_addr,
local_len,
out_metrics->local_addr,
sizeof(out_metrics->local_addr)
);
}
if (conn->remote_addr_len > 0) {
omni_sockaddr_to_string(
(const struct sockaddr *) &conn->remote_addr,
conn->remote_addr_len,
out_metrics->remote_addr,
sizeof(out_metrics->remote_addr)
);
}
if (conn->process_sampler != NULL) {
out_metrics->bytes_sent = atomic_load_explicit(&conn->process_sampler->bytes_sent, memory_order_relaxed);
out_metrics->bytes_received = atomic_load_explicit(&conn->process_sampler->bytes_received, memory_order_relaxed);
out_metrics->in_pkts = atomic_load_explicit(&conn->process_sampler->in_pkts, memory_order_relaxed);
out_metrics->out_pkts = atomic_load_explicit(&conn->process_sampler->out_pkts, memory_order_relaxed);
out_metrics->in_segs = atomic_load_explicit(&conn->process_sampler->in_segs, memory_order_relaxed);
out_metrics->out_segs = atomic_load_explicit(&conn->process_sampler->out_segs, memory_order_relaxed);
out_metrics->in_errs = atomic_load_explicit(&conn->process_sampler->in_errs, memory_order_relaxed);
out_metrics->kcp_in_errs = atomic_load_explicit(&conn->process_sampler->kcp_in_errs, memory_order_relaxed);
} else {
out_metrics->bytes_sent = conn->pending_bytes_sent;
out_metrics->bytes_received = conn->pending_bytes_received;
out_metrics->in_pkts = conn->pending_in_pkts;
out_metrics->out_pkts = conn->pending_out_pkts;
out_metrics->in_segs = conn->pending_in_segs;
out_metrics->out_segs = conn->pending_out_segs;
out_metrics->in_errs = conn->pending_in_errs;
out_metrics->kcp_in_errs = conn->pending_kcp_in_errs;
}
pthread_mutex_lock(&conn->kcp_mu);
if (conn->kcp != NULL) {
out_metrics->has_conv = 1;
out_metrics->conv = conn->kcp->conv;
out_metrics->rto_ms = conn->kcp->rx_rto;
out_metrics->srtt_ms = conn->kcp->rx_srtt;
out_metrics->srttvar_ms = conn->kcp->rx_rttval;
out_metrics->ring_buffer_snd_queue = conn->kcp->nsnd_que;
out_metrics->ring_buffer_rcv_queue = conn->kcp->nrcv_que;
out_metrics->ring_buffer_snd_buffer = conn->kcp->nsnd_buf;
out_metrics->fast_retrans_segs = conn->kcp->fast_retrans_xmit;
/* This KCP fork does not implement early retransmit, so the counter stays zero. */
out_metrics->early_retrans_segs = conn->kcp->early_retrans_xmit;
out_metrics->lost_segs = conn->kcp->lost_xmit;
out_metrics->repeat_segs = conn->kcp->repeat_xmit;
out_metrics->retrans_segs =
out_metrics->fast_retrans_segs +
out_metrics->early_retrans_segs +
out_metrics->lost_segs;
} else {
out_metrics->connected = 0;
}
pthread_mutex_unlock(&conn->kcp_mu);
return 0;
}
int kcp_conn_close(kcp_conn_t *conn) {
if (conn == NULL) {
return 0;

View File

@@ -298,10 +298,6 @@ ikcpcb *ikcp_create(IUINT32 conv, void *user)
kcp->fastlimit = IKCP_FASTACK_LIMIT;
kcp->nocwnd = 0;
kcp->xmit = 0;
kcp->fast_retrans_xmit = 0;
kcp->early_retrans_xmit = 0;
kcp->lost_xmit = 0;
kcp->repeat_xmit = 0;
kcp->dead_link = IKCP_DEADLINK;
kcp->output = NULL;
kcp->writelog = NULL;
@@ -792,7 +788,6 @@ void ikcp_parse_data(ikcpcb *kcp, IKCPSEG *newseg)
}
else
{
kcp->repeat_xmit++;
ikcp_segment_delete(kcp, newseg);
}
@@ -1197,7 +1192,6 @@ void ikcp_flush(ikcpcb *kcp)
needsend = 1;
segment->xmit++;
kcp->xmit++;
kcp->lost_xmit++;
if (kcp->nodelay == 0)
{
segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);
@@ -1217,7 +1211,6 @@ void ikcp_flush(ikcpcb *kcp)
{
needsend = 1;
segment->xmit++;
kcp->fast_retrans_xmit++;
segment->fastack = 0;
segment->resendts = current + segment->rto;
change++;

View File

@@ -300,10 +300,6 @@ struct IKCPCB
IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;
IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;
IUINT32 current, interval, ts_flush, xmit;
IUINT32 fast_retrans_xmit;
IUINT32 early_retrans_xmit;
IUINT32 lost_xmit;
IUINT32 repeat_xmit;
IUINT32 nrcv_buf, nsnd_buf;
IUINT32 nrcv_que, nsnd_que;
IUINT32 nodelay, updated;