feat: 对接Python,暴露接口

This commit is contained in:
2026-03-30 22:48:36 +08:00
parent 24467c04c0
commit d678bfc326
22 changed files with 1311 additions and 51 deletions

2
.gitignore vendored
View File

@@ -11,3 +11,5 @@ peer-b-latency.*
root@117.78.11.244
c/bin
*__pycache__*

View File

@@ -2,6 +2,7 @@ CC ?= gcc
CFLAGS ?= -std=c11 -Wall -Wextra -O2 -pthread -D_GNU_SOURCE
CPPFLAGS ?= -Iinclude -Ithird_party/cjson -Ithird_party/kcp
LDFLAGS ?= -pthread
PYTHON ?= python3
BIN_DIR := bin
SRC_DIR := src
@@ -64,4 +65,10 @@ $(BIN_DIR)/kcpping: $(CMD_DIR)/kcpping.c $(COMMON_SRCS) | $(BIN_DIR)
clean:
rm -rf $(BIN_DIR)
.PHONY: all clean
python-ext:
cd python && $(PYTHON) setup.py build_ext --inplace
python-install:
cd python && $(PYTHON) -m pip install -e .
.PHONY: all clean python-ext python-install

View File

@@ -7,26 +7,32 @@ This subtree is intentionally standalone. The Go code stays in place as the beha
## Build
```bash
cd c
make
make -j$(nproc)
```
Build outputs:
- `c/bin/udpserver`
- `c/bin/udppeer`
- `c/bin/udpping`
- `c/bin/udprelay`
- `c/bin/kcpserver`
- `c/bin/kcppeer`
- `c/bin/kcpping`
- `./bin/udpserver`
- `./bin/udppeer`
- `./bin/udpping`
- `./bin/udprelay`
- `./bin/kcpserver`
- `./bin/kcppeer`
- `./bin/kcpping`
Python extension build:
```bash
make python-ext
make python-install
```
## Run On Different Machines
Server `D` runs the KCP hub on `0.0.0.0:10909`:
```bash
./c/bin/kcpserver -listen 0.0.0.0:10909 \
./bin/kcpserver -listen 0.0.0.0:10909 \
-kcp-ts-debug-log logs/d-kcp-ts.jsonl \
-kcp-session-stats-log logs/d-kcp-stats.jsonl
```
@@ -34,13 +40,13 @@ Server `D` runs the KCP hub on `0.0.0.0:10909`:
Relay `C` runs a raw UDP forwarder to `D`:
```bash
./c/bin/kcpserver -mode=relay -listen 0.0.0.0:10909 -relay-remote 172.21.32.15:10909
./bin/kcpserver -mode=relay -listen 0.0.0.0:10909 -relay-remote 172.21.32.15:10909
```
Peer `A` dials `D` through relay `C`:
```bash
./c/bin/kcppeer -id peer-a -server 172.21.32.15:10909 -relay-via 106.55.173.235:10909 \
./bin/kcppeer -id peer-a -server 172.21.32.15:10909 -relay-via 106.55.173.235:10909 \
-inbox-dir inbox/a \
-latency-log logs/a-latency.jsonl \
-kcp-ts-debug-log logs/a-kcp-ts.jsonl \
@@ -50,7 +56,7 @@ Peer `A` dials `D` through relay `C`:
Peer `B` dials `D` directly:
```bash
./c/bin/kcppeer -id peer-b -server 81.70.156.140:10909 \
./bin/kcppeer -id peer-b -server 81.70.156.140:10909 \
-inbox-dir inbox/b \
-latency-log logs/b-latency.jsonl \
-kcp-ts-debug-log logs/b-kcp-ts.jsonl \
@@ -60,13 +66,33 @@ Peer `B` dials `D` directly:
Optional ping / echo tools:
```bash
./c/bin/kcpping -id peer-a -server 106.55.173.235:10909 -echo
./c/bin/kcpping -id peer-b -server 81.70.156.140:10909 -to peer-a -count 20 -interval 100ms
./c/bin/udpserver -listen 0.0.0.0:9001
./c/bin/udppeer -id peer-a -server 127.0.0.1:9001
./c/bin/udpping -id pinger -server 127.0.0.1:9001 -to peer-a -count 20
./bin/kcpping -id peer-a -server 106.55.173.235:10909 -echo
./bin/kcpping -id peer-b -server 81.70.156.140:10909 -to peer-a -count 20 -interval 100ms
./bin/udpserver -listen 0.0.0.0:9001
./bin/udppeer -id peer-a -server 127.0.0.1:9001
./bin/udpping -id pinger -server 127.0.0.1:9001 -to peer-a -count 20
```
Python control/video demos use two KCP sessions:
- `peer-a-ctrl <-> peer-b-ctrl` for small binary control packets
- `peer-b-video -> peer-a-video` for larger binary video frames
Example demo entry points:
- `udp_keyboard_sender.py`
- `udp_xbox_sender.py`
- `udp_fsm_controller.py`
- `omnisocket_video_sender.py`
- `omnisocket_video_receiver.py`
- `scripts/kcp_control_benchmark.py`
Python `recv_into()` note:
- The writable buffer must be large enough for the full incoming payload.
- If the buffer is too small, `recv_into()` reports the required size but the current frame has already been consumed and is lost.
- For the video demo, keep `video_receiver.buffer_bytes >= video_sender.frame_bytes`.
## Interactive Commands
`udppeer` and `kcppeer` support the same interactive shell:
@@ -83,6 +109,8 @@ quit
- The C project targets Linux only.
- It preserves the Go wire format for UDP datagrams and KCP stream frames.
- It now supports `binary` payload messages in addition to `text`, `file`, `register`, and `error`.
- Python `Session.recv_into()` is a zero-copy receive helper for already-sized buffers; it does not retain oversized frames for a retry.
- It keeps runtime JSONL logging, UDP TX timestamp debug, KCP packet debug, and KCP session stats.
- Offline `latencysummary` and HTML chart generation are intentionally not migrated.
- No automated C tests are included in this subtree; validation is expected to happen on Linux via `make` and manual smoke tests.

View File

@@ -53,6 +53,15 @@ static void *kcppeer_receive_thread_main(void *arg) {
}
fprintf(stderr, "received file from %s to %s: %s (%lu bytes) -> %s\n", msg.from, msg.to, msg.file_name, (unsigned long) msg.body_len, persisted_path);
break;
case MSG_TYPE_BINARY:
if (kcp_client_persist_message(ctx->client, &msg, ctx->inbox_dir, persisted_path, sizeof(persisted_path)) != 0) {
fprintf(stderr, "kcppeer: persist binary payload from %s to %s failed\n", msg.from, msg.to);
protocol_message_clear(&msg);
ctx->rc = -1;
return NULL;
}
fprintf(stderr, "received binary payload from %s to %s (%lu bytes) -> %s\n", msg.from, msg.to, (unsigned long) msg.body_len, persisted_path);
break;
case MSG_TYPE_ERROR:
fprintf(stderr, "received error from %s to %s: %.*s\n", msg.from, msg.to, (int) msg.body_len, msg.body == NULL ? "" : (const char *) msg.body);
break;

View File

@@ -50,6 +50,15 @@ static void *udppeer_receive_thread_main(void *arg) {
}
fprintf(stderr, "received file from %s to %s: %s (%lu bytes) -> %s\n", msg.from, msg.to, msg.file_name, (unsigned long) msg.body_len, persisted_path);
break;
case MSG_TYPE_BINARY:
if (udp_client_persist_message(ctx->client, &msg, ctx->inbox_dir, persisted_path, sizeof(persisted_path)) != 0) {
fprintf(stderr, "udppeer: persist binary payload from %s to %s failed\n", msg.from, msg.to);
protocol_message_clear(&msg);
ctx->rc = -1;
return NULL;
}
fprintf(stderr, "received binary payload from %s to %s (%lu bytes) -> %s\n", msg.from, msg.to, (unsigned long) msg.body_len, persisted_path);
break;
case MSG_TYPE_ERROR:
fprintf(stderr, "received error from %s to %s: %.*s\n", msg.from, msg.to, (int) msg.body_len, msg.body == NULL ? "" : (const char *) msg.body);
break;

View File

@@ -0,0 +1,41 @@
transport:
server_addr: "127.0.0.1:10909"
relay_via: ""
bind_ip: ""
bind_device: ""
control_sender:
peer_id: "peer-a-ctrl"
target_peer: "peer-b-ctrl"
joy_topic: "/xbox_data"
deadzone: 0.10
analog_epsilon: 0.01
dpad_threshold: 0.50
trigger_pressed_threshold: -0.50
control_receiver:
peer_id: "peer-b-ctrl"
motion:
initial_lift: 0.89
lift_step: 0.05
max_surge: 1.0
max_sway: 0.5
max_spin: 0.5
max_lift: 0.90
min_lift: 0.65
surge_step: 0.1
sway_step: 0.1
spin_step: 0.1
video_sender:
peer_id: "peer-b-video"
target_peer: "peer-a-video"
frame_bytes: 30720
frame_interval_ms: 66
video_receiver:
peer_id: "peer-a-video"
# recv_into() requires a buffer large enough for the whole frame.
# If buffer_bytes is smaller than video_sender.frame_bytes, the oversize frame is dropped.
buffer_bytes: 65536

View File

@@ -8,12 +8,24 @@ extern "C" {
#endif
typedef struct kcp_client kcp_client_t;
typedef struct kcp_client_recv_meta {
message_type_t type;
uint64_t id;
char from[OMNI_MAX_PEER_ID];
char to[OMNI_MAX_PEER_ID];
char file_name[OMNI_MAX_FILE_NAME];
size_t body_len;
} kcp_client_recv_meta_t;
kcp_client_t *kcp_client_dial_with_options(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, const kcp_conn_options_t *options, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms);
kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms);
const char *kcp_client_id(const kcp_client_t *client);
int kcp_client_send_text(kcp_client_t *client, const char *to, const char *text);
int kcp_client_send_binary(kcp_client_t *client, const char *to, const void *data, size_t data_len);
int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char *path);
int kcp_client_receive_timed(kcp_client_t *client, message_t *out_msg, int timeout_ms);
int kcp_client_receive(kcp_client_t *client, message_t *out_msg);
int kcp_client_receive_binary_into(kcp_client_t *client, void *buffer, size_t buffer_len, kcp_client_recv_meta_t *out_meta, int timeout_ms);
int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len);
int kcp_client_close(kcp_client_t *client);
void kcp_client_free(kcp_client_t *client);

View File

@@ -12,6 +12,7 @@ typedef enum message_type {
MSG_TYPE_FILE = 1,
MSG_TYPE_REGISTER = 2,
MSG_TYPE_ERROR = 3,
MSG_TYPE_BINARY = 4,
MSG_TYPE_INVALID = 255
} message_type_t;

View File

@@ -9,20 +9,60 @@
extern "C" {
#endif
#define KCP_NODELAY 1
#define KCP_INTERVAL 10
#define KCP_RESEND 2
#define KCP_NC 1
#define KCP_WND_SIZE 256
#define KCP_MTU 1400
#define KCP_DEFAULT_NODELAY 1
#define KCP_DEFAULT_INTERVAL_MS 10
#define KCP_DEFAULT_RESEND 2
#define KCP_DEFAULT_NC 1
#define KCP_DEFAULT_SND_WND 256
#define KCP_DEFAULT_RCV_WND 256
#define KCP_DEFAULT_MTU 1400
#define KCP_DEFAULT_STATS_INTERVAL_MS 100
#define KCP_CONTROL_NODELAY 1
#define KCP_CONTROL_INTERVAL_MS 5
#define KCP_CONTROL_RESEND 2
#define KCP_CONTROL_NC 1
#define KCP_CONTROL_SND_WND 32
#define KCP_CONTROL_RCV_WND 32
#define KCP_CONTROL_MTU 1400
#define KCP_VIDEO_NODELAY 1
#define KCP_VIDEO_INTERVAL_MS 10
#define KCP_VIDEO_RESEND 2
#define KCP_VIDEO_NC 1
#define KCP_VIDEO_SND_WND 256
#define KCP_VIDEO_RCV_WND 256
#define KCP_VIDEO_MTU 1400
#define KCP_NODELAY KCP_DEFAULT_NODELAY
#define KCP_INTERVAL KCP_DEFAULT_INTERVAL_MS
#define KCP_RESEND KCP_DEFAULT_RESEND
#define KCP_NC KCP_DEFAULT_NC
#define KCP_WND_SIZE KCP_DEFAULT_SND_WND
#define KCP_MTU KCP_DEFAULT_MTU
typedef struct kcp_conn kcp_conn_t;
typedef struct kcp_listener kcp_listener_t;
typedef struct kcp_conn_options {
int nodelay;
int interval_ms;
int resend;
int nc;
int sndwnd;
int rcvwnd;
int mtu;
} kcp_conn_options_t;
void kcp_conn_options_init(kcp_conn_options_t *options);
void kcp_conn_options_set_control_defaults(kcp_conn_options_t *options);
void kcp_conn_options_set_video_defaults(kcp_conn_options_t *options);
kcp_conn_t *kcp_conn_dial_with_options(const char *server_addr, const char *bind_ip, const char *bind_device, const kcp_conn_options_t *options, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms);
kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms);
int kcp_conn_configure_runtime(kcp_conn_t *conn, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms);
int kcp_conn_apply_options(kcp_conn_t *conn, const kcp_conn_options_t *options);
int kcp_conn_send(kcp_conn_t *conn, const message_t *msg);
int kcp_conn_receive_timed(kcp_conn_t *conn, message_t *out_msg, int timeout_ms);
int kcp_conn_receive(kcp_conn_t *conn, message_t *out_msg);
int kcp_conn_close(kcp_conn_t *conn);
void kcp_conn_free(kcp_conn_t *conn);

View File

@@ -0,0 +1,44 @@
try:
from ._omnisocket import (
MSG_TYPE_BINARY,
MSG_TYPE_ERROR,
MSG_TYPE_FILE,
MSG_TYPE_REGISTER,
MSG_TYPE_TEXT,
Session,
)
except ImportError as exc:
raise ImportError(
"omnisocket extension is not built; run `make python-ext` on a Linux host first"
) from exc
CONTROL_DEFAULTS = {
"nodelay": 1,
"interval_ms": 5,
"resend": 2,
"nc": 1,
"sndwnd": 32,
"rcvwnd": 32,
"mtu": 1400,
}
VIDEO_DEFAULTS = {
"nodelay": 1,
"interval_ms": 10,
"resend": 2,
"nc": 1,
"sndwnd": 256,
"rcvwnd": 256,
"mtu": 1400,
}
__all__ = [
"CONTROL_DEFAULTS",
"VIDEO_DEFAULTS",
"MSG_TYPE_BINARY",
"MSG_TYPE_ERROR",
"MSG_TYPE_FILE",
"MSG_TYPE_REGISTER",
"MSG_TYPE_TEXT",
"Session",
]

View File

@@ -0,0 +1,336 @@
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include "omnisocket_client.h"
typedef struct PyOmniSession {
PyObject_HEAD
omnisocket_session_t session;
} PyOmniSession;
static const char *PyOmniSession_recv_doc =
"recv(timeout_ms=-1) -> (from_peer, msg_type, payload) | None";
static const char *PyOmniSession_recv_into_doc =
"recv_into(buffer, timeout_ms=-1) -> dict | None\n"
"\n"
"The writable buffer must be large enough for the full message body.\n"
"If it is too small, BufferError reports the required size but the\n"
"current frame has already been consumed and is lost.";
static PyObject *PyOmniSession_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) {
PyOmniSession *self;
(void) args;
(void) kwargs;
self = (PyOmniSession *) type->tp_alloc(type, 0);
if (self == NULL) {
return NULL;
}
if (omnisocket_session_init(&self->session) != 0) {
type->tp_free((PyObject *) self);
return PyErr_SetFromErrno(PyExc_OSError);
}
return (PyObject *) self;
}
static void PyOmniSession_dealloc(PyOmniSession *self) {
omnisocket_session_destroy(&self->session);
Py_TYPE(self)->tp_free((PyObject *) self);
}
static PyObject *PyOmniSession_connect(PyOmniSession *self, PyObject *args, PyObject *kwargs) {
const char *server_addr;
const char *peer_id;
const char *relay_via = "";
const char *bind_ip = "";
const char *bind_device = "";
int nodelay = KCP_DEFAULT_NODELAY;
int interval_ms = KCP_DEFAULT_INTERVAL_MS;
int resend = KCP_DEFAULT_RESEND;
int nc = KCP_DEFAULT_NC;
int sndwnd = KCP_DEFAULT_SND_WND;
int rcvwnd = KCP_DEFAULT_RCV_WND;
int mtu = KCP_DEFAULT_MTU;
int stats_interval_ms = KCP_DEFAULT_STATS_INTERVAL_MS;
kcp_conn_options_t options;
int rc;
static char *kwlist[] = {
"server_addr",
"peer_id",
"relay_via",
"bind_ip",
"bind_device",
"nodelay",
"interval_ms",
"resend",
"nc",
"sndwnd",
"rcvwnd",
"mtu",
"stats_interval_ms",
NULL
};
if (!PyArg_ParseTupleAndKeywords(
args,
kwargs,
"ss|sssiiiiiiii",
kwlist,
&server_addr,
&peer_id,
&relay_via,
&bind_ip,
&bind_device,
&nodelay,
&interval_ms,
&resend,
&nc,
&sndwnd,
&rcvwnd,
&mtu,
&stats_interval_ms)) {
return NULL;
}
kcp_conn_options_init(&options);
options.nodelay = nodelay;
options.interval_ms = interval_ms;
options.resend = resend;
options.nc = nc;
options.sndwnd = sndwnd;
options.rcvwnd = rcvwnd;
options.mtu = mtu;
Py_BEGIN_ALLOW_THREADS
rc = omnisocket_session_connect(
&self->session,
server_addr,
relay_via,
peer_id,
bind_ip,
bind_device,
&options,
stats_interval_ms
);
Py_END_ALLOW_THREADS
if (rc != 0) {
return PyErr_SetFromErrno(PyExc_OSError);
}
Py_RETURN_NONE;
}
static PyObject *PyOmniSession_close(PyOmniSession *self, PyObject *Py_UNUSED(ignored)) {
int rc;
Py_BEGIN_ALLOW_THREADS
rc = omnisocket_session_close(&self->session);
Py_END_ALLOW_THREADS
if (rc != 0) {
return PyErr_SetFromErrno(PyExc_OSError);
}
Py_RETURN_NONE;
}
static PyObject *PyOmniSession_send(PyOmniSession *self, PyObject *args, PyObject *kwargs) {
const char *to;
Py_buffer payload;
int rc;
static char *kwlist[] = {"to", "data", NULL};
memset(&payload, 0, sizeof(payload));
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sy*", kwlist, &to, &payload)) {
return NULL;
}
Py_BEGIN_ALLOW_THREADS
rc = omnisocket_session_send(&self->session, to, payload.buf, (size_t) payload.len);
Py_END_ALLOW_THREADS
PyBuffer_Release(&payload);
if (rc != 0) {
return PyErr_SetFromErrno(PyExc_OSError);
}
Py_RETURN_NONE;
}
static PyObject *PyOmniSession_recv(PyOmniSession *self, PyObject *args, PyObject *kwargs) {
int timeout_ms = -1;
int rc;
message_t msg;
PyObject *body = NULL;
PyObject *result = NULL;
static char *kwlist[] = {"timeout_ms", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist, &timeout_ms)) {
return NULL;
}
protocol_message_init(&msg);
Py_BEGIN_ALLOW_THREADS
rc = omnisocket_session_recv(&self->session, &msg, timeout_ms);
Py_END_ALLOW_THREADS
if (rc == 1) {
protocol_message_clear(&msg);
Py_RETURN_NONE;
}
if (rc != 0) {
protocol_message_clear(&msg);
return PyErr_SetFromErrno(PyExc_OSError);
}
body = PyBytes_FromStringAndSize((const char *) msg.body, (Py_ssize_t) msg.body_len);
if (body == NULL) {
protocol_message_clear(&msg);
return NULL;
}
result = Py_BuildValue("(siO)", msg.from, (int) msg.type, body);
Py_DECREF(body);
protocol_message_clear(&msg);
return result;
}
static PyObject *PyOmniSession_recv_into(PyOmniSession *self, PyObject *args, PyObject *kwargs) {
PyObject *buffer_obj;
Py_buffer view;
int timeout_ms = -1;
int rc;
kcp_client_recv_meta_t meta;
PyObject *result = NULL;
static char *kwlist[] = {"buffer", "timeout_ms", NULL};
memset(&view, 0, sizeof(view));
memset(&meta, 0, sizeof(meta));
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|i", kwlist, &buffer_obj, &timeout_ms)) {
return NULL;
}
if (PyObject_GetBuffer(buffer_obj, &view, PyBUF_WRITABLE) != 0) {
return NULL;
}
Py_BEGIN_ALLOW_THREADS
rc = omnisocket_session_recv_into(&self->session, view.buf, (size_t) view.len, &meta, timeout_ms);
Py_END_ALLOW_THREADS
PyBuffer_Release(&view);
if (rc == 1) {
Py_RETURN_NONE;
}
if (rc == 2) {
PyErr_Format(
PyExc_BufferError,
"buffer too small: need %zu bytes; current frame was already consumed and dropped",
meta.body_len
);
return NULL;
}
if (rc != 0) {
return PyErr_SetFromErrno(PyExc_OSError);
}
result = Py_BuildValue(
"{s:s,s:s,s:s,s:i,s:K,s:K}",
"from",
meta.from,
"to",
meta.to,
"file_name",
meta.file_name,
"msg_type",
(int) meta.type,
"message_id",
(unsigned long long) meta.id,
"body_len",
(unsigned long long) meta.body_len
);
return result;
}
static PyObject *PyOmniSession_stats(PyOmniSession *self, PyObject *Py_UNUSED(ignored)) {
omnisocket_session_stats_t stats;
memset(&stats, 0, sizeof(stats));
omnisocket_session_stats_snapshot(&self->session, &stats);
return Py_BuildValue(
"{s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:i}",
"send_calls",
(unsigned long long) stats.send_calls,
"send_bytes",
(unsigned long long) stats.send_bytes,
"send_errors",
(unsigned long long) stats.send_errors,
"recv_calls",
(unsigned long long) stats.recv_calls,
"recv_bytes",
(unsigned long long) stats.recv_bytes,
"recv_timeouts",
(unsigned long long) stats.recv_timeouts,
"recv_errors",
(unsigned long long) stats.recv_errors,
"connected",
stats.connected
);
}
static PyMethodDef PyOmniSession_methods[] = {
{"connect", (PyCFunction) PyOmniSession_connect, METH_VARARGS | METH_KEYWORDS, NULL},
{"close", (PyCFunction) PyOmniSession_close, METH_NOARGS, NULL},
{"send", (PyCFunction) PyOmniSession_send, METH_VARARGS | METH_KEYWORDS, NULL},
{"recv", (PyCFunction) PyOmniSession_recv, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_doc},
{"recv_into", (PyCFunction) PyOmniSession_recv_into, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_into_doc},
{"stats", (PyCFunction) PyOmniSession_stats, METH_NOARGS, NULL},
{NULL, NULL, 0, NULL}
};
static PyTypeObject PyOmniSessionType = {
PyVarObject_HEAD_INIT(NULL, 0)
};
static PyModuleDef omnisocket_module = {
PyModuleDef_HEAD_INIT,
.m_name = "_omnisocket",
.m_size = -1,
};
PyMODINIT_FUNC PyInit__omnisocket(void) {
PyObject *module;
PyOmniSessionType.tp_name = "omnisocket.Session";
PyOmniSessionType.tp_basicsize = sizeof(PyOmniSession);
PyOmniSessionType.tp_flags = Py_TPFLAGS_DEFAULT;
PyOmniSessionType.tp_new = PyOmniSession_new;
PyOmniSessionType.tp_dealloc = (destructor) PyOmniSession_dealloc;
PyOmniSessionType.tp_methods = PyOmniSession_methods;
if (PyType_Ready(&PyOmniSessionType) < 0) {
return NULL;
}
module = PyModule_Create(&omnisocket_module);
if (module == NULL) {
return NULL;
}
Py_INCREF(&PyOmniSessionType);
if (PyModule_AddObject(module, "Session", (PyObject *) &PyOmniSessionType) != 0) {
Py_DECREF(&PyOmniSessionType);
Py_DECREF(module);
return NULL;
}
if (PyModule_AddIntConstant(module, "MSG_TYPE_TEXT", MSG_TYPE_TEXT) != 0 ||
PyModule_AddIntConstant(module, "MSG_TYPE_FILE", MSG_TYPE_FILE) != 0 ||
PyModule_AddIntConstant(module, "MSG_TYPE_REGISTER", MSG_TYPE_REGISTER) != 0 ||
PyModule_AddIntConstant(module, "MSG_TYPE_ERROR", MSG_TYPE_ERROR) != 0 ||
PyModule_AddIntConstant(module, "MSG_TYPE_BINARY", MSG_TYPE_BINARY) != 0) {
Py_DECREF(module);
return NULL;
}
return module;
}

View File

@@ -0,0 +1,248 @@
#include "omnisocket_client.h"
int omnisocket_session_init(omnisocket_session_t *session) {
int rc;
if (session == NULL) {
errno = EINVAL;
return -1;
}
memset(session, 0, sizeof(*session));
rc = pthread_mutex_init(&session->mutex, NULL);
if (rc != 0) {
errno = rc;
return -1;
}
rc = pthread_cond_init(&session->idle_cond, NULL);
if (rc != 0) {
pthread_mutex_destroy(&session->mutex);
errno = rc;
return -1;
}
return 0;
}
void omnisocket_session_destroy(omnisocket_session_t *session) {
if (session == NULL) {
return;
}
(void) omnisocket_session_close(session);
pthread_cond_destroy(&session->idle_cond);
pthread_mutex_destroy(&session->mutex);
}
static int omnisocket_session_begin_client_op(omnisocket_session_t *session, kcp_client_t **out_client) {
if (session == NULL || out_client == NULL) {
errno = EINVAL;
return -1;
}
pthread_mutex_lock(&session->mutex);
if (session->closing) {
pthread_mutex_unlock(&session->mutex);
errno = ECANCELED;
return -1;
}
if (session->client == NULL) {
pthread_mutex_unlock(&session->mutex);
errno = ENOTCONN;
return -1;
}
*out_client = session->client;
session->active_ops += 1;
pthread_mutex_unlock(&session->mutex);
return 0;
}
int omnisocket_session_connect(
omnisocket_session_t *session,
const char *server_addr,
const char *relay_via,
const char *peer_id,
const char *bind_ip,
const char *bind_device,
const kcp_conn_options_t *options,
int stats_interval_ms
) {
kcp_client_t *client;
if (session == NULL || server_addr == NULL || peer_id == NULL) {
errno = EINVAL;
return -1;
}
pthread_mutex_lock(&session->mutex);
while (session->closing) {
pthread_cond_wait(&session->idle_cond, &session->mutex);
}
if (session->client != NULL) {
pthread_mutex_unlock(&session->mutex);
errno = EISCONN;
return -1;
}
client = kcp_client_dial_with_options(
server_addr,
relay_via,
peer_id,
bind_ip,
bind_device,
options,
NULL,
NULL,
NULL,
stats_interval_ms
);
if (client == NULL) {
pthread_mutex_unlock(&session->mutex);
return -1;
}
session->client = client;
session->stats.connected = 1;
pthread_mutex_unlock(&session->mutex);
return 0;
}
int omnisocket_session_close(omnisocket_session_t *session) {
kcp_client_t *client;
if (session == NULL) {
errno = EINVAL;
return -1;
}
pthread_mutex_lock(&session->mutex);
while (session->closing) {
pthread_cond_wait(&session->idle_cond, &session->mutex);
}
client = session->client;
if (client != NULL) {
session->closing = 1;
session->client = NULL;
}
session->stats.connected = 0;
pthread_mutex_unlock(&session->mutex);
if (client != NULL) {
kcp_client_close(client);
pthread_mutex_lock(&session->mutex);
while (session->active_ops > 0) {
pthread_cond_wait(&session->idle_cond, &session->mutex);
}
pthread_mutex_unlock(&session->mutex);
kcp_client_free(client);
pthread_mutex_lock(&session->mutex);
session->closing = 0;
pthread_cond_broadcast(&session->idle_cond);
pthread_mutex_unlock(&session->mutex);
}
return 0;
}
int omnisocket_session_send(omnisocket_session_t *session, const char *to, const void *data, size_t data_len) {
kcp_client_t *client;
int rc;
if (session == NULL || to == NULL || (data == NULL && data_len > 0)) {
errno = EINVAL;
return -1;
}
if (omnisocket_session_begin_client_op(session, &client) != 0) {
return -1;
}
rc = kcp_client_send_binary(client, to, data, data_len);
pthread_mutex_lock(&session->mutex);
if (rc == 0) {
session->stats.send_calls += 1;
session->stats.send_bytes += (uint64_t) data_len;
} else {
session->stats.send_errors += 1;
}
if (session->active_ops > 0) {
session->active_ops -= 1;
}
if (session->closing && session->active_ops == 0) {
pthread_cond_broadcast(&session->idle_cond);
}
pthread_mutex_unlock(&session->mutex);
return rc;
}
int omnisocket_session_recv(omnisocket_session_t *session, message_t *out_msg, int timeout_ms) {
kcp_client_t *client;
int rc;
if (session == NULL || out_msg == NULL) {
errno = EINVAL;
return -1;
}
if (omnisocket_session_begin_client_op(session, &client) != 0) {
return -1;
}
rc = kcp_client_receive_timed(client, out_msg, timeout_ms);
pthread_mutex_lock(&session->mutex);
if (rc == 0) {
session->stats.recv_calls += 1;
session->stats.recv_bytes += (uint64_t) out_msg->body_len;
} else if (rc == 1) {
session->stats.recv_timeouts += 1;
} else {
session->stats.recv_errors += 1;
}
if (session->active_ops > 0) {
session->active_ops -= 1;
}
if (session->closing && session->active_ops == 0) {
pthread_cond_broadcast(&session->idle_cond);
}
pthread_mutex_unlock(&session->mutex);
return rc;
}
int omnisocket_session_recv_into(
omnisocket_session_t *session,
void *buffer,
size_t buffer_len,
kcp_client_recv_meta_t *out_meta,
int timeout_ms
) {
kcp_client_t *client;
int rc;
if (session == NULL || out_meta == NULL || (buffer == NULL && buffer_len > 0)) {
errno = EINVAL;
return -1;
}
if (omnisocket_session_begin_client_op(session, &client) != 0) {
return -1;
}
rc = kcp_client_receive_binary_into(client, buffer, buffer_len, out_meta, timeout_ms);
pthread_mutex_lock(&session->mutex);
if (rc == 0) {
session->stats.recv_calls += 1;
session->stats.recv_bytes += (uint64_t) out_meta->body_len;
} else if (rc == 1) {
session->stats.recv_timeouts += 1;
} else {
session->stats.recv_errors += 1;
}
if (session->active_ops > 0) {
session->active_ops -= 1;
}
if (session->closing && session->active_ops == 0) {
pthread_cond_broadcast(&session->idle_cond);
}
pthread_mutex_unlock(&session->mutex);
return rc;
}
void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket_session_stats_t *out_stats) {
if (session == NULL || out_stats == NULL) {
return;
}
pthread_mutex_lock(&session->mutex);
*out_stats = session->stats;
pthread_mutex_unlock(&session->mutex);
}

View File

@@ -0,0 +1,51 @@
#ifndef OMNISOCKET_PY_CLIENT_H
#define OMNISOCKET_PY_CLIENT_H
#include "peer_kcp_client.h"
typedef struct omnisocket_session_stats {
uint64_t send_calls;
uint64_t send_bytes;
uint64_t send_errors;
uint64_t recv_calls;
uint64_t recv_bytes;
uint64_t recv_timeouts;
uint64_t recv_errors;
int connected;
} omnisocket_session_stats_t;
typedef struct omnisocket_session {
pthread_mutex_t mutex;
pthread_cond_t idle_cond;
kcp_client_t *client;
size_t active_ops;
int closing;
omnisocket_session_stats_t stats;
} omnisocket_session_t;
int omnisocket_session_init(omnisocket_session_t *session);
void omnisocket_session_destroy(omnisocket_session_t *session);
int omnisocket_session_connect(
omnisocket_session_t *session,
const char *server_addr,
const char *relay_via,
const char *peer_id,
const char *bind_ip,
const char *bind_device,
const kcp_conn_options_t *options,
int stats_interval_ms
);
int omnisocket_session_close(omnisocket_session_t *session);
int omnisocket_session_send(omnisocket_session_t *session, const char *to, const void *data, size_t data_len);
int omnisocket_session_recv(omnisocket_session_t *session, message_t *out_msg, int timeout_ms);
int omnisocket_session_recv_into(
omnisocket_session_t *session,
void *buffer,
size_t buffer_len,
kcp_client_recv_meta_t *out_meta,
int timeout_ms
);
void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket_session_stats_t *out_stats);
#endif

58
python/setup.py Normal file
View File

@@ -0,0 +1,58 @@
from pathlib import Path
import sys
from setuptools import Extension, setup
ROOT = Path(__file__).resolve().parent.parent
PY_ROOT = Path(__file__).resolve().parent
if sys.platform != "linux":
raise RuntimeError("omnisocket Python extension can only be built on Linux")
COMMON_SOURCES = [
ROOT / "src" / "omni_common.c",
ROOT / "src" / "protocol.c",
ROOT / "src" / "latencylog.c",
ROOT / "src" / "tx_timestamp_debug.c",
ROOT / "src" / "kcp_packet_debug.c",
ROOT / "src" / "kcp_session_stats.c",
ROOT / "src" / "linux_timestamping.c",
ROOT / "src" / "interactive.c",
ROOT / "src" / "transport_udp.c",
ROOT / "src" / "transport_kcp.c",
ROOT / "src" / "server_udp_relay.c",
ROOT / "src" / "server_udp_hub.c",
ROOT / "src" / "server_kcp_hub.c",
ROOT / "src" / "peer_udp_client.c",
ROOT / "src" / "peer_kcp_client.c",
ROOT / "third_party" / "cjson" / "cJSON.c",
ROOT / "third_party" / "kcp" / "ikcp.c",
]
setup(
name="omnisocket",
version="0.1.0",
packages=["omnisocket"],
ext_modules=[
Extension(
"omnisocket._omnisocket",
sources=[
str(PY_ROOT / "omnisocket" / "_omnisocket.c"),
str(PY_ROOT / "omnisocket" / "omnisocket_client.c"),
*[str(path) for path in COMMON_SOURCES],
],
include_dirs=[
str(ROOT / "include"),
str(ROOT / "third_party" / "cjson"),
str(ROOT / "third_party" / "kcp"),
str(PY_ROOT / "omnisocket"),
],
define_macros=[("_GNU_SOURCE", None)],
extra_compile_args=["-std=c11", "-O2", "-pthread"],
extra_link_args=["-pthread"],
)
],
)

View File

@@ -0,0 +1,76 @@
"""Send high-rate control packets to benchmark the KCP control session."""
from __future__ import annotations
import argparse
from pathlib import Path
import sys
import time
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
import yaml
from omnisocket_control import make_control_packet
try:
from omnisocket import CONTROL_DEFAULTS, Session
except ImportError:
sys.path.insert(0, str(ROOT / "python"))
from omnisocket import CONTROL_DEFAULTS, Session
def load_config() -> dict:
config_path = ROOT / "config" / "omnisocket_demo.yaml"
if not config_path.exists():
return {}
with config_path.open("r", encoding="utf-8") as file:
return yaml.safe_load(file) or {}
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--rate", type=float, default=200.0, help="send rate in Hz")
parser.add_argument("--count", type=int, default=1000, help="packets to send")
args = parser.parse_args()
config = load_config()
transport_cfg = config.get("transport", {})
sender_cfg = config.get("control_sender", {})
session = Session()
session.connect(
server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")),
peer_id=str(sender_cfg.get("peer_id", "peer-a-ctrl")),
relay_via=str(transport_cfg.get("relay_via", "")),
bind_ip=str(transport_cfg.get("bind_ip", "")),
bind_device=str(transport_cfg.get("bind_device", "")),
**CONTROL_DEFAULTS,
)
target_peer = str(sender_cfg.get("target_peer", "peer-b-ctrl"))
spacing = 1.0 / args.rate if args.rate > 0 else 0.0
start = time.perf_counter()
try:
for seq_id in range(args.count):
packet = make_control_packet(seq_id, "set_surge", drive_value=0.25)
session.send(to=target_peer, data=packet.encode())
if spacing > 0:
target = start + (seq_id + 1) * spacing
remaining = target - time.perf_counter()
if remaining > 0:
time.sleep(remaining)
finally:
elapsed = time.perf_counter() - start
print(
f"sent {args.count} control packets in {elapsed:.3f}s "
f"({(args.count / elapsed) if elapsed > 0 else 0.0:.1f} pkt/s)"
)
print(f"stats={session.stats()}")
session.close()
if __name__ == "__main__":
main()

View File

@@ -113,7 +113,7 @@ int latencylog_is_business_message(const message_t *msg) {
if (msg == NULL) {
return 0;
}
return msg->type == MSG_TYPE_TEXT || msg->type == MSG_TYPE_FILE;
return msg->type == MSG_TYPE_TEXT || msg->type == MSG_TYPE_FILE || msg->type == MSG_TYPE_BINARY;
}
void latencylog_log_message_event(latency_logger_t *logger, const char *node_role, const char *node_id, const char *event_name, const message_t *msg) {

View File

@@ -66,6 +66,11 @@ static int kcp_client_persist_message_to_disk(const message_t *msg, const char *
if (omni_write_file(path, msg->body, msg->body_len) != 0) {
return -1;
}
} else if (msg->type == MSG_TYPE_BINARY) {
snprintf(path, sizeof(path), "%s/%s-%" PRIu64 ".bin", inbox_dir, msg->from, msg->id);
if (omni_write_file(path, msg->body, msg->body_len) != 0) {
return -1;
}
} else {
errno = EINVAL;
return -1;
@@ -77,7 +82,20 @@ static int kcp_client_persist_message_to_disk(const message_t *msg, const char *
return 0;
}
kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
static void kcp_client_fill_recv_meta(kcp_client_recv_meta_t *meta, const message_t *msg) {
if (meta == NULL || msg == NULL) {
return;
}
memset(meta, 0, sizeof(*meta));
meta->type = msg->type;
meta->id = msg->id;
meta->body_len = msg->body_len;
snprintf(meta->from, sizeof(meta->from), "%s", msg->from);
snprintf(meta->to, sizeof(meta->to), "%s", msg->to);
snprintf(meta->file_name, sizeof(meta->file_name), "%s", msg->file_name);
}
kcp_client_t *kcp_client_dial_with_options(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, const kcp_conn_options_t *options, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
kcp_client_t *client;
const char *actual_dial_addr = (dial_addr != NULL && dial_addr[0] != '\0') ? dial_addr : server_addr;
message_t register_msg;
@@ -91,7 +109,7 @@ kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, co
snprintf(client->server_addr, sizeof(client->server_addr), "%s", server_addr == NULL ? "" : server_addr);
pthread_mutex_init(&client->id_mu, NULL);
client->logger = logger;
client->conn = kcp_conn_dial(actual_dial_addr, bind_ip, bind_device, packet_logger, logger, OMNI_NODE_ROLE_PEER, peer_id, stats_logger, stats_interval_ms);
client->conn = kcp_conn_dial_with_options(actual_dial_addr, bind_ip, bind_device, options, packet_logger, logger, OMNI_NODE_ROLE_PEER, peer_id, stats_logger, stats_interval_ms);
if (client->conn == NULL) {
saved_errno = errno;
kcp_client_free(client);
@@ -113,6 +131,10 @@ kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, co
return client;
}
kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
return kcp_client_dial_with_options(server_addr, dial_addr, peer_id, bind_ip, bind_device, NULL, logger, packet_logger, stats_logger, stats_interval_ms);
}
const char *kcp_client_id(const kcp_client_t *client) {
return client == NULL ? "" : client->id;
}
@@ -121,6 +143,10 @@ int kcp_client_send_text(kcp_client_t *client, const char *to, const char *text)
message_t msg;
uint64_t id;
if (client == NULL || to == NULL || text == NULL) {
errno = EINVAL;
return -1;
}
protocol_message_init(&msg);
kcp_client_next_message_id(client, &id);
msg.type = MSG_TYPE_TEXT;
@@ -141,6 +167,37 @@ int kcp_client_send_text(kcp_client_t *client, const char *to, const char *text)
return 0;
}
int kcp_client_send_binary(kcp_client_t *client, const char *to, const void *data, size_t data_len) {
message_t msg;
uint64_t id;
if (client == NULL || to == NULL || (data == NULL && data_len > 0)) {
errno = EINVAL;
return -1;
}
protocol_message_init(&msg);
kcp_client_next_message_id(client, &id);
msg.type = MSG_TYPE_BINARY;
msg.id = id;
snprintf(msg.from, sizeof(msg.from), "%s", client->id);
snprintf(msg.to, sizeof(msg.to), "%s", to);
if (data_len > 0) {
msg.body = (uint8_t *) malloc(data_len);
if (msg.body == NULL) {
return -1;
}
memcpy(msg.body, data, data_len);
}
msg.body_len = data_len;
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_A_APP_PREP_BEGIN, &msg);
if (kcp_conn_send(client->conn, &msg) != 0) {
protocol_message_clear(&msg);
return -1;
}
protocol_message_clear(&msg);
return 0;
}
int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char *path) {
message_t msg;
uint64_t id;
@@ -148,6 +205,10 @@ int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char *
size_t body_len = 0;
const char *base_name = strrchr(path, '/');
if (client == NULL || to == NULL || path == NULL) {
errno = EINVAL;
return -1;
}
if (omni_read_file(path, &body, &body_len) != 0) {
return -1;
}
@@ -169,14 +230,57 @@ int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char *
return 0;
}
int kcp_client_receive(kcp_client_t *client, message_t *out_msg) {
if (kcp_conn_receive(client->conn, out_msg) != 0) {
int kcp_client_receive_timed(kcp_client_t *client, message_t *out_msg, int timeout_ms) {
int rc;
if (client == NULL || out_msg == NULL) {
errno = EINVAL;
return -1;
}
rc = kcp_conn_receive_timed(client->conn, out_msg, timeout_ms);
if (rc != 0) {
return rc;
}
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_B_APP_RECV, out_msg);
return 0;
}
int kcp_client_receive(kcp_client_t *client, message_t *out_msg) {
if (kcp_client_receive_timed(client, out_msg, -1) != 0) {
return -1;
}
return 0;
}
int kcp_client_receive_binary_into(kcp_client_t *client, void *buffer, size_t buffer_len, kcp_client_recv_meta_t *out_meta, int timeout_ms) {
message_t msg;
int rc;
if (client == NULL || (buffer == NULL && buffer_len > 0) || out_meta == NULL) {
errno = EINVAL;
return -1;
}
protocol_message_init(&msg);
rc = kcp_client_receive_timed(client, &msg, timeout_ms);
if (rc != 0) {
return rc;
}
kcp_client_fill_recv_meta(out_meta, &msg);
if (msg.body_len > buffer_len) {
protocol_message_clear(&msg);
errno = EMSGSIZE;
return 2;
}
if (msg.body_len > 0) {
memcpy(buffer, msg.body, msg.body_len);
}
protocol_message_clear(&msg);
return 0;
}
int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len) {
if (!latencylog_is_business_message(msg)) {
errno = EINVAL;

View File

@@ -55,6 +55,11 @@ static int client_persist_message_to_disk(const message_t *msg, const char *inbo
if (omni_write_file(path, msg->body, msg->body_len) != 0) {
return -1;
}
} else if (msg->type == MSG_TYPE_BINARY) {
snprintf(path, sizeof(path), "%s/%s-%" PRIu64 ".bin", inbox_dir, msg->from, msg->id);
if (omni_write_file(path, msg->body, msg->body_len) != 0) {
return -1;
}
} else {
errno = EINVAL;
return -1;

View File

@@ -8,11 +8,12 @@ static const char *protocol_message_type_table[] = {
"text",
"file",
"register",
"error"
"error",
"binary"
};
const char *protocol_message_type_name(message_type_t type) {
if ((int) type < 0 || type >= MSG_TYPE_INVALID) {
if ((int) type < 0 || (size_t) type >= OMNI_ARRAY_LEN(protocol_message_type_table)) {
return "invalid";
}
return protocol_message_type_table[type];
@@ -102,6 +103,11 @@ int protocol_validate_message(const message_t *msg, char *err, size_t err_len) {
return protocol_set_err(err, err_len, "protocol: missing file name");
}
break;
case MSG_TYPE_BINARY:
if (msg->file_name[0] != '\0') {
return protocol_set_err(err, err_len, "protocol: unexpected file name");
}
break;
case MSG_TYPE_REGISTER:
if (strcmp(msg->to, SERVER_PEER_ID) != 0) {
return protocol_set_err(err, err_len, "protocol: invalid register target");

View File

@@ -63,6 +63,36 @@ static kcp_peer_entry_t *kcp_hub_find_peer(kcp_hub_t *hub, const char *peer_id)
return NULL;
}
static int kcp_hub_peer_id_has_suffix(const char *peer_id, const char *suffix) {
size_t peer_len;
size_t suffix_len;
if (peer_id == NULL || suffix == NULL) {
return 0;
}
peer_len = strlen(peer_id);
suffix_len = strlen(suffix);
return peer_len >= suffix_len && strcmp(peer_id + peer_len - suffix_len, suffix) == 0;
}
static int kcp_hub_configure_peer_transport(kcp_conn_t *conn, const char *peer_id) {
kcp_conn_options_t options;
if (conn == NULL || peer_id == NULL) {
errno = EINVAL;
return -1;
}
if (kcp_hub_peer_id_has_suffix(peer_id, "-ctrl")) {
kcp_conn_options_set_control_defaults(&options);
return kcp_conn_apply_options(conn, &options);
}
if (kcp_hub_peer_id_has_suffix(peer_id, "-video")) {
kcp_conn_options_set_video_defaults(&options);
return kcp_conn_apply_options(conn, &options);
}
return 0;
}
static int kcp_hub_send_server_error(kcp_conn_t *conn, const char *to, const char *message) {
message_t msg;
protocol_message_init(&msg);
@@ -255,6 +285,7 @@ static int kcp_hub_handle_peer_message(kcp_hub_t *hub, const char *peer_id, kcp_
switch (msg->type) {
case MSG_TYPE_TEXT:
case MSG_TYPE_FILE:
case MSG_TYPE_BINARY:
snprintf(msg->from, sizeof(msg->from), "%s", peer_id);
if (kcp_hub_deliver_to_local_peer(hub, msg) == 0) {
return 0;
@@ -294,7 +325,7 @@ static int kcp_hub_handle_peer_message(kcp_hub_t *hub, const char *peer_id, kcp_
return 0;
case MSG_TYPE_REGISTER:
case MSG_TYPE_ERROR:
if (kcp_hub_send_server_error(conn, peer_id, "registered peers can only send text or file messages") != 0) {
if (kcp_hub_send_server_error(conn, peer_id, "registered peers can only send text, file, or binary messages") != 0) {
return -1;
}
errno = EPROTO;
@@ -358,6 +389,11 @@ static int kcp_hub_register_conn(kcp_hub_t *hub, kcp_conn_t *conn, char *peer_id
pthread_rwlock_unlock(&hub->lock);
snprintf(peer_id, peer_id_len, "%s", msg.from);
if (kcp_hub_configure_peer_transport(conn, peer_id) != 0) {
kcp_hub_unregister(hub, peer_id, conn);
protocol_message_clear(&msg);
return -1;
}
protocol_message_clear(&msg);
return 0;
}
@@ -518,7 +554,7 @@ int kcp_hub_serve_relay(kcp_hub_t *hub) {
protocol_message_clear(&msg);
continue;
}
if (msg.type != MSG_TYPE_TEXT && msg.type != MSG_TYPE_FILE && msg.type != MSG_TYPE_ERROR) {
if (msg.type != MSG_TYPE_TEXT && msg.type != MSG_TYPE_FILE && msg.type != MSG_TYPE_BINARY && msg.type != MSG_TYPE_ERROR) {
protocol_message_clear(&msg);
continue;
}

View File

@@ -112,7 +112,7 @@ int udp_hub_serve(udp_hub_t *hub) {
pthread_rwlock_unlock(&hub->lock);
continue;
}
if (msg.type != MSG_TYPE_TEXT && msg.type != MSG_TYPE_FILE) {
if (msg.type != MSG_TYPE_TEXT && msg.type != MSG_TYPE_FILE && msg.type != MSG_TYPE_BINARY) {
if (msg.type == MSG_TYPE_ERROR) {
udp_hub_send_error(hub, &addr, addr_len, msg.from, "peers cannot send error messages");
} else {

View File

@@ -60,6 +60,8 @@ struct kcp_conn {
int update_thread_started;
pthread_t stats_thread;
int stats_thread_started;
kcp_conn_options_t options;
int update_interval_ms;
uint64_t pending_bytes_sent;
uint64_t pending_bytes_received;
uint64_t pending_in_pkts;
@@ -141,6 +143,82 @@ struct kcp_process_sampler {
static pthread_mutex_t g_kcp_process_sampler_mu = PTHREAD_MUTEX_INITIALIZER;
static kcp_process_sampler_t *g_kcp_process_samplers = NULL;
void kcp_conn_options_init(kcp_conn_options_t *options) {
if (options == NULL) {
return;
}
memset(options, 0, sizeof(*options));
options->nodelay = KCP_DEFAULT_NODELAY;
options->interval_ms = KCP_DEFAULT_INTERVAL_MS;
options->resend = KCP_DEFAULT_RESEND;
options->nc = KCP_DEFAULT_NC;
options->sndwnd = KCP_DEFAULT_SND_WND;
options->rcvwnd = KCP_DEFAULT_RCV_WND;
options->mtu = KCP_DEFAULT_MTU;
}
void kcp_conn_options_set_control_defaults(kcp_conn_options_t *options) {
if (options == NULL) {
return;
}
memset(options, 0, sizeof(*options));
options->nodelay = KCP_CONTROL_NODELAY;
options->interval_ms = KCP_CONTROL_INTERVAL_MS;
options->resend = KCP_CONTROL_RESEND;
options->nc = KCP_CONTROL_NC;
options->sndwnd = KCP_CONTROL_SND_WND;
options->rcvwnd = KCP_CONTROL_RCV_WND;
options->mtu = KCP_CONTROL_MTU;
}
void kcp_conn_options_set_video_defaults(kcp_conn_options_t *options) {
if (options == NULL) {
return;
}
memset(options, 0, sizeof(*options));
options->nodelay = KCP_VIDEO_NODELAY;
options->interval_ms = KCP_VIDEO_INTERVAL_MS;
options->resend = KCP_VIDEO_RESEND;
options->nc = KCP_VIDEO_NC;
options->sndwnd = KCP_VIDEO_SND_WND;
options->rcvwnd = KCP_VIDEO_RCV_WND;
options->mtu = KCP_VIDEO_MTU;
}
static int kcp_conn_validate_options(const kcp_conn_options_t *options) {
if (options == NULL) {
errno = EINVAL;
return -1;
}
if (options->interval_ms <= 0 || options->sndwnd <= 0 || options->rcvwnd <= 0 || options->mtu <= 0) {
errno = EINVAL;
return -1;
}
return 0;
}
static int kcp_conn_apply_options_locked(kcp_conn_t *conn, const kcp_conn_options_t *options) {
if (conn == NULL || conn->kcp == NULL || kcp_conn_validate_options(options) != 0) {
return -1;
}
if (ikcp_wndsize(conn->kcp, options->sndwnd, options->rcvwnd) != 0) {
errno = EINVAL;
return -1;
}
if (ikcp_setmtu(conn->kcp, options->mtu) != 0) {
errno = EINVAL;
return -1;
}
if (ikcp_nodelay(conn->kcp, options->nodelay, options->interval_ms, options->resend, options->nc) != 0) {
errno = EINVAL;
return -1;
}
conn->kcp->stream = 1;
conn->options = *options;
conn->update_interval_ms = options->interval_ms;
return 0;
}
static void kcp_parse_packet_segments(const uint8_t *packet, size_t len, uint32_t *conv, kcp_packet_debug_segment_t **segments, size_t *segment_count) {
size_t offset = 0;
size_t count = 0;
@@ -1107,10 +1185,12 @@ static void *kcp_client_recv_thread_main(void *arg) {
static void *kcp_update_thread_main(void *arg) {
kcp_conn_t *conn = (kcp_conn_t *) arg;
while (!atomic_load(&conn->closed)) {
int interval_ms;
pthread_mutex_lock(&conn->kcp_mu);
ikcp_update(conn->kcp, omni_now_millis32());
interval_ms = conn->update_interval_ms > 0 ? conn->update_interval_ms : KCP_DEFAULT_INTERVAL_MS;
pthread_mutex_unlock(&conn->kcp_mu);
usleep(10000);
usleep((useconds_t) interval_ms * 1000U);
}
return NULL;
}
@@ -1129,10 +1209,11 @@ static int kcp_conn_start_stats_thread(kcp_conn_t *conn) {
return 0;
}
static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *remote_addr, socklen_t remote_addr_len, kcp_socket_debug_state_t *sock_state, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *remote_addr, socklen_t remote_addr_len, const kcp_conn_options_t *options, kcp_socket_debug_state_t *sock_state, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
kcp_conn_t *conn = (kcp_conn_t *) calloc(1, sizeof(*conn));
uint32_t conv;
int thread_rc;
kcp_conn_options_t effective_options;
if (conn == NULL) {
errno = ENOMEM;
@@ -1150,6 +1231,12 @@ static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *
snprintf(conn->node_id, sizeof(conn->node_id), "%s", node_id == NULL ? "" : node_id);
conn->stats_logger = stats_logger;
conn->stats_interval_ms = stats_interval_ms > 0 ? stats_interval_ms : KCP_DEFAULT_STATS_INTERVAL_MS;
kcp_conn_options_init(&effective_options);
if (options != NULL) {
effective_options = *options;
}
conn->options = effective_options;
conn->update_interval_ms = effective_options.interval_ms;
conn->sock_state = sock_state;
if (omni_random_u32(&conv) != 0) {
protocol_frame_decoder_destroy(&conn->decoder);
@@ -1170,10 +1257,15 @@ static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *
return NULL;
}
ikcp_setoutput(conn->kcp, kcp_output_callback_impl);
ikcp_wndsize(conn->kcp, KCP_WND_SIZE, KCP_WND_SIZE);
ikcp_setmtu(conn->kcp, KCP_MTU);
ikcp_nodelay(conn->kcp, KCP_NODELAY, KCP_INTERVAL, KCP_RESEND, KCP_NC);
conn->kcp->stream = 1;
if (kcp_conn_apply_options_locked(conn, &effective_options) != 0) {
ikcp_release(conn->kcp);
protocol_frame_decoder_destroy(&conn->decoder);
pthread_cond_destroy(&conn->rx_cond);
pthread_mutex_destroy(&conn->kcp_mu);
pthread_mutex_destroy(&conn->close_mu);
free(conn);
return NULL;
}
if (kcp_conn_attach_process_sampler(conn) != 0) {
ikcp_release(conn->kcp);
protocol_frame_decoder_destroy(&conn->decoder);
@@ -1213,7 +1305,7 @@ static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *
return conn;
}
kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
kcp_conn_t *kcp_conn_dial_with_options(const char *server_addr, const char *bind_ip, const char *bind_device, const kcp_conn_options_t *options, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
struct sockaddr_storage remote_addr;
socklen_t remote_len;
int family;
@@ -1236,7 +1328,7 @@ kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const ch
close(fd);
return NULL;
}
conn = kcp_conn_alloc_common(fd, &remote_addr, remote_len, sock_state, logger, node_role, node_id, stats_logger, stats_interval_ms);
conn = kcp_conn_alloc_common(fd, &remote_addr, remote_len, options, sock_state, logger, node_role, node_id, stats_logger, stats_interval_ms);
if (conn == NULL) {
kcp_socket_debug_destroy(sock_state);
free(sock_state);
@@ -1255,6 +1347,10 @@ kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const ch
return conn;
}
kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) {
return kcp_conn_dial_with_options(server_addr, bind_ip, bind_device, NULL, packet_logger, logger, node_role, node_id, stats_logger, stats_interval_ms);
}
static void kcp_listener_enqueue_accept(kcp_listener_t *listener, kcp_conn_t *conn) {
pthread_mutex_lock(&listener->accept_mu);
if (listener->accept_tail == NULL) {
@@ -1381,6 +1477,7 @@ static void *kcp_listener_recv_thread_main(void *arg) {
if (conn == NULL) {
conn = (kcp_conn_t *) calloc(1, sizeof(*conn));
if (conn != NULL) {
kcp_conn_options_t accepted_options;
conn->fd = listener->fd;
memcpy(&conn->remote_addr, &source, sizeof(source));
conn->remote_addr_len = msg.msg_namelen;
@@ -1393,15 +1490,15 @@ static void *kcp_listener_recv_thread_main(void *arg) {
conn->stats_interval_ms = KCP_DEFAULT_STATS_INTERVAL_MS;
conn->sock_state = &listener->sock_state;
conn->listener = listener;
kcp_conn_options_init(&accepted_options);
conn->options = accepted_options;
conn->update_interval_ms = accepted_options.interval_ms;
conn->kcp = ikcp_create(conv, conn);
if (conn->kcp != NULL) {
int update_started = 0;
ikcp_setoutput(conn->kcp, kcp_output_callback_impl);
ikcp_wndsize(conn->kcp, KCP_WND_SIZE, KCP_WND_SIZE);
ikcp_setmtu(conn->kcp, KCP_MTU);
ikcp_nodelay(conn->kcp, KCP_NODELAY, KCP_INTERVAL, KCP_RESEND, KCP_NC);
conn->kcp->stream = 1;
if (pthread_create(&conn->update_thread, NULL, kcp_update_thread_main, conn) == 0) {
if (kcp_conn_apply_options_locked(conn, &accepted_options) == 0 &&
pthread_create(&conn->update_thread, NULL, kcp_update_thread_main, conn) == 0) {
update_started = 1;
}
if (update_started && kcp_listener_add_session(listener, conv, conn) == 0) {
@@ -1537,6 +1634,19 @@ int kcp_conn_configure_runtime(kcp_conn_t *conn, latency_logger_t *logger, const
return 0;
}
int kcp_conn_apply_options(kcp_conn_t *conn, const kcp_conn_options_t *options) {
int rc;
if (conn == NULL || options == NULL) {
errno = EINVAL;
return -1;
}
pthread_mutex_lock(&conn->kcp_mu);
rc = kcp_conn_apply_options_locked(conn, options);
pthread_mutex_unlock(&conn->kcp_mu);
return rc;
}
int kcp_conn_send(kcp_conn_t *conn, const message_t *msg) {
uint8_t *frame = NULL;
size_t frame_len = 0;
@@ -1577,15 +1687,31 @@ int kcp_conn_send(kcp_conn_t *conn, const message_t *msg) {
return 0;
}
int kcp_conn_receive(kcp_conn_t *conn, message_t *out_msg) {
static void kcp_timespec_deadline_after_ms(struct timespec *deadline, int timeout_ms) {
clock_gettime(CLOCK_REALTIME, deadline);
deadline->tv_sec += timeout_ms / 1000;
deadline->tv_nsec += (long) (timeout_ms % 1000) * 1000000L;
if (deadline->tv_nsec >= 1000000000L) {
deadline->tv_sec += 1;
deadline->tv_nsec -= 1000000000L;
}
}
int kcp_conn_receive_timed(kcp_conn_t *conn, message_t *out_msg, int timeout_ms) {
uint8_t *frame = NULL;
size_t frame_len = 0;
char err[128];
int next_rc;
struct timespec deadline;
int use_deadline = timeout_ms > 0;
if (conn == NULL || out_msg == NULL) {
errno = EINVAL;
return -1;
}
if (use_deadline) {
kcp_timespec_deadline_after_ms(&deadline, timeout_ms);
}
for (;;) {
next_rc = protocol_frame_decoder_next(&conn->decoder, &frame, &frame_len);
if (next_rc < 0) {
@@ -1618,12 +1744,33 @@ int kcp_conn_receive(kcp_conn_t *conn, message_t *out_msg) {
errno = ECANCELED;
return -1;
}
if (timeout_ms == 0) {
pthread_mutex_unlock(&conn->kcp_mu);
return 1;
}
if (timeout_ms < 0) {
pthread_cond_wait(&conn->rx_cond, &conn->kcp_mu);
} else {
int wait_rc = pthread_cond_timedwait(&conn->rx_cond, &conn->kcp_mu, &deadline);
if (wait_rc == ETIMEDOUT) {
pthread_mutex_unlock(&conn->kcp_mu);
return 1;
}
if (wait_rc != 0) {
pthread_mutex_unlock(&conn->kcp_mu);
errno = wait_rc;
return -1;
}
}
}
pthread_mutex_unlock(&conn->kcp_mu);
}
}
int kcp_conn_receive(kcp_conn_t *conn, message_t *out_msg) {
return kcp_conn_receive_timed(conn, out_msg, -1);
}
uint32_t kcp_conn_conv(const kcp_conn_t *conn) {
return conn == NULL || conn->kcp == NULL ? 0 : conn->kcp->conv;
}