diff --git a/.gitignore b/.gitignore index 9f437b1..9e8716c 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,6 @@ peer-b-latency.* *.log root@117.78.11.244 -c/bin \ No newline at end of file +c/bin + +*__pycache__* \ No newline at end of file diff --git a/Makefile b/Makefile index 3968094..a9386e3 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,7 @@ CC ?= gcc CFLAGS ?= -std=c11 -Wall -Wextra -O2 -pthread -D_GNU_SOURCE CPPFLAGS ?= -Iinclude -Ithird_party/cjson -Ithird_party/kcp LDFLAGS ?= -pthread +PYTHON ?= python3 BIN_DIR := bin SRC_DIR := src @@ -64,4 +65,10 @@ $(BIN_DIR)/kcpping: $(CMD_DIR)/kcpping.c $(COMMON_SRCS) | $(BIN_DIR) clean: rm -rf $(BIN_DIR) -.PHONY: all clean +python-ext: + cd python && $(PYTHON) setup.py build_ext --inplace + +python-install: + cd python && $(PYTHON) -m pip install -e . + +.PHONY: all clean python-ext python-install diff --git a/README.md b/README.md index 3e98a38..c77e734 100644 --- a/README.md +++ b/README.md @@ -7,26 +7,32 @@ This subtree is intentionally standalone. The Go code stays in place as the beha ## Build ```bash -cd c -make +make -j$(nproc) ``` Build outputs: -- `c/bin/udpserver` -- `c/bin/udppeer` -- `c/bin/udpping` -- `c/bin/udprelay` -- `c/bin/kcpserver` -- `c/bin/kcppeer` -- `c/bin/kcpping` +- `./bin/udpserver` +- `./bin/udppeer` +- `./bin/udpping` +- `./bin/udprelay` +- `./bin/kcpserver` +- `./bin/kcppeer` +- `./bin/kcpping` + +Python extension build: + +```bash +make python-ext +make python-install +``` ## Run On Different Machines Server `D` runs the KCP hub on `0.0.0.0:10909`: ```bash -./c/bin/kcpserver -listen 0.0.0.0:10909 \ +./bin/kcpserver -listen 0.0.0.0:10909 \ -kcp-ts-debug-log logs/d-kcp-ts.jsonl \ -kcp-session-stats-log logs/d-kcp-stats.jsonl ``` @@ -34,13 +40,13 @@ Server `D` runs the KCP hub on `0.0.0.0:10909`: Relay `C` runs a raw UDP forwarder to `D`: ```bash -./c/bin/kcpserver -mode=relay -listen 0.0.0.0:10909 -relay-remote 172.21.32.15:10909 +./bin/kcpserver -mode=relay -listen 0.0.0.0:10909 -relay-remote 172.21.32.15:10909 ``` Peer `A` dials `D` through relay `C`: ```bash -./c/bin/kcppeer -id peer-a -server 172.21.32.15:10909 -relay-via 106.55.173.235:10909 \ +./bin/kcppeer -id peer-a -server 172.21.32.15:10909 -relay-via 106.55.173.235:10909 \ -inbox-dir inbox/a \ -latency-log logs/a-latency.jsonl \ -kcp-ts-debug-log logs/a-kcp-ts.jsonl \ @@ -50,7 +56,7 @@ Peer `A` dials `D` through relay `C`: Peer `B` dials `D` directly: ```bash -./c/bin/kcppeer -id peer-b -server 81.70.156.140:10909 \ +./bin/kcppeer -id peer-b -server 81.70.156.140:10909 \ -inbox-dir inbox/b \ -latency-log logs/b-latency.jsonl \ -kcp-ts-debug-log logs/b-kcp-ts.jsonl \ @@ -60,13 +66,33 @@ Peer `B` dials `D` directly: Optional ping / echo tools: ```bash -./c/bin/kcpping -id peer-a -server 106.55.173.235:10909 -echo -./c/bin/kcpping -id peer-b -server 81.70.156.140:10909 -to peer-a -count 20 -interval 100ms -./c/bin/udpserver -listen 0.0.0.0:9001 -./c/bin/udppeer -id peer-a -server 127.0.0.1:9001 -./c/bin/udpping -id pinger -server 127.0.0.1:9001 -to peer-a -count 20 +./bin/kcpping -id peer-a -server 106.55.173.235:10909 -echo +./bin/kcpping -id peer-b -server 81.70.156.140:10909 -to peer-a -count 20 -interval 100ms +./bin/udpserver -listen 0.0.0.0:9001 +./bin/udppeer -id peer-a -server 127.0.0.1:9001 +./bin/udpping -id pinger -server 127.0.0.1:9001 -to peer-a -count 20 ``` +Python control/video demos use two KCP sessions: + +- `peer-a-ctrl <-> peer-b-ctrl` for small binary control packets +- `peer-b-video -> peer-a-video` for larger binary video frames + +Example demo entry points: + +- `udp_keyboard_sender.py` +- `udp_xbox_sender.py` +- `udp_fsm_controller.py` +- `omnisocket_video_sender.py` +- `omnisocket_video_receiver.py` +- `scripts/kcp_control_benchmark.py` + +Python `recv_into()` note: + +- The writable buffer must be large enough for the full incoming payload. +- If the buffer is too small, `recv_into()` reports the required size but the current frame has already been consumed and is lost. +- For the video demo, keep `video_receiver.buffer_bytes >= video_sender.frame_bytes`. + ## Interactive Commands `udppeer` and `kcppeer` support the same interactive shell: @@ -83,6 +109,8 @@ quit - The C project targets Linux only. - It preserves the Go wire format for UDP datagrams and KCP stream frames. +- It now supports `binary` payload messages in addition to `text`, `file`, `register`, and `error`. +- Python `Session.recv_into()` is a zero-copy receive helper for already-sized buffers; it does not retain oversized frames for a retry. - It keeps runtime JSONL logging, UDP TX timestamp debug, KCP packet debug, and KCP session stats. - Offline `latencysummary` and HTML chart generation are intentionally not migrated. - No automated C tests are included in this subtree; validation is expected to happen on Linux via `make` and manual smoke tests. diff --git a/cmd/kcppeer.c b/cmd/kcppeer.c index 64e7afb..7913a21 100644 --- a/cmd/kcppeer.c +++ b/cmd/kcppeer.c @@ -53,6 +53,15 @@ static void *kcppeer_receive_thread_main(void *arg) { } fprintf(stderr, "received file from %s to %s: %s (%lu bytes) -> %s\n", msg.from, msg.to, msg.file_name, (unsigned long) msg.body_len, persisted_path); break; + case MSG_TYPE_BINARY: + if (kcp_client_persist_message(ctx->client, &msg, ctx->inbox_dir, persisted_path, sizeof(persisted_path)) != 0) { + fprintf(stderr, "kcppeer: persist binary payload from %s to %s failed\n", msg.from, msg.to); + protocol_message_clear(&msg); + ctx->rc = -1; + return NULL; + } + fprintf(stderr, "received binary payload from %s to %s (%lu bytes) -> %s\n", msg.from, msg.to, (unsigned long) msg.body_len, persisted_path); + break; case MSG_TYPE_ERROR: fprintf(stderr, "received error from %s to %s: %.*s\n", msg.from, msg.to, (int) msg.body_len, msg.body == NULL ? "" : (const char *) msg.body); break; diff --git a/cmd/udppeer.c b/cmd/udppeer.c index a3aa5c9..e5656ae 100644 --- a/cmd/udppeer.c +++ b/cmd/udppeer.c @@ -50,6 +50,15 @@ static void *udppeer_receive_thread_main(void *arg) { } fprintf(stderr, "received file from %s to %s: %s (%lu bytes) -> %s\n", msg.from, msg.to, msg.file_name, (unsigned long) msg.body_len, persisted_path); break; + case MSG_TYPE_BINARY: + if (udp_client_persist_message(ctx->client, &msg, ctx->inbox_dir, persisted_path, sizeof(persisted_path)) != 0) { + fprintf(stderr, "udppeer: persist binary payload from %s to %s failed\n", msg.from, msg.to); + protocol_message_clear(&msg); + ctx->rc = -1; + return NULL; + } + fprintf(stderr, "received binary payload from %s to %s (%lu bytes) -> %s\n", msg.from, msg.to, (unsigned long) msg.body_len, persisted_path); + break; case MSG_TYPE_ERROR: fprintf(stderr, "received error from %s to %s: %.*s\n", msg.from, msg.to, (int) msg.body_len, msg.body == NULL ? "" : (const char *) msg.body); break; diff --git a/config/omnisocket_demo.yaml b/config/omnisocket_demo.yaml new file mode 100644 index 0000000..80542f0 --- /dev/null +++ b/config/omnisocket_demo.yaml @@ -0,0 +1,41 @@ +transport: + server_addr: "127.0.0.1:10909" + relay_via: "" + bind_ip: "" + bind_device: "" + +control_sender: + peer_id: "peer-a-ctrl" + target_peer: "peer-b-ctrl" + joy_topic: "/xbox_data" + deadzone: 0.10 + analog_epsilon: 0.01 + dpad_threshold: 0.50 + trigger_pressed_threshold: -0.50 + +control_receiver: + peer_id: "peer-b-ctrl" + +motion: + initial_lift: 0.89 + lift_step: 0.05 + max_surge: 1.0 + max_sway: 0.5 + max_spin: 0.5 + max_lift: 0.90 + min_lift: 0.65 + surge_step: 0.1 + sway_step: 0.1 + spin_step: 0.1 + +video_sender: + peer_id: "peer-b-video" + target_peer: "peer-a-video" + frame_bytes: 30720 + frame_interval_ms: 66 + +video_receiver: + peer_id: "peer-a-video" + # recv_into() requires a buffer large enough for the whole frame. + # If buffer_bytes is smaller than video_sender.frame_bytes, the oversize frame is dropped. + buffer_bytes: 65536 diff --git a/include/peer_kcp_client.h b/include/peer_kcp_client.h index f1be8f8..d61111a 100644 --- a/include/peer_kcp_client.h +++ b/include/peer_kcp_client.h @@ -8,12 +8,24 @@ extern "C" { #endif typedef struct kcp_client kcp_client_t; +typedef struct kcp_client_recv_meta { + message_type_t type; + uint64_t id; + char from[OMNI_MAX_PEER_ID]; + char to[OMNI_MAX_PEER_ID]; + char file_name[OMNI_MAX_FILE_NAME]; + size_t body_len; +} kcp_client_recv_meta_t; +kcp_client_t *kcp_client_dial_with_options(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, const kcp_conn_options_t *options, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms); kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms); const char *kcp_client_id(const kcp_client_t *client); int kcp_client_send_text(kcp_client_t *client, const char *to, const char *text); +int kcp_client_send_binary(kcp_client_t *client, const char *to, const void *data, size_t data_len); int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char *path); +int kcp_client_receive_timed(kcp_client_t *client, message_t *out_msg, int timeout_ms); int kcp_client_receive(kcp_client_t *client, message_t *out_msg); +int kcp_client_receive_binary_into(kcp_client_t *client, void *buffer, size_t buffer_len, kcp_client_recv_meta_t *out_meta, int timeout_ms); int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len); int kcp_client_close(kcp_client_t *client); void kcp_client_free(kcp_client_t *client); diff --git a/include/protocol.h b/include/protocol.h index 82560c6..a6c64ad 100644 --- a/include/protocol.h +++ b/include/protocol.h @@ -12,6 +12,7 @@ typedef enum message_type { MSG_TYPE_FILE = 1, MSG_TYPE_REGISTER = 2, MSG_TYPE_ERROR = 3, + MSG_TYPE_BINARY = 4, MSG_TYPE_INVALID = 255 } message_type_t; diff --git a/include/transport_kcp.h b/include/transport_kcp.h index 202c287..d7c3c95 100644 --- a/include/transport_kcp.h +++ b/include/transport_kcp.h @@ -9,20 +9,60 @@ extern "C" { #endif -#define KCP_NODELAY 1 -#define KCP_INTERVAL 10 -#define KCP_RESEND 2 -#define KCP_NC 1 -#define KCP_WND_SIZE 256 -#define KCP_MTU 1400 +#define KCP_DEFAULT_NODELAY 1 +#define KCP_DEFAULT_INTERVAL_MS 10 +#define KCP_DEFAULT_RESEND 2 +#define KCP_DEFAULT_NC 1 +#define KCP_DEFAULT_SND_WND 256 +#define KCP_DEFAULT_RCV_WND 256 +#define KCP_DEFAULT_MTU 1400 #define KCP_DEFAULT_STATS_INTERVAL_MS 100 +#define KCP_CONTROL_NODELAY 1 +#define KCP_CONTROL_INTERVAL_MS 5 +#define KCP_CONTROL_RESEND 2 +#define KCP_CONTROL_NC 1 +#define KCP_CONTROL_SND_WND 32 +#define KCP_CONTROL_RCV_WND 32 +#define KCP_CONTROL_MTU 1400 + +#define KCP_VIDEO_NODELAY 1 +#define KCP_VIDEO_INTERVAL_MS 10 +#define KCP_VIDEO_RESEND 2 +#define KCP_VIDEO_NC 1 +#define KCP_VIDEO_SND_WND 256 +#define KCP_VIDEO_RCV_WND 256 +#define KCP_VIDEO_MTU 1400 + +#define KCP_NODELAY KCP_DEFAULT_NODELAY +#define KCP_INTERVAL KCP_DEFAULT_INTERVAL_MS +#define KCP_RESEND KCP_DEFAULT_RESEND +#define KCP_NC KCP_DEFAULT_NC +#define KCP_WND_SIZE KCP_DEFAULT_SND_WND +#define KCP_MTU KCP_DEFAULT_MTU + typedef struct kcp_conn kcp_conn_t; typedef struct kcp_listener kcp_listener_t; +typedef struct kcp_conn_options { + int nodelay; + int interval_ms; + int resend; + int nc; + int sndwnd; + int rcvwnd; + int mtu; +} kcp_conn_options_t; +void kcp_conn_options_init(kcp_conn_options_t *options); +void kcp_conn_options_set_control_defaults(kcp_conn_options_t *options); +void kcp_conn_options_set_video_defaults(kcp_conn_options_t *options); + +kcp_conn_t *kcp_conn_dial_with_options(const char *server_addr, const char *bind_ip, const char *bind_device, const kcp_conn_options_t *options, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms); kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms); int kcp_conn_configure_runtime(kcp_conn_t *conn, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms); +int kcp_conn_apply_options(kcp_conn_t *conn, const kcp_conn_options_t *options); int kcp_conn_send(kcp_conn_t *conn, const message_t *msg); +int kcp_conn_receive_timed(kcp_conn_t *conn, message_t *out_msg, int timeout_ms); int kcp_conn_receive(kcp_conn_t *conn, message_t *out_msg); int kcp_conn_close(kcp_conn_t *conn); void kcp_conn_free(kcp_conn_t *conn); diff --git a/python/omnisocket/__init__.py b/python/omnisocket/__init__.py new file mode 100644 index 0000000..d9244cf --- /dev/null +++ b/python/omnisocket/__init__.py @@ -0,0 +1,44 @@ +try: + from ._omnisocket import ( + MSG_TYPE_BINARY, + MSG_TYPE_ERROR, + MSG_TYPE_FILE, + MSG_TYPE_REGISTER, + MSG_TYPE_TEXT, + Session, + ) +except ImportError as exc: + raise ImportError( + "omnisocket extension is not built; run `make python-ext` on a Linux host first" + ) from exc + +CONTROL_DEFAULTS = { + "nodelay": 1, + "interval_ms": 5, + "resend": 2, + "nc": 1, + "sndwnd": 32, + "rcvwnd": 32, + "mtu": 1400, +} + +VIDEO_DEFAULTS = { + "nodelay": 1, + "interval_ms": 10, + "resend": 2, + "nc": 1, + "sndwnd": 256, + "rcvwnd": 256, + "mtu": 1400, +} + +__all__ = [ + "CONTROL_DEFAULTS", + "VIDEO_DEFAULTS", + "MSG_TYPE_BINARY", + "MSG_TYPE_ERROR", + "MSG_TYPE_FILE", + "MSG_TYPE_REGISTER", + "MSG_TYPE_TEXT", + "Session", +] diff --git a/python/omnisocket/_omnisocket.c b/python/omnisocket/_omnisocket.c new file mode 100644 index 0000000..4027388 --- /dev/null +++ b/python/omnisocket/_omnisocket.c @@ -0,0 +1,336 @@ +#define PY_SSIZE_T_CLEAN +#include + +#include "omnisocket_client.h" + +typedef struct PyOmniSession { + PyObject_HEAD + omnisocket_session_t session; +} PyOmniSession; + +static const char *PyOmniSession_recv_doc = + "recv(timeout_ms=-1) -> (from_peer, msg_type, payload) | None"; + +static const char *PyOmniSession_recv_into_doc = + "recv_into(buffer, timeout_ms=-1) -> dict | None\n" + "\n" + "The writable buffer must be large enough for the full message body.\n" + "If it is too small, BufferError reports the required size but the\n" + "current frame has already been consumed and is lost."; + +static PyObject *PyOmniSession_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) { + PyOmniSession *self; + (void) args; + (void) kwargs; + + self = (PyOmniSession *) type->tp_alloc(type, 0); + if (self == NULL) { + return NULL; + } + if (omnisocket_session_init(&self->session) != 0) { + type->tp_free((PyObject *) self); + return PyErr_SetFromErrno(PyExc_OSError); + } + return (PyObject *) self; +} + +static void PyOmniSession_dealloc(PyOmniSession *self) { + omnisocket_session_destroy(&self->session); + Py_TYPE(self)->tp_free((PyObject *) self); +} + +static PyObject *PyOmniSession_connect(PyOmniSession *self, PyObject *args, PyObject *kwargs) { + const char *server_addr; + const char *peer_id; + const char *relay_via = ""; + const char *bind_ip = ""; + const char *bind_device = ""; + int nodelay = KCP_DEFAULT_NODELAY; + int interval_ms = KCP_DEFAULT_INTERVAL_MS; + int resend = KCP_DEFAULT_RESEND; + int nc = KCP_DEFAULT_NC; + int sndwnd = KCP_DEFAULT_SND_WND; + int rcvwnd = KCP_DEFAULT_RCV_WND; + int mtu = KCP_DEFAULT_MTU; + int stats_interval_ms = KCP_DEFAULT_STATS_INTERVAL_MS; + kcp_conn_options_t options; + int rc; + + static char *kwlist[] = { + "server_addr", + "peer_id", + "relay_via", + "bind_ip", + "bind_device", + "nodelay", + "interval_ms", + "resend", + "nc", + "sndwnd", + "rcvwnd", + "mtu", + "stats_interval_ms", + NULL + }; + + if (!PyArg_ParseTupleAndKeywords( + args, + kwargs, + "ss|sssiiiiiiii", + kwlist, + &server_addr, + &peer_id, + &relay_via, + &bind_ip, + &bind_device, + &nodelay, + &interval_ms, + &resend, + &nc, + &sndwnd, + &rcvwnd, + &mtu, + &stats_interval_ms)) { + return NULL; + } + + kcp_conn_options_init(&options); + options.nodelay = nodelay; + options.interval_ms = interval_ms; + options.resend = resend; + options.nc = nc; + options.sndwnd = sndwnd; + options.rcvwnd = rcvwnd; + options.mtu = mtu; + + Py_BEGIN_ALLOW_THREADS + rc = omnisocket_session_connect( + &self->session, + server_addr, + relay_via, + peer_id, + bind_ip, + bind_device, + &options, + stats_interval_ms + ); + Py_END_ALLOW_THREADS + + if (rc != 0) { + return PyErr_SetFromErrno(PyExc_OSError); + } + Py_RETURN_NONE; +} + +static PyObject *PyOmniSession_close(PyOmniSession *self, PyObject *Py_UNUSED(ignored)) { + int rc; + + Py_BEGIN_ALLOW_THREADS + rc = omnisocket_session_close(&self->session); + Py_END_ALLOW_THREADS + + if (rc != 0) { + return PyErr_SetFromErrno(PyExc_OSError); + } + Py_RETURN_NONE; +} + +static PyObject *PyOmniSession_send(PyOmniSession *self, PyObject *args, PyObject *kwargs) { + const char *to; + Py_buffer payload; + int rc; + static char *kwlist[] = {"to", "data", NULL}; + + memset(&payload, 0, sizeof(payload)); + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sy*", kwlist, &to, &payload)) { + return NULL; + } + + Py_BEGIN_ALLOW_THREADS + rc = omnisocket_session_send(&self->session, to, payload.buf, (size_t) payload.len); + Py_END_ALLOW_THREADS + + PyBuffer_Release(&payload); + if (rc != 0) { + return PyErr_SetFromErrno(PyExc_OSError); + } + Py_RETURN_NONE; +} + +static PyObject *PyOmniSession_recv(PyOmniSession *self, PyObject *args, PyObject *kwargs) { + int timeout_ms = -1; + int rc; + message_t msg; + PyObject *body = NULL; + PyObject *result = NULL; + static char *kwlist[] = {"timeout_ms", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist, &timeout_ms)) { + return NULL; + } + + protocol_message_init(&msg); + Py_BEGIN_ALLOW_THREADS + rc = omnisocket_session_recv(&self->session, &msg, timeout_ms); + Py_END_ALLOW_THREADS + + if (rc == 1) { + protocol_message_clear(&msg); + Py_RETURN_NONE; + } + if (rc != 0) { + protocol_message_clear(&msg); + return PyErr_SetFromErrno(PyExc_OSError); + } + + body = PyBytes_FromStringAndSize((const char *) msg.body, (Py_ssize_t) msg.body_len); + if (body == NULL) { + protocol_message_clear(&msg); + return NULL; + } + result = Py_BuildValue("(siO)", msg.from, (int) msg.type, body); + Py_DECREF(body); + protocol_message_clear(&msg); + return result; +} + +static PyObject *PyOmniSession_recv_into(PyOmniSession *self, PyObject *args, PyObject *kwargs) { + PyObject *buffer_obj; + Py_buffer view; + int timeout_ms = -1; + int rc; + kcp_client_recv_meta_t meta; + PyObject *result = NULL; + static char *kwlist[] = {"buffer", "timeout_ms", NULL}; + + memset(&view, 0, sizeof(view)); + memset(&meta, 0, sizeof(meta)); + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|i", kwlist, &buffer_obj, &timeout_ms)) { + return NULL; + } + if (PyObject_GetBuffer(buffer_obj, &view, PyBUF_WRITABLE) != 0) { + return NULL; + } + + Py_BEGIN_ALLOW_THREADS + rc = omnisocket_session_recv_into(&self->session, view.buf, (size_t) view.len, &meta, timeout_ms); + Py_END_ALLOW_THREADS + + PyBuffer_Release(&view); + if (rc == 1) { + Py_RETURN_NONE; + } + if (rc == 2) { + PyErr_Format( + PyExc_BufferError, + "buffer too small: need %zu bytes; current frame was already consumed and dropped", + meta.body_len + ); + return NULL; + } + if (rc != 0) { + return PyErr_SetFromErrno(PyExc_OSError); + } + + result = Py_BuildValue( + "{s:s,s:s,s:s,s:i,s:K,s:K}", + "from", + meta.from, + "to", + meta.to, + "file_name", + meta.file_name, + "msg_type", + (int) meta.type, + "message_id", + (unsigned long long) meta.id, + "body_len", + (unsigned long long) meta.body_len + ); + return result; +} + +static PyObject *PyOmniSession_stats(PyOmniSession *self, PyObject *Py_UNUSED(ignored)) { + omnisocket_session_stats_t stats; + + memset(&stats, 0, sizeof(stats)); + omnisocket_session_stats_snapshot(&self->session, &stats); + return Py_BuildValue( + "{s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:i}", + "send_calls", + (unsigned long long) stats.send_calls, + "send_bytes", + (unsigned long long) stats.send_bytes, + "send_errors", + (unsigned long long) stats.send_errors, + "recv_calls", + (unsigned long long) stats.recv_calls, + "recv_bytes", + (unsigned long long) stats.recv_bytes, + "recv_timeouts", + (unsigned long long) stats.recv_timeouts, + "recv_errors", + (unsigned long long) stats.recv_errors, + "connected", + stats.connected + ); +} + +static PyMethodDef PyOmniSession_methods[] = { + {"connect", (PyCFunction) PyOmniSession_connect, METH_VARARGS | METH_KEYWORDS, NULL}, + {"close", (PyCFunction) PyOmniSession_close, METH_NOARGS, NULL}, + {"send", (PyCFunction) PyOmniSession_send, METH_VARARGS | METH_KEYWORDS, NULL}, + {"recv", (PyCFunction) PyOmniSession_recv, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_doc}, + {"recv_into", (PyCFunction) PyOmniSession_recv_into, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_into_doc}, + {"stats", (PyCFunction) PyOmniSession_stats, METH_NOARGS, NULL}, + {NULL, NULL, 0, NULL} +}; + +static PyTypeObject PyOmniSessionType = { + PyVarObject_HEAD_INIT(NULL, 0) +}; + +static PyModuleDef omnisocket_module = { + PyModuleDef_HEAD_INIT, + .m_name = "_omnisocket", + .m_size = -1, +}; + +PyMODINIT_FUNC PyInit__omnisocket(void) { + PyObject *module; + + PyOmniSessionType.tp_name = "omnisocket.Session"; + PyOmniSessionType.tp_basicsize = sizeof(PyOmniSession); + PyOmniSessionType.tp_flags = Py_TPFLAGS_DEFAULT; + PyOmniSessionType.tp_new = PyOmniSession_new; + PyOmniSessionType.tp_dealloc = (destructor) PyOmniSession_dealloc; + PyOmniSessionType.tp_methods = PyOmniSession_methods; + + if (PyType_Ready(&PyOmniSessionType) < 0) { + return NULL; + } + + module = PyModule_Create(&omnisocket_module); + if (module == NULL) { + return NULL; + } + + Py_INCREF(&PyOmniSessionType); + if (PyModule_AddObject(module, "Session", (PyObject *) &PyOmniSessionType) != 0) { + Py_DECREF(&PyOmniSessionType); + Py_DECREF(module); + return NULL; + } + + if (PyModule_AddIntConstant(module, "MSG_TYPE_TEXT", MSG_TYPE_TEXT) != 0 || + PyModule_AddIntConstant(module, "MSG_TYPE_FILE", MSG_TYPE_FILE) != 0 || + PyModule_AddIntConstant(module, "MSG_TYPE_REGISTER", MSG_TYPE_REGISTER) != 0 || + PyModule_AddIntConstant(module, "MSG_TYPE_ERROR", MSG_TYPE_ERROR) != 0 || + PyModule_AddIntConstant(module, "MSG_TYPE_BINARY", MSG_TYPE_BINARY) != 0) { + Py_DECREF(module); + return NULL; + } + + return module; +} diff --git a/python/omnisocket/omnisocket_client.c b/python/omnisocket/omnisocket_client.c new file mode 100644 index 0000000..3557780 --- /dev/null +++ b/python/omnisocket/omnisocket_client.c @@ -0,0 +1,248 @@ +#include "omnisocket_client.h" + +int omnisocket_session_init(omnisocket_session_t *session) { + int rc; + + if (session == NULL) { + errno = EINVAL; + return -1; + } + memset(session, 0, sizeof(*session)); + rc = pthread_mutex_init(&session->mutex, NULL); + if (rc != 0) { + errno = rc; + return -1; + } + rc = pthread_cond_init(&session->idle_cond, NULL); + if (rc != 0) { + pthread_mutex_destroy(&session->mutex); + errno = rc; + return -1; + } + return 0; +} + +void omnisocket_session_destroy(omnisocket_session_t *session) { + if (session == NULL) { + return; + } + (void) omnisocket_session_close(session); + pthread_cond_destroy(&session->idle_cond); + pthread_mutex_destroy(&session->mutex); +} + +static int omnisocket_session_begin_client_op(omnisocket_session_t *session, kcp_client_t **out_client) { + if (session == NULL || out_client == NULL) { + errno = EINVAL; + return -1; + } + + pthread_mutex_lock(&session->mutex); + if (session->closing) { + pthread_mutex_unlock(&session->mutex); + errno = ECANCELED; + return -1; + } + if (session->client == NULL) { + pthread_mutex_unlock(&session->mutex); + errno = ENOTCONN; + return -1; + } + *out_client = session->client; + session->active_ops += 1; + pthread_mutex_unlock(&session->mutex); + return 0; +} + +int omnisocket_session_connect( + omnisocket_session_t *session, + const char *server_addr, + const char *relay_via, + const char *peer_id, + const char *bind_ip, + const char *bind_device, + const kcp_conn_options_t *options, + int stats_interval_ms +) { + kcp_client_t *client; + + if (session == NULL || server_addr == NULL || peer_id == NULL) { + errno = EINVAL; + return -1; + } + + pthread_mutex_lock(&session->mutex); + while (session->closing) { + pthread_cond_wait(&session->idle_cond, &session->mutex); + } + if (session->client != NULL) { + pthread_mutex_unlock(&session->mutex); + errno = EISCONN; + return -1; + } + client = kcp_client_dial_with_options( + server_addr, + relay_via, + peer_id, + bind_ip, + bind_device, + options, + NULL, + NULL, + NULL, + stats_interval_ms + ); + if (client == NULL) { + pthread_mutex_unlock(&session->mutex); + return -1; + } + session->client = client; + session->stats.connected = 1; + pthread_mutex_unlock(&session->mutex); + return 0; +} + +int omnisocket_session_close(omnisocket_session_t *session) { + kcp_client_t *client; + + if (session == NULL) { + errno = EINVAL; + return -1; + } + + pthread_mutex_lock(&session->mutex); + while (session->closing) { + pthread_cond_wait(&session->idle_cond, &session->mutex); + } + client = session->client; + if (client != NULL) { + session->closing = 1; + session->client = NULL; + } + session->stats.connected = 0; + pthread_mutex_unlock(&session->mutex); + + if (client != NULL) { + kcp_client_close(client); + pthread_mutex_lock(&session->mutex); + while (session->active_ops > 0) { + pthread_cond_wait(&session->idle_cond, &session->mutex); + } + pthread_mutex_unlock(&session->mutex); + kcp_client_free(client); + pthread_mutex_lock(&session->mutex); + session->closing = 0; + pthread_cond_broadcast(&session->idle_cond); + pthread_mutex_unlock(&session->mutex); + } + return 0; +} + +int omnisocket_session_send(omnisocket_session_t *session, const char *to, const void *data, size_t data_len) { + kcp_client_t *client; + int rc; + + if (session == NULL || to == NULL || (data == NULL && data_len > 0)) { + errno = EINVAL; + return -1; + } + + if (omnisocket_session_begin_client_op(session, &client) != 0) { + return -1; + } + rc = kcp_client_send_binary(client, to, data, data_len); + pthread_mutex_lock(&session->mutex); + if (rc == 0) { + session->stats.send_calls += 1; + session->stats.send_bytes += (uint64_t) data_len; + } else { + session->stats.send_errors += 1; + } + if (session->active_ops > 0) { + session->active_ops -= 1; + } + if (session->closing && session->active_ops == 0) { + pthread_cond_broadcast(&session->idle_cond); + } + pthread_mutex_unlock(&session->mutex); + return rc; +} + +int omnisocket_session_recv(omnisocket_session_t *session, message_t *out_msg, int timeout_ms) { + kcp_client_t *client; + int rc; + + if (session == NULL || out_msg == NULL) { + errno = EINVAL; + return -1; + } + + if (omnisocket_session_begin_client_op(session, &client) != 0) { + return -1; + } + rc = kcp_client_receive_timed(client, out_msg, timeout_ms); + pthread_mutex_lock(&session->mutex); + if (rc == 0) { + session->stats.recv_calls += 1; + session->stats.recv_bytes += (uint64_t) out_msg->body_len; + } else if (rc == 1) { + session->stats.recv_timeouts += 1; + } else { + session->stats.recv_errors += 1; + } + if (session->active_ops > 0) { + session->active_ops -= 1; + } + if (session->closing && session->active_ops == 0) { + pthread_cond_broadcast(&session->idle_cond); + } + pthread_mutex_unlock(&session->mutex); + return rc; +} + +int omnisocket_session_recv_into( + omnisocket_session_t *session, + void *buffer, + size_t buffer_len, + kcp_client_recv_meta_t *out_meta, + int timeout_ms +) { + kcp_client_t *client; + int rc; + + if (session == NULL || out_meta == NULL || (buffer == NULL && buffer_len > 0)) { + errno = EINVAL; + return -1; + } + + if (omnisocket_session_begin_client_op(session, &client) != 0) { + return -1; + } + rc = kcp_client_receive_binary_into(client, buffer, buffer_len, out_meta, timeout_ms); + pthread_mutex_lock(&session->mutex); + if (rc == 0) { + session->stats.recv_calls += 1; + session->stats.recv_bytes += (uint64_t) out_meta->body_len; + } else if (rc == 1) { + session->stats.recv_timeouts += 1; + } else { + session->stats.recv_errors += 1; + } + if (session->active_ops > 0) { + session->active_ops -= 1; + } + if (session->closing && session->active_ops == 0) { + pthread_cond_broadcast(&session->idle_cond); + } + pthread_mutex_unlock(&session->mutex); + return rc; +} + +void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket_session_stats_t *out_stats) { + if (session == NULL || out_stats == NULL) { + return; + } + pthread_mutex_lock(&session->mutex); + *out_stats = session->stats; + pthread_mutex_unlock(&session->mutex); +} diff --git a/python/omnisocket/omnisocket_client.h b/python/omnisocket/omnisocket_client.h new file mode 100644 index 0000000..be9e4b2 --- /dev/null +++ b/python/omnisocket/omnisocket_client.h @@ -0,0 +1,51 @@ +#ifndef OMNISOCKET_PY_CLIENT_H +#define OMNISOCKET_PY_CLIENT_H + +#include "peer_kcp_client.h" + +typedef struct omnisocket_session_stats { + uint64_t send_calls; + uint64_t send_bytes; + uint64_t send_errors; + uint64_t recv_calls; + uint64_t recv_bytes; + uint64_t recv_timeouts; + uint64_t recv_errors; + int connected; +} omnisocket_session_stats_t; + +typedef struct omnisocket_session { + pthread_mutex_t mutex; + pthread_cond_t idle_cond; + kcp_client_t *client; + size_t active_ops; + int closing; + omnisocket_session_stats_t stats; +} omnisocket_session_t; + +int omnisocket_session_init(omnisocket_session_t *session); +void omnisocket_session_destroy(omnisocket_session_t *session); + +int omnisocket_session_connect( + omnisocket_session_t *session, + const char *server_addr, + const char *relay_via, + const char *peer_id, + const char *bind_ip, + const char *bind_device, + const kcp_conn_options_t *options, + int stats_interval_ms +); +int omnisocket_session_close(omnisocket_session_t *session); +int omnisocket_session_send(omnisocket_session_t *session, const char *to, const void *data, size_t data_len); +int omnisocket_session_recv(omnisocket_session_t *session, message_t *out_msg, int timeout_ms); +int omnisocket_session_recv_into( + omnisocket_session_t *session, + void *buffer, + size_t buffer_len, + kcp_client_recv_meta_t *out_meta, + int timeout_ms +); +void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket_session_stats_t *out_stats); + +#endif diff --git a/python/setup.py b/python/setup.py new file mode 100644 index 0000000..f302f32 --- /dev/null +++ b/python/setup.py @@ -0,0 +1,58 @@ +from pathlib import Path +import sys + +from setuptools import Extension, setup + + +ROOT = Path(__file__).resolve().parent.parent +PY_ROOT = Path(__file__).resolve().parent + +if sys.platform != "linux": + raise RuntimeError("omnisocket Python extension can only be built on Linux") + + +COMMON_SOURCES = [ + ROOT / "src" / "omni_common.c", + ROOT / "src" / "protocol.c", + ROOT / "src" / "latencylog.c", + ROOT / "src" / "tx_timestamp_debug.c", + ROOT / "src" / "kcp_packet_debug.c", + ROOT / "src" / "kcp_session_stats.c", + ROOT / "src" / "linux_timestamping.c", + ROOT / "src" / "interactive.c", + ROOT / "src" / "transport_udp.c", + ROOT / "src" / "transport_kcp.c", + ROOT / "src" / "server_udp_relay.c", + ROOT / "src" / "server_udp_hub.c", + ROOT / "src" / "server_kcp_hub.c", + ROOT / "src" / "peer_udp_client.c", + ROOT / "src" / "peer_kcp_client.c", + ROOT / "third_party" / "cjson" / "cJSON.c", + ROOT / "third_party" / "kcp" / "ikcp.c", +] + + +setup( + name="omnisocket", + version="0.1.0", + packages=["omnisocket"], + ext_modules=[ + Extension( + "omnisocket._omnisocket", + sources=[ + str(PY_ROOT / "omnisocket" / "_omnisocket.c"), + str(PY_ROOT / "omnisocket" / "omnisocket_client.c"), + *[str(path) for path in COMMON_SOURCES], + ], + include_dirs=[ + str(ROOT / "include"), + str(ROOT / "third_party" / "cjson"), + str(ROOT / "third_party" / "kcp"), + str(PY_ROOT / "omnisocket"), + ], + define_macros=[("_GNU_SOURCE", None)], + extra_compile_args=["-std=c11", "-O2", "-pthread"], + extra_link_args=["-pthread"], + ) + ], +) diff --git a/scripts/kcp_control_benchmark.py b/scripts/kcp_control_benchmark.py new file mode 100644 index 0000000..90c5b23 --- /dev/null +++ b/scripts/kcp_control_benchmark.py @@ -0,0 +1,76 @@ +"""Send high-rate control packets to benchmark the KCP control session.""" + +from __future__ import annotations + +import argparse +from pathlib import Path +import sys +import time + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT)) + +import yaml + +from omnisocket_control import make_control_packet + +try: + from omnisocket import CONTROL_DEFAULTS, Session +except ImportError: + sys.path.insert(0, str(ROOT / "python")) + from omnisocket import CONTROL_DEFAULTS, Session + + +def load_config() -> dict: + config_path = ROOT / "config" / "omnisocket_demo.yaml" + if not config_path.exists(): + return {} + with config_path.open("r", encoding="utf-8") as file: + return yaml.safe_load(file) or {} + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument("--rate", type=float, default=200.0, help="send rate in Hz") + parser.add_argument("--count", type=int, default=1000, help="packets to send") + args = parser.parse_args() + + config = load_config() + transport_cfg = config.get("transport", {}) + sender_cfg = config.get("control_sender", {}) + + session = Session() + session.connect( + server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")), + peer_id=str(sender_cfg.get("peer_id", "peer-a-ctrl")), + relay_via=str(transport_cfg.get("relay_via", "")), + bind_ip=str(transport_cfg.get("bind_ip", "")), + bind_device=str(transport_cfg.get("bind_device", "")), + **CONTROL_DEFAULTS, + ) + + target_peer = str(sender_cfg.get("target_peer", "peer-b-ctrl")) + spacing = 1.0 / args.rate if args.rate > 0 else 0.0 + start = time.perf_counter() + + try: + for seq_id in range(args.count): + packet = make_control_packet(seq_id, "set_surge", drive_value=0.25) + session.send(to=target_peer, data=packet.encode()) + if spacing > 0: + target = start + (seq_id + 1) * spacing + remaining = target - time.perf_counter() + if remaining > 0: + time.sleep(remaining) + finally: + elapsed = time.perf_counter() - start + print( + f"sent {args.count} control packets in {elapsed:.3f}s " + f"({(args.count / elapsed) if elapsed > 0 else 0.0:.1f} pkt/s)" + ) + print(f"stats={session.stats()}") + session.close() + + +if __name__ == "__main__": + main() diff --git a/src/latencylog.c b/src/latencylog.c index 147e0ec..1c6ae7c 100644 --- a/src/latencylog.c +++ b/src/latencylog.c @@ -113,7 +113,7 @@ int latencylog_is_business_message(const message_t *msg) { if (msg == NULL) { return 0; } - return msg->type == MSG_TYPE_TEXT || msg->type == MSG_TYPE_FILE; + return msg->type == MSG_TYPE_TEXT || msg->type == MSG_TYPE_FILE || msg->type == MSG_TYPE_BINARY; } void latencylog_log_message_event(latency_logger_t *logger, const char *node_role, const char *node_id, const char *event_name, const message_t *msg) { diff --git a/src/peer_kcp_client.c b/src/peer_kcp_client.c index b3b21d3..ec969c0 100644 --- a/src/peer_kcp_client.c +++ b/src/peer_kcp_client.c @@ -66,6 +66,11 @@ static int kcp_client_persist_message_to_disk(const message_t *msg, const char * if (omni_write_file(path, msg->body, msg->body_len) != 0) { return -1; } + } else if (msg->type == MSG_TYPE_BINARY) { + snprintf(path, sizeof(path), "%s/%s-%" PRIu64 ".bin", inbox_dir, msg->from, msg->id); + if (omni_write_file(path, msg->body, msg->body_len) != 0) { + return -1; + } } else { errno = EINVAL; return -1; @@ -77,7 +82,20 @@ static int kcp_client_persist_message_to_disk(const message_t *msg, const char * return 0; } -kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) { +static void kcp_client_fill_recv_meta(kcp_client_recv_meta_t *meta, const message_t *msg) { + if (meta == NULL || msg == NULL) { + return; + } + memset(meta, 0, sizeof(*meta)); + meta->type = msg->type; + meta->id = msg->id; + meta->body_len = msg->body_len; + snprintf(meta->from, sizeof(meta->from), "%s", msg->from); + snprintf(meta->to, sizeof(meta->to), "%s", msg->to); + snprintf(meta->file_name, sizeof(meta->file_name), "%s", msg->file_name); +} + +kcp_client_t *kcp_client_dial_with_options(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, const kcp_conn_options_t *options, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) { kcp_client_t *client; const char *actual_dial_addr = (dial_addr != NULL && dial_addr[0] != '\0') ? dial_addr : server_addr; message_t register_msg; @@ -91,7 +109,7 @@ kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, co snprintf(client->server_addr, sizeof(client->server_addr), "%s", server_addr == NULL ? "" : server_addr); pthread_mutex_init(&client->id_mu, NULL); client->logger = logger; - client->conn = kcp_conn_dial(actual_dial_addr, bind_ip, bind_device, packet_logger, logger, OMNI_NODE_ROLE_PEER, peer_id, stats_logger, stats_interval_ms); + client->conn = kcp_conn_dial_with_options(actual_dial_addr, bind_ip, bind_device, options, packet_logger, logger, OMNI_NODE_ROLE_PEER, peer_id, stats_logger, stats_interval_ms); if (client->conn == NULL) { saved_errno = errno; kcp_client_free(client); @@ -113,6 +131,10 @@ kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, co return client; } +kcp_client_t *kcp_client_dial(const char *server_addr, const char *dial_addr, const char *peer_id, const char *bind_ip, const char *bind_device, latency_logger_t *logger, kcp_packet_debug_logger_t *packet_logger, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) { + return kcp_client_dial_with_options(server_addr, dial_addr, peer_id, bind_ip, bind_device, NULL, logger, packet_logger, stats_logger, stats_interval_ms); +} + const char *kcp_client_id(const kcp_client_t *client) { return client == NULL ? "" : client->id; } @@ -121,6 +143,10 @@ int kcp_client_send_text(kcp_client_t *client, const char *to, const char *text) message_t msg; uint64_t id; + if (client == NULL || to == NULL || text == NULL) { + errno = EINVAL; + return -1; + } protocol_message_init(&msg); kcp_client_next_message_id(client, &id); msg.type = MSG_TYPE_TEXT; @@ -141,6 +167,37 @@ int kcp_client_send_text(kcp_client_t *client, const char *to, const char *text) return 0; } +int kcp_client_send_binary(kcp_client_t *client, const char *to, const void *data, size_t data_len) { + message_t msg; + uint64_t id; + + if (client == NULL || to == NULL || (data == NULL && data_len > 0)) { + errno = EINVAL; + return -1; + } + protocol_message_init(&msg); + kcp_client_next_message_id(client, &id); + msg.type = MSG_TYPE_BINARY; + msg.id = id; + snprintf(msg.from, sizeof(msg.from), "%s", client->id); + snprintf(msg.to, sizeof(msg.to), "%s", to); + if (data_len > 0) { + msg.body = (uint8_t *) malloc(data_len); + if (msg.body == NULL) { + return -1; + } + memcpy(msg.body, data, data_len); + } + msg.body_len = data_len; + latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_A_APP_PREP_BEGIN, &msg); + if (kcp_conn_send(client->conn, &msg) != 0) { + protocol_message_clear(&msg); + return -1; + } + protocol_message_clear(&msg); + return 0; +} + int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char *path) { message_t msg; uint64_t id; @@ -148,6 +205,10 @@ int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char * size_t body_len = 0; const char *base_name = strrchr(path, '/'); + if (client == NULL || to == NULL || path == NULL) { + errno = EINVAL; + return -1; + } if (omni_read_file(path, &body, &body_len) != 0) { return -1; } @@ -169,14 +230,57 @@ int kcp_client_send_file_path(kcp_client_t *client, const char *to, const char * return 0; } -int kcp_client_receive(kcp_client_t *client, message_t *out_msg) { - if (kcp_conn_receive(client->conn, out_msg) != 0) { +int kcp_client_receive_timed(kcp_client_t *client, message_t *out_msg, int timeout_ms) { + int rc; + + if (client == NULL || out_msg == NULL) { + errno = EINVAL; return -1; } + rc = kcp_conn_receive_timed(client->conn, out_msg, timeout_ms); + if (rc != 0) { + return rc; + } latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_B_APP_RECV, out_msg); return 0; } +int kcp_client_receive(kcp_client_t *client, message_t *out_msg) { + if (kcp_client_receive_timed(client, out_msg, -1) != 0) { + return -1; + } + return 0; +} + +int kcp_client_receive_binary_into(kcp_client_t *client, void *buffer, size_t buffer_len, kcp_client_recv_meta_t *out_meta, int timeout_ms) { + message_t msg; + int rc; + + if (client == NULL || (buffer == NULL && buffer_len > 0) || out_meta == NULL) { + errno = EINVAL; + return -1; + } + + protocol_message_init(&msg); + rc = kcp_client_receive_timed(client, &msg, timeout_ms); + if (rc != 0) { + return rc; + } + + kcp_client_fill_recv_meta(out_meta, &msg); + if (msg.body_len > buffer_len) { + protocol_message_clear(&msg); + errno = EMSGSIZE; + return 2; + } + + if (msg.body_len > 0) { + memcpy(buffer, msg.body, msg.body_len); + } + protocol_message_clear(&msg); + return 0; +} + int kcp_client_persist_message(kcp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len) { if (!latencylog_is_business_message(msg)) { errno = EINVAL; diff --git a/src/peer_udp_client.c b/src/peer_udp_client.c index 0a5deae..4fc3bb5 100644 --- a/src/peer_udp_client.c +++ b/src/peer_udp_client.c @@ -55,6 +55,11 @@ static int client_persist_message_to_disk(const message_t *msg, const char *inbo if (omni_write_file(path, msg->body, msg->body_len) != 0) { return -1; } + } else if (msg->type == MSG_TYPE_BINARY) { + snprintf(path, sizeof(path), "%s/%s-%" PRIu64 ".bin", inbox_dir, msg->from, msg->id); + if (omni_write_file(path, msg->body, msg->body_len) != 0) { + return -1; + } } else { errno = EINVAL; return -1; diff --git a/src/protocol.c b/src/protocol.c index fea7d96..d82d045 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -8,11 +8,12 @@ static const char *protocol_message_type_table[] = { "text", "file", "register", - "error" + "error", + "binary" }; const char *protocol_message_type_name(message_type_t type) { - if ((int) type < 0 || type >= MSG_TYPE_INVALID) { + if ((int) type < 0 || (size_t) type >= OMNI_ARRAY_LEN(protocol_message_type_table)) { return "invalid"; } return protocol_message_type_table[type]; @@ -102,6 +103,11 @@ int protocol_validate_message(const message_t *msg, char *err, size_t err_len) { return protocol_set_err(err, err_len, "protocol: missing file name"); } break; + case MSG_TYPE_BINARY: + if (msg->file_name[0] != '\0') { + return protocol_set_err(err, err_len, "protocol: unexpected file name"); + } + break; case MSG_TYPE_REGISTER: if (strcmp(msg->to, SERVER_PEER_ID) != 0) { return protocol_set_err(err, err_len, "protocol: invalid register target"); diff --git a/src/server_kcp_hub.c b/src/server_kcp_hub.c index f6c65d5..8ffbbf4 100644 --- a/src/server_kcp_hub.c +++ b/src/server_kcp_hub.c @@ -63,6 +63,36 @@ static kcp_peer_entry_t *kcp_hub_find_peer(kcp_hub_t *hub, const char *peer_id) return NULL; } +static int kcp_hub_peer_id_has_suffix(const char *peer_id, const char *suffix) { + size_t peer_len; + size_t suffix_len; + + if (peer_id == NULL || suffix == NULL) { + return 0; + } + peer_len = strlen(peer_id); + suffix_len = strlen(suffix); + return peer_len >= suffix_len && strcmp(peer_id + peer_len - suffix_len, suffix) == 0; +} + +static int kcp_hub_configure_peer_transport(kcp_conn_t *conn, const char *peer_id) { + kcp_conn_options_t options; + + if (conn == NULL || peer_id == NULL) { + errno = EINVAL; + return -1; + } + if (kcp_hub_peer_id_has_suffix(peer_id, "-ctrl")) { + kcp_conn_options_set_control_defaults(&options); + return kcp_conn_apply_options(conn, &options); + } + if (kcp_hub_peer_id_has_suffix(peer_id, "-video")) { + kcp_conn_options_set_video_defaults(&options); + return kcp_conn_apply_options(conn, &options); + } + return 0; +} + static int kcp_hub_send_server_error(kcp_conn_t *conn, const char *to, const char *message) { message_t msg; protocol_message_init(&msg); @@ -255,6 +285,7 @@ static int kcp_hub_handle_peer_message(kcp_hub_t *hub, const char *peer_id, kcp_ switch (msg->type) { case MSG_TYPE_TEXT: case MSG_TYPE_FILE: + case MSG_TYPE_BINARY: snprintf(msg->from, sizeof(msg->from), "%s", peer_id); if (kcp_hub_deliver_to_local_peer(hub, msg) == 0) { return 0; @@ -294,7 +325,7 @@ static int kcp_hub_handle_peer_message(kcp_hub_t *hub, const char *peer_id, kcp_ return 0; case MSG_TYPE_REGISTER: case MSG_TYPE_ERROR: - if (kcp_hub_send_server_error(conn, peer_id, "registered peers can only send text or file messages") != 0) { + if (kcp_hub_send_server_error(conn, peer_id, "registered peers can only send text, file, or binary messages") != 0) { return -1; } errno = EPROTO; @@ -358,6 +389,11 @@ static int kcp_hub_register_conn(kcp_hub_t *hub, kcp_conn_t *conn, char *peer_id pthread_rwlock_unlock(&hub->lock); snprintf(peer_id, peer_id_len, "%s", msg.from); + if (kcp_hub_configure_peer_transport(conn, peer_id) != 0) { + kcp_hub_unregister(hub, peer_id, conn); + protocol_message_clear(&msg); + return -1; + } protocol_message_clear(&msg); return 0; } @@ -518,7 +554,7 @@ int kcp_hub_serve_relay(kcp_hub_t *hub) { protocol_message_clear(&msg); continue; } - if (msg.type != MSG_TYPE_TEXT && msg.type != MSG_TYPE_FILE && msg.type != MSG_TYPE_ERROR) { + if (msg.type != MSG_TYPE_TEXT && msg.type != MSG_TYPE_FILE && msg.type != MSG_TYPE_BINARY && msg.type != MSG_TYPE_ERROR) { protocol_message_clear(&msg); continue; } diff --git a/src/server_udp_hub.c b/src/server_udp_hub.c index 885d12a..faf67ed 100644 --- a/src/server_udp_hub.c +++ b/src/server_udp_hub.c @@ -112,7 +112,7 @@ int udp_hub_serve(udp_hub_t *hub) { pthread_rwlock_unlock(&hub->lock); continue; } - if (msg.type != MSG_TYPE_TEXT && msg.type != MSG_TYPE_FILE) { + if (msg.type != MSG_TYPE_TEXT && msg.type != MSG_TYPE_FILE && msg.type != MSG_TYPE_BINARY) { if (msg.type == MSG_TYPE_ERROR) { udp_hub_send_error(hub, &addr, addr_len, msg.from, "peers cannot send error messages"); } else { diff --git a/src/transport_kcp.c b/src/transport_kcp.c index c1e2aab..30778ac 100644 --- a/src/transport_kcp.c +++ b/src/transport_kcp.c @@ -60,6 +60,8 @@ struct kcp_conn { int update_thread_started; pthread_t stats_thread; int stats_thread_started; + kcp_conn_options_t options; + int update_interval_ms; uint64_t pending_bytes_sent; uint64_t pending_bytes_received; uint64_t pending_in_pkts; @@ -141,6 +143,82 @@ struct kcp_process_sampler { static pthread_mutex_t g_kcp_process_sampler_mu = PTHREAD_MUTEX_INITIALIZER; static kcp_process_sampler_t *g_kcp_process_samplers = NULL; +void kcp_conn_options_init(kcp_conn_options_t *options) { + if (options == NULL) { + return; + } + memset(options, 0, sizeof(*options)); + options->nodelay = KCP_DEFAULT_NODELAY; + options->interval_ms = KCP_DEFAULT_INTERVAL_MS; + options->resend = KCP_DEFAULT_RESEND; + options->nc = KCP_DEFAULT_NC; + options->sndwnd = KCP_DEFAULT_SND_WND; + options->rcvwnd = KCP_DEFAULT_RCV_WND; + options->mtu = KCP_DEFAULT_MTU; +} + +void kcp_conn_options_set_control_defaults(kcp_conn_options_t *options) { + if (options == NULL) { + return; + } + memset(options, 0, sizeof(*options)); + options->nodelay = KCP_CONTROL_NODELAY; + options->interval_ms = KCP_CONTROL_INTERVAL_MS; + options->resend = KCP_CONTROL_RESEND; + options->nc = KCP_CONTROL_NC; + options->sndwnd = KCP_CONTROL_SND_WND; + options->rcvwnd = KCP_CONTROL_RCV_WND; + options->mtu = KCP_CONTROL_MTU; +} + +void kcp_conn_options_set_video_defaults(kcp_conn_options_t *options) { + if (options == NULL) { + return; + } + memset(options, 0, sizeof(*options)); + options->nodelay = KCP_VIDEO_NODELAY; + options->interval_ms = KCP_VIDEO_INTERVAL_MS; + options->resend = KCP_VIDEO_RESEND; + options->nc = KCP_VIDEO_NC; + options->sndwnd = KCP_VIDEO_SND_WND; + options->rcvwnd = KCP_VIDEO_RCV_WND; + options->mtu = KCP_VIDEO_MTU; +} + +static int kcp_conn_validate_options(const kcp_conn_options_t *options) { + if (options == NULL) { + errno = EINVAL; + return -1; + } + if (options->interval_ms <= 0 || options->sndwnd <= 0 || options->rcvwnd <= 0 || options->mtu <= 0) { + errno = EINVAL; + return -1; + } + return 0; +} + +static int kcp_conn_apply_options_locked(kcp_conn_t *conn, const kcp_conn_options_t *options) { + if (conn == NULL || conn->kcp == NULL || kcp_conn_validate_options(options) != 0) { + return -1; + } + if (ikcp_wndsize(conn->kcp, options->sndwnd, options->rcvwnd) != 0) { + errno = EINVAL; + return -1; + } + if (ikcp_setmtu(conn->kcp, options->mtu) != 0) { + errno = EINVAL; + return -1; + } + if (ikcp_nodelay(conn->kcp, options->nodelay, options->interval_ms, options->resend, options->nc) != 0) { + errno = EINVAL; + return -1; + } + conn->kcp->stream = 1; + conn->options = *options; + conn->update_interval_ms = options->interval_ms; + return 0; +} + static void kcp_parse_packet_segments(const uint8_t *packet, size_t len, uint32_t *conv, kcp_packet_debug_segment_t **segments, size_t *segment_count) { size_t offset = 0; size_t count = 0; @@ -1107,10 +1185,12 @@ static void *kcp_client_recv_thread_main(void *arg) { static void *kcp_update_thread_main(void *arg) { kcp_conn_t *conn = (kcp_conn_t *) arg; while (!atomic_load(&conn->closed)) { + int interval_ms; pthread_mutex_lock(&conn->kcp_mu); ikcp_update(conn->kcp, omni_now_millis32()); + interval_ms = conn->update_interval_ms > 0 ? conn->update_interval_ms : KCP_DEFAULT_INTERVAL_MS; pthread_mutex_unlock(&conn->kcp_mu); - usleep(10000); + usleep((useconds_t) interval_ms * 1000U); } return NULL; } @@ -1129,10 +1209,11 @@ static int kcp_conn_start_stats_thread(kcp_conn_t *conn) { return 0; } -static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *remote_addr, socklen_t remote_addr_len, kcp_socket_debug_state_t *sock_state, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) { +static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage *remote_addr, socklen_t remote_addr_len, const kcp_conn_options_t *options, kcp_socket_debug_state_t *sock_state, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) { kcp_conn_t *conn = (kcp_conn_t *) calloc(1, sizeof(*conn)); uint32_t conv; int thread_rc; + kcp_conn_options_t effective_options; if (conn == NULL) { errno = ENOMEM; @@ -1150,6 +1231,12 @@ static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage * snprintf(conn->node_id, sizeof(conn->node_id), "%s", node_id == NULL ? "" : node_id); conn->stats_logger = stats_logger; conn->stats_interval_ms = stats_interval_ms > 0 ? stats_interval_ms : KCP_DEFAULT_STATS_INTERVAL_MS; + kcp_conn_options_init(&effective_options); + if (options != NULL) { + effective_options = *options; + } + conn->options = effective_options; + conn->update_interval_ms = effective_options.interval_ms; conn->sock_state = sock_state; if (omni_random_u32(&conv) != 0) { protocol_frame_decoder_destroy(&conn->decoder); @@ -1170,10 +1257,15 @@ static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage * return NULL; } ikcp_setoutput(conn->kcp, kcp_output_callback_impl); - ikcp_wndsize(conn->kcp, KCP_WND_SIZE, KCP_WND_SIZE); - ikcp_setmtu(conn->kcp, KCP_MTU); - ikcp_nodelay(conn->kcp, KCP_NODELAY, KCP_INTERVAL, KCP_RESEND, KCP_NC); - conn->kcp->stream = 1; + if (kcp_conn_apply_options_locked(conn, &effective_options) != 0) { + ikcp_release(conn->kcp); + protocol_frame_decoder_destroy(&conn->decoder); + pthread_cond_destroy(&conn->rx_cond); + pthread_mutex_destroy(&conn->kcp_mu); + pthread_mutex_destroy(&conn->close_mu); + free(conn); + return NULL; + } if (kcp_conn_attach_process_sampler(conn) != 0) { ikcp_release(conn->kcp); protocol_frame_decoder_destroy(&conn->decoder); @@ -1213,7 +1305,7 @@ static kcp_conn_t *kcp_conn_alloc_common(int fd, const struct sockaddr_storage * return conn; } -kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) { +kcp_conn_t *kcp_conn_dial_with_options(const char *server_addr, const char *bind_ip, const char *bind_device, const kcp_conn_options_t *options, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) { struct sockaddr_storage remote_addr; socklen_t remote_len; int family; @@ -1236,7 +1328,7 @@ kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const ch close(fd); return NULL; } - conn = kcp_conn_alloc_common(fd, &remote_addr, remote_len, sock_state, logger, node_role, node_id, stats_logger, stats_interval_ms); + conn = kcp_conn_alloc_common(fd, &remote_addr, remote_len, options, sock_state, logger, node_role, node_id, stats_logger, stats_interval_ms); if (conn == NULL) { kcp_socket_debug_destroy(sock_state); free(sock_state); @@ -1255,6 +1347,10 @@ kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const ch return conn; } +kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms) { + return kcp_conn_dial_with_options(server_addr, bind_ip, bind_device, NULL, packet_logger, logger, node_role, node_id, stats_logger, stats_interval_ms); +} + static void kcp_listener_enqueue_accept(kcp_listener_t *listener, kcp_conn_t *conn) { pthread_mutex_lock(&listener->accept_mu); if (listener->accept_tail == NULL) { @@ -1381,6 +1477,7 @@ static void *kcp_listener_recv_thread_main(void *arg) { if (conn == NULL) { conn = (kcp_conn_t *) calloc(1, sizeof(*conn)); if (conn != NULL) { + kcp_conn_options_t accepted_options; conn->fd = listener->fd; memcpy(&conn->remote_addr, &source, sizeof(source)); conn->remote_addr_len = msg.msg_namelen; @@ -1393,15 +1490,15 @@ static void *kcp_listener_recv_thread_main(void *arg) { conn->stats_interval_ms = KCP_DEFAULT_STATS_INTERVAL_MS; conn->sock_state = &listener->sock_state; conn->listener = listener; + kcp_conn_options_init(&accepted_options); + conn->options = accepted_options; + conn->update_interval_ms = accepted_options.interval_ms; conn->kcp = ikcp_create(conv, conn); if (conn->kcp != NULL) { int update_started = 0; ikcp_setoutput(conn->kcp, kcp_output_callback_impl); - ikcp_wndsize(conn->kcp, KCP_WND_SIZE, KCP_WND_SIZE); - ikcp_setmtu(conn->kcp, KCP_MTU); - ikcp_nodelay(conn->kcp, KCP_NODELAY, KCP_INTERVAL, KCP_RESEND, KCP_NC); - conn->kcp->stream = 1; - if (pthread_create(&conn->update_thread, NULL, kcp_update_thread_main, conn) == 0) { + if (kcp_conn_apply_options_locked(conn, &accepted_options) == 0 && + pthread_create(&conn->update_thread, NULL, kcp_update_thread_main, conn) == 0) { update_started = 1; } if (update_started && kcp_listener_add_session(listener, conv, conn) == 0) { @@ -1537,6 +1634,19 @@ int kcp_conn_configure_runtime(kcp_conn_t *conn, latency_logger_t *logger, const return 0; } +int kcp_conn_apply_options(kcp_conn_t *conn, const kcp_conn_options_t *options) { + int rc; + + if (conn == NULL || options == NULL) { + errno = EINVAL; + return -1; + } + pthread_mutex_lock(&conn->kcp_mu); + rc = kcp_conn_apply_options_locked(conn, options); + pthread_mutex_unlock(&conn->kcp_mu); + return rc; +} + int kcp_conn_send(kcp_conn_t *conn, const message_t *msg) { uint8_t *frame = NULL; size_t frame_len = 0; @@ -1577,15 +1687,31 @@ int kcp_conn_send(kcp_conn_t *conn, const message_t *msg) { return 0; } -int kcp_conn_receive(kcp_conn_t *conn, message_t *out_msg) { +static void kcp_timespec_deadline_after_ms(struct timespec *deadline, int timeout_ms) { + clock_gettime(CLOCK_REALTIME, deadline); + deadline->tv_sec += timeout_ms / 1000; + deadline->tv_nsec += (long) (timeout_ms % 1000) * 1000000L; + if (deadline->tv_nsec >= 1000000000L) { + deadline->tv_sec += 1; + deadline->tv_nsec -= 1000000000L; + } +} + +int kcp_conn_receive_timed(kcp_conn_t *conn, message_t *out_msg, int timeout_ms) { uint8_t *frame = NULL; size_t frame_len = 0; char err[128]; int next_rc; + struct timespec deadline; + int use_deadline = timeout_ms > 0; + if (conn == NULL || out_msg == NULL) { errno = EINVAL; return -1; } + if (use_deadline) { + kcp_timespec_deadline_after_ms(&deadline, timeout_ms); + } for (;;) { next_rc = protocol_frame_decoder_next(&conn->decoder, &frame, &frame_len); if (next_rc < 0) { @@ -1618,12 +1744,33 @@ int kcp_conn_receive(kcp_conn_t *conn, message_t *out_msg) { errno = ECANCELED; return -1; } - pthread_cond_wait(&conn->rx_cond, &conn->kcp_mu); + if (timeout_ms == 0) { + pthread_mutex_unlock(&conn->kcp_mu); + return 1; + } + if (timeout_ms < 0) { + pthread_cond_wait(&conn->rx_cond, &conn->kcp_mu); + } else { + int wait_rc = pthread_cond_timedwait(&conn->rx_cond, &conn->kcp_mu, &deadline); + if (wait_rc == ETIMEDOUT) { + pthread_mutex_unlock(&conn->kcp_mu); + return 1; + } + if (wait_rc != 0) { + pthread_mutex_unlock(&conn->kcp_mu); + errno = wait_rc; + return -1; + } + } } pthread_mutex_unlock(&conn->kcp_mu); } } +int kcp_conn_receive(kcp_conn_t *conn, message_t *out_msg) { + return kcp_conn_receive_timed(conn, out_msg, -1); +} + uint32_t kcp_conn_conv(const kcp_conn_t *conn) { return conn == NULL || conn->kcp == NULL ? 0 : conn->kcp->conv; }