diff --git a/include/peer_udp_client.h b/include/peer_udp_client.h
index fc8f64f..937e49e 100644
--- a/include/peer_udp_client.h
+++ b/include/peer_udp_client.h
@@ -8,12 +8,24 @@ extern "C" {
#endif
typedef struct udp_client udp_client_t;
+typedef struct udp_client_recv_meta {
+ message_type_t type;
+ uint64_t id;
+ char from[OMNI_MAX_PEER_ID];
+ char to[OMNI_MAX_PEER_ID];
+ char file_name[OMNI_MAX_FILE_NAME];
+ size_t body_len;
+} udp_client_recv_meta_t;
+udp_client_t *udp_client_dial_with_options(const char *server_addr, const char *peer_id, const char *bind_ip, const char *bind_device, latency_logger_t *logger, tx_timestamp_debug_logger_t *debug_logger, int enable_timestamping);
udp_client_t *udp_client_dial(const char *server_addr, const char *peer_id, const char *bind_ip, latency_logger_t *logger, tx_timestamp_debug_logger_t *debug_logger, int enable_timestamping);
const char *udp_client_id(const udp_client_t *client);
int udp_client_send_text(udp_client_t *client, const char *to, const char *text);
+int udp_client_send_binary(udp_client_t *client, const char *to, const void *data, size_t data_len);
int udp_client_send_file_path(udp_client_t *client, const char *to, const char *path);
+int udp_client_receive_timed(udp_client_t *client, message_t *out_msg, int timeout_ms);
int udp_client_receive(udp_client_t *client, message_t *out_msg);
+int udp_client_receive_into(udp_client_t *client, void *buffer, size_t buffer_len, udp_client_recv_meta_t *out_meta, int timeout_ms);
int udp_client_persist_message(udp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len);
int udp_client_close(udp_client_t *client);
void udp_client_free(udp_client_t *client);
diff --git a/python/omnisocket/__init__.py b/python/omnisocket/__init__.py
index d9244cf..47fbdd5 100644
--- a/python/omnisocket/__init__.py
+++ b/python/omnisocket/__init__.py
@@ -6,6 +6,7 @@ try:
MSG_TYPE_REGISTER,
MSG_TYPE_TEXT,
Session,
+ UdpSession,
)
except ImportError as exc:
raise ImportError(
@@ -41,4 +42,5 @@ __all__ = [
"MSG_TYPE_REGISTER",
"MSG_TYPE_TEXT",
"Session",
+ "UdpSession",
]
diff --git a/python/omnisocket/_omnisocket.c b/python/omnisocket/_omnisocket.c
index 38d852f..5536688 100644
--- a/python/omnisocket/_omnisocket.c
+++ b/python/omnisocket/_omnisocket.c
@@ -8,6 +8,11 @@ typedef struct PyOmniSession {
omnisocket_session_t session;
} PyOmniSession;
+typedef struct PyOmniUdpSession {
+ PyObject_HEAD
+ omnisocket_udp_session_t session;
+} PyOmniUdpSession;
+
PyDoc_STRVAR(
PyOmniSession_recv_doc,
"recv(timeout_ms=-1) -> (from_peer, msg_type, payload) | None"
@@ -22,6 +27,66 @@ PyDoc_STRVAR(
"current frame has already been consumed and is lost."
);
+static PyObject *build_recv_result(const message_t *msg) {
+ PyObject *body = NULL;
+ PyObject *result = NULL;
+
+ body = PyBytes_FromStringAndSize((const char *) msg->body, (Py_ssize_t) msg->body_len);
+ if (body == NULL) {
+ return NULL;
+ }
+ result = Py_BuildValue("(siO)", msg->from, (int) msg->type, body);
+ Py_DECREF(body);
+ return result;
+}
+
+static PyObject *build_recv_meta_dict(
+ const char *from_peer,
+ const char *to_peer,
+ const char *file_name,
+ int msg_type,
+ unsigned long long message_id,
+ unsigned long long body_len
+) {
+ return Py_BuildValue(
+ "{s:s,s:s,s:s,s:i,s:K,s:K}",
+ "from",
+ from_peer,
+ "to",
+ to_peer,
+ "file_name",
+ file_name,
+ "msg_type",
+ msg_type,
+ "message_id",
+ message_id,
+ "body_len",
+ body_len
+ );
+}
+
+static PyObject *build_stats_dict(const omnisocket_session_stats_t *stats) {
+ return Py_BuildValue(
+ "{s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:i}",
+ "send_calls",
+ (unsigned long long) stats->send_calls,
+ "send_bytes",
+ (unsigned long long) stats->send_bytes,
+ "send_errors",
+ (unsigned long long) stats->send_errors,
+ "recv_calls",
+ (unsigned long long) stats->recv_calls,
+ "recv_bytes",
+ (unsigned long long) stats->recv_bytes,
+ "recv_timeouts",
+ (unsigned long long) stats->recv_timeouts,
+ "recv_errors",
+ (unsigned long long) stats->recv_errors,
+ "connected",
+ stats->connected
+ );
+}
+
static PyObject *PyOmniSession_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) {
PyOmniSession *self;
(void) args;
@@ -165,7 +230,6 @@ static PyObject *PyOmniSession_recv(PyOmniSession *self, PyObject *args, PyObjec
int timeout_ms = -1;
int rc;
message_t msg;
- PyObject *body = NULL;
PyObject *result = NULL;
static char *kwlist[] = {"timeout_ms", NULL};
@@ -187,13 +251,7 @@ static PyObject *PyOmniSession_recv(PyOmniSession *self, PyObject *args, PyObjec
return PyErr_SetFromErrno(PyExc_OSError);
}
- body = PyBytes_FromStringAndSize((const char *) msg.body, (Py_ssize_t) msg.body_len);
- if (body == NULL) {
- protocol_message_clear(&msg);
- return NULL;
- }
- result = Py_BuildValue("(siO)", msg.from, (int) msg.type, body);
- Py_DECREF(body);
+ result = build_recv_result(&msg);
protocol_message_clear(&msg);
return result;
}
@@ -237,19 +295,12 @@ static PyObject *PyOmniSession_recv_into(PyOmniSession *self, PyObject *args, Py
return PyErr_SetFromErrno(PyExc_OSError);
}
- result = Py_BuildValue(
- "{s:s,s:s,s:s,s:i,s:K,s:K}",
- "from",
+ result = build_recv_meta_dict(
meta.from,
- "to",
meta.to,
- "file_name",
meta.file_name,
- "msg_type",
(int) meta.type,
- "message_id",
(unsigned long long) meta.id,
- "body_len",
(unsigned long long) meta.body_len
);
return result;
@@ -260,25 +311,7 @@ static PyObject *PyOmniSession_stats(PyOmniSession *self, PyObject *Py_UNUSED(ig
memset(&stats, 0, sizeof(stats));
omnisocket_session_stats_snapshot(&self->session, &stats);
- return Py_BuildValue(
- "{s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:i}",
- "send_calls",
- (unsigned long long) stats.send_calls,
- "send_bytes",
- (unsigned long long) stats.send_bytes,
- "send_errors",
- (unsigned long long) stats.send_errors,
- "recv_calls",
- (unsigned long long) stats.recv_calls,
- "recv_bytes",
- (unsigned long long) stats.recv_bytes,
- "recv_timeouts",
- (unsigned long long) stats.recv_timeouts,
- "recv_errors",
- (unsigned long long) stats.recv_errors,
- "connected",
- stats.connected
- );
+ return build_stats_dict(&stats);
}
static PyMethodDef PyOmniSession_methods[] = {
@@ -295,6 +328,211 @@ static PyTypeObject PyOmniSessionType = {
PyVarObject_HEAD_INIT(NULL, 0)
};
+static PyObject *PyOmniUdpSession_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) {
+ PyOmniUdpSession *self;
+ (void) args;
+ (void) kwargs;
+
+ self = (PyOmniUdpSession *) type->tp_alloc(type, 0);
+ if (self == NULL) {
+ return NULL;
+ }
+ if (omnisocket_udp_session_init(&self->session) != 0) {
+ type->tp_free((PyObject *) self);
+ return PyErr_SetFromErrno(PyExc_OSError);
+ }
+ return (PyObject *) self;
+}
+
+static void PyOmniUdpSession_dealloc(PyOmniUdpSession *self) {
+ omnisocket_udp_session_destroy(&self->session);
+ Py_TYPE(self)->tp_free((PyObject *) self);
+}
+
+static PyObject *PyOmniUdpSession_connect(PyOmniUdpSession *self, PyObject *args, PyObject *kwargs) {
+ const char *server_addr;
+ const char *peer_id;
+ const char *bind_ip = "";
+ const char *bind_device = "";
+ int enable_timestamping = 0;
+ int rc;
+
+ static char *kwlist[] = {
+ "server_addr",
+ "peer_id",
+ "bind_ip",
+ "bind_device",
+ "enable_timestamping",
+ NULL
+ };
+
+ if (!PyArg_ParseTupleAndKeywords(
+ args,
+ kwargs,
+ "ss|ssi",
+ kwlist,
+ &server_addr,
+ &peer_id,
+ &bind_ip,
+ &bind_device,
+ &enable_timestamping)) {
+ return NULL;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ rc = omnisocket_udp_session_connect(
+ &self->session,
+ server_addr,
+ peer_id,
+ bind_ip,
+ bind_device,
+ enable_timestamping
+ );
+ Py_END_ALLOW_THREADS
+
+ if (rc != 0) {
+ return PyErr_SetFromErrno(PyExc_OSError);
+ }
+ Py_RETURN_NONE;
+}
+
+static PyObject *PyOmniUdpSession_close(PyOmniUdpSession *self, PyObject *Py_UNUSED(ignored)) {
+ int rc;
+
+ Py_BEGIN_ALLOW_THREADS
+ rc = omnisocket_udp_session_close(&self->session);
+ Py_END_ALLOW_THREADS
+
+ if (rc != 0) {
+ return PyErr_SetFromErrno(PyExc_OSError);
+ }
+ Py_RETURN_NONE;
+}
+
+static PyObject *PyOmniUdpSession_send(PyOmniUdpSession *self, PyObject *args, PyObject *kwargs) {
+ const char *to;
+ Py_buffer payload;
+ int rc;
+ static char *kwlist[] = {"to", "data", NULL};
+
+ memset(&payload, 0, sizeof(payload));
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sy*", kwlist, &to, &payload)) {
+ return NULL;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ rc = omnisocket_udp_session_send(&self->session, to, payload.buf, (size_t) payload.len);
+ Py_END_ALLOW_THREADS
+
+ PyBuffer_Release(&payload);
+ if (rc != 0) {
+ return PyErr_SetFromErrno(PyExc_OSError);
+ }
+ Py_RETURN_NONE;
+}
+
+static PyObject *PyOmniUdpSession_recv(PyOmniUdpSession *self, PyObject *args, PyObject *kwargs) {
+ int timeout_ms = -1;
+ int rc;
+ message_t msg;
+ PyObject *result = NULL;
+ static char *kwlist[] = {"timeout_ms", NULL};
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist, &timeout_ms)) {
+ return NULL;
+ }
+
+ protocol_message_init(&msg);
+ Py_BEGIN_ALLOW_THREADS
+ rc = omnisocket_udp_session_recv(&self->session, &msg, timeout_ms);
+ Py_END_ALLOW_THREADS
+
+ if (rc == 1) {
+ protocol_message_clear(&msg);
+ Py_RETURN_NONE;
+ }
+ if (rc != 0) {
+ protocol_message_clear(&msg);
+ return PyErr_SetFromErrno(PyExc_OSError);
+ }
+
+ result = build_recv_result(&msg);
+ protocol_message_clear(&msg);
+ return result;
+}
+
+static PyObject *PyOmniUdpSession_recv_into(PyOmniUdpSession *self, PyObject *args, PyObject *kwargs) {
+ PyObject *buffer_obj;
+ Py_buffer view;
+ int timeout_ms = -1;
+ int rc;
+ udp_client_recv_meta_t meta;
+ PyObject *result = NULL;
+ static char *kwlist[] = {"buffer", "timeout_ms", NULL};
+
+ memset(&view, 0, sizeof(view));
+ memset(&meta, 0, sizeof(meta));
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|i", kwlist, &buffer_obj, &timeout_ms)) {
+ return NULL;
+ }
+ if (PyObject_GetBuffer(buffer_obj, &view, PyBUF_WRITABLE) != 0) {
+ return NULL;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ rc = omnisocket_udp_session_recv_into(&self->session, view.buf, (size_t) view.len, &meta, timeout_ms);
+ Py_END_ALLOW_THREADS
+
+ PyBuffer_Release(&view);
+ if (rc == 1) {
+ Py_RETURN_NONE;
+ }
+ if (rc == 2) {
+ PyErr_Format(
+ PyExc_BufferError,
+ "buffer too small: need %zu bytes; current frame was already consumed and dropped",
+ meta.body_len
+ );
+ return NULL;
+ }
+ if (rc != 0) {
+ return PyErr_SetFromErrno(PyExc_OSError);
+ }
+
+ result = build_recv_meta_dict(
+ meta.from,
+ meta.to,
+ meta.file_name,
+ (int) meta.type,
+ (unsigned long long) meta.id,
+ (unsigned long long) meta.body_len
+ );
+ return result;
+}
+
+static PyObject *PyOmniUdpSession_stats(PyOmniUdpSession *self, PyObject *Py_UNUSED(ignored)) {
+ omnisocket_session_stats_t stats;
+
+ memset(&stats, 0, sizeof(stats));
+ omnisocket_udp_session_stats_snapshot(&self->session, &stats);
+ return build_stats_dict(&stats);
+}
+
+static PyMethodDef PyOmniUdpSession_methods[] = {
+ {"connect", (PyCFunction) PyOmniUdpSession_connect, METH_VARARGS | METH_KEYWORDS, NULL},
+ {"close", (PyCFunction) PyOmniUdpSession_close, METH_NOARGS, NULL},
+ {"send", (PyCFunction) PyOmniUdpSession_send, METH_VARARGS | METH_KEYWORDS, NULL},
+ {"recv", (PyCFunction) PyOmniUdpSession_recv, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_doc},
+ {"recv_into", (PyCFunction) PyOmniUdpSession_recv_into, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_into_doc},
+ {"stats", (PyCFunction) PyOmniUdpSession_stats, METH_NOARGS, NULL},
+ {NULL, NULL, 0, NULL}
+};
+
+static PyTypeObject PyOmniUdpSessionType = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+};
+
static PyModuleDef omnisocket_module = {
PyModuleDef_HEAD_INIT,
.m_name = "_omnisocket",
@@ -315,6 +553,17 @@ PyMODINIT_FUNC PyInit__omnisocket(void) {
return NULL;
}
+ PyOmniUdpSessionType.tp_name = "omnisocket.UdpSession";
+ PyOmniUdpSessionType.tp_basicsize = sizeof(PyOmniUdpSession);
+ PyOmniUdpSessionType.tp_flags = Py_TPFLAGS_DEFAULT;
+ PyOmniUdpSessionType.tp_new = PyOmniUdpSession_new;
+ PyOmniUdpSessionType.tp_dealloc = (destructor) PyOmniUdpSession_dealloc;
+ PyOmniUdpSessionType.tp_methods = PyOmniUdpSession_methods;
+
+ if (PyType_Ready(&PyOmniUdpSessionType) < 0) {
+ return NULL;
+ }
+
module = PyModule_Create(&omnisocket_module);
if (module == NULL) {
return NULL;
@@ -327,6 +576,13 @@ PyMODINIT_FUNC PyInit__omnisocket(void) {
return NULL;
}
+ Py_INCREF(&PyOmniUdpSessionType);
+ if (PyModule_AddObject(module, "UdpSession", (PyObject *) &PyOmniUdpSessionType) != 0) {
+ Py_DECREF(&PyOmniUdpSessionType);
+ Py_DECREF(module);
+ return NULL;
+ }
+
if (PyModule_AddIntConstant(module, "MSG_TYPE_TEXT", MSG_TYPE_TEXT) != 0 ||
PyModule_AddIntConstant(module, "MSG_TYPE_FILE", MSG_TYPE_FILE) != 0 ||
PyModule_AddIntConstant(module, "MSG_TYPE_REGISTER", MSG_TYPE_REGISTER) != 0 ||
diff --git a/python/omnisocket/omnisocket_client.c b/python/omnisocket/omnisocket_client.c
index 3557780..3166476 100644
--- a/python/omnisocket/omnisocket_client.c
+++ b/python/omnisocket/omnisocket_client.c
@@ -246,3 +246,245 @@ void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket
*out_stats = session->stats;
pthread_mutex_unlock(&session->mutex);
}
+
+int omnisocket_udp_session_init(omnisocket_udp_session_t *session) {
+ int rc;
+
+ if (session == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+ memset(session, 0, sizeof(*session));
+ rc = pthread_mutex_init(&session->mutex, NULL);
+ if (rc != 0) {
+ errno = rc;
+ return -1;
+ }
+ rc = pthread_cond_init(&session->idle_cond, NULL);
+ if (rc != 0) {
+ pthread_mutex_destroy(&session->mutex);
+ errno = rc;
+ return -1;
+ }
+ return 0;
+}
+
+void omnisocket_udp_session_destroy(omnisocket_udp_session_t *session) {
+ if (session == NULL) {
+ return;
+ }
+ (void) omnisocket_udp_session_close(session);
+ pthread_cond_destroy(&session->idle_cond);
+ pthread_mutex_destroy(&session->mutex);
+}
+
+static int omnisocket_udp_session_begin_client_op(omnisocket_udp_session_t *session, udp_client_t **out_client) {
+ if (session == NULL || out_client == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ pthread_mutex_lock(&session->mutex);
+ if (session->closing) {
+ pthread_mutex_unlock(&session->mutex);
+ errno = ECANCELED;
+ return -1;
+ }
+ if (session->client == NULL) {
+ pthread_mutex_unlock(&session->mutex);
+ errno = ENOTCONN;
+ return -1;
+ }
+ *out_client = session->client;
+ session->active_ops += 1;
+ pthread_mutex_unlock(&session->mutex);
+ return 0;
+}
+
+int omnisocket_udp_session_connect(
+ omnisocket_udp_session_t *session,
+ const char *server_addr,
+ const char *peer_id,
+ const char *bind_ip,
+ const char *bind_device,
+ int enable_timestamping
+) {
+ udp_client_t *client;
+
+ if (session == NULL || server_addr == NULL || peer_id == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ pthread_mutex_lock(&session->mutex);
+ while (session->closing) {
+ pthread_cond_wait(&session->idle_cond, &session->mutex);
+ }
+ if (session->client != NULL) {
+ pthread_mutex_unlock(&session->mutex);
+ errno = EISCONN;
+ return -1;
+ }
+ client = udp_client_dial_with_options(
+ server_addr,
+ peer_id,
+ bind_ip,
+ bind_device,
+ NULL,
+ NULL,
+ enable_timestamping
+ );
+ if (client == NULL) {
+ pthread_mutex_unlock(&session->mutex);
+ return -1;
+ }
+ session->client = client;
+ session->stats.connected = 1;
+ pthread_mutex_unlock(&session->mutex);
+ return 0;
+}
+
+int omnisocket_udp_session_close(omnisocket_udp_session_t *session) {
+ udp_client_t *client;
+
+ if (session == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ pthread_mutex_lock(&session->mutex);
+ while (session->closing) {
+ pthread_cond_wait(&session->idle_cond, &session->mutex);
+ }
+ client = session->client;
+ if (client != NULL) {
+ session->closing = 1;
+ session->client = NULL;
+ }
+ session->stats.connected = 0;
+ pthread_mutex_unlock(&session->mutex);
+
+ if (client != NULL) {
+ udp_client_close(client);
+ pthread_mutex_lock(&session->mutex);
+ while (session->active_ops > 0) {
+ pthread_cond_wait(&session->idle_cond, &session->mutex);
+ }
+ pthread_mutex_unlock(&session->mutex);
+ udp_client_free(client);
+ pthread_mutex_lock(&session->mutex);
+ session->closing = 0;
+ pthread_cond_broadcast(&session->idle_cond);
+ pthread_mutex_unlock(&session->mutex);
+ }
+ return 0;
+}
+
+int omnisocket_udp_session_send(omnisocket_udp_session_t *session, const char *to, const void *data, size_t data_len) {
+ udp_client_t *client;
+ int rc;
+
+ if (session == NULL || to == NULL || (data == NULL && data_len > 0)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (omnisocket_udp_session_begin_client_op(session, &client) != 0) {
+ return -1;
+ }
+ rc = udp_client_send_binary(client, to, data, data_len);
+ pthread_mutex_lock(&session->mutex);
+ if (rc == 0) {
+ session->stats.send_calls += 1;
+ session->stats.send_bytes += (uint64_t) data_len;
+ } else {
+ session->stats.send_errors += 1;
+ }
+ if (session->active_ops > 0) {
+ session->active_ops -= 1;
+ }
+ if (session->closing && session->active_ops == 0) {
+ pthread_cond_broadcast(&session->idle_cond);
+ }
+ pthread_mutex_unlock(&session->mutex);
+ return rc;
+}
+
+int omnisocket_udp_session_recv(omnisocket_udp_session_t *session, message_t *out_msg, int timeout_ms) {
+ udp_client_t *client;
+ int rc;
+
+ if (session == NULL || out_msg == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (omnisocket_udp_session_begin_client_op(session, &client) != 0) {
+ return -1;
+ }
+ rc = udp_client_receive_timed(client, out_msg, timeout_ms);
+ pthread_mutex_lock(&session->mutex);
+ if (rc == 0) {
+ session->stats.recv_calls += 1;
+ session->stats.recv_bytes += (uint64_t) out_msg->body_len;
+ } else if (rc == 1) {
+ session->stats.recv_timeouts += 1;
+ } else {
+ session->stats.recv_errors += 1;
+ }
+ if (session->active_ops > 0) {
+ session->active_ops -= 1;
+ }
+ if (session->closing && session->active_ops == 0) {
+ pthread_cond_broadcast(&session->idle_cond);
+ }
+ pthread_mutex_unlock(&session->mutex);
+ return rc;
+}
+
+int omnisocket_udp_session_recv_into(
+ omnisocket_udp_session_t *session,
+ void *buffer,
+ size_t buffer_len,
+ udp_client_recv_meta_t *out_meta,
+ int timeout_ms
+) {
+ udp_client_t *client;
+ int rc;
+
+ if (session == NULL || out_meta == NULL || (buffer == NULL && buffer_len > 0)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (omnisocket_udp_session_begin_client_op(session, &client) != 0) {
+ return -1;
+ }
+ rc = udp_client_receive_into(client, buffer, buffer_len, out_meta, timeout_ms);
+ pthread_mutex_lock(&session->mutex);
+ if (rc == 0) {
+ session->stats.recv_calls += 1;
+ session->stats.recv_bytes += (uint64_t) out_meta->body_len;
+ } else if (rc == 1) {
+ session->stats.recv_timeouts += 1;
+ } else {
+ session->stats.recv_errors += 1;
+ }
+ if (session->active_ops > 0) {
+ session->active_ops -= 1;
+ }
+ if (session->closing && session->active_ops == 0) {
+ pthread_cond_broadcast(&session->idle_cond);
+ }
+ pthread_mutex_unlock(&session->mutex);
+ return rc;
+}
+
+void omnisocket_udp_session_stats_snapshot(omnisocket_udp_session_t *session, omnisocket_session_stats_t *out_stats) {
+ if (session == NULL || out_stats == NULL) {
+ return;
+ }
+ pthread_mutex_lock(&session->mutex);
+ *out_stats = session->stats;
+ pthread_mutex_unlock(&session->mutex);
+}
diff --git a/python/omnisocket/omnisocket_client.h b/python/omnisocket/omnisocket_client.h
index be9e4b2..2ca5fa4 100644
--- a/python/omnisocket/omnisocket_client.h
+++ b/python/omnisocket/omnisocket_client.h
@@ -2,6 +2,7 @@
#define OMNISOCKET_PY_CLIENT_H
#include "peer_kcp_client.h"
+#include "peer_udp_client.h"
typedef struct omnisocket_session_stats {
uint64_t send_calls;
@@ -23,6 +24,15 @@ typedef struct omnisocket_session {
omnisocket_session_stats_t stats;
} omnisocket_session_t;
+typedef struct omnisocket_udp_session {
+ pthread_mutex_t mutex;
+ pthread_cond_t idle_cond;
+ udp_client_t *client;
+ size_t active_ops;
+ int closing;
+ omnisocket_session_stats_t stats;
+} omnisocket_udp_session_t;
+
int omnisocket_session_init(omnisocket_session_t *session);
void omnisocket_session_destroy(omnisocket_session_t *session);
@@ -48,4 +58,27 @@ int omnisocket_session_recv_into(
);
void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket_session_stats_t *out_stats);
+int omnisocket_udp_session_init(omnisocket_udp_session_t *session);
+void omnisocket_udp_session_destroy(omnisocket_udp_session_t *session);
+
+int omnisocket_udp_session_connect(
+ omnisocket_udp_session_t *session,
+ const char *server_addr,
+ const char *peer_id,
+ const char *bind_ip,
+ const char *bind_device,
+ int enable_timestamping
+);
+int omnisocket_udp_session_close(omnisocket_udp_session_t *session);
+int omnisocket_udp_session_send(omnisocket_udp_session_t *session, const char *to, const void *data, size_t data_len);
+int omnisocket_udp_session_recv(omnisocket_udp_session_t *session, message_t *out_msg, int timeout_ms);
+int omnisocket_udp_session_recv_into(
+ omnisocket_udp_session_t *session,
+ void *buffer,
+ size_t buffer_len,
+ udp_client_recv_meta_t *out_meta,
+ int timeout_ms
+);
+void omnisocket_udp_session_stats_snapshot(omnisocket_udp_session_t *session, omnisocket_session_stats_t *out_stats);
+
#endif
diff --git a/python/tests/test_sessions.py b/python/tests/test_sessions.py
new file mode 100644
index 0000000..fc56919
--- /dev/null
+++ b/python/tests/test_sessions.py
@@ -0,0 +1,180 @@
+from __future__ import annotations
+
+from contextlib import contextmanager
+from pathlib import Path
+import socket
+import subprocess
+import sys
+import threading
+import time
+
+import pytest
+
+
+pytestmark = pytest.mark.skipif(sys.platform != 'linux', reason='Linux-only OmniSocket extension')
+
+ROOT = Path(__file__).resolve().parents[2]
+PYTHON_ROOT = ROOT / 'python'
+if str(PYTHON_ROOT) not in sys.path:
+ sys.path.insert(0, str(PYTHON_ROOT))
+
+omnisocket = pytest.importorskip('omnisocket')
+
+CONTROL_DEFAULTS = omnisocket.CONTROL_DEFAULTS
+MSG_TYPE_BINARY = omnisocket.MSG_TYPE_BINARY
+Session = omnisocket.Session
+UdpSession = omnisocket.UdpSession
+
+
+def _reserve_port() -> int:
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+ sock.bind(('127.0.0.1', 0))
+ return int(sock.getsockname()[1])
+
+
+@contextmanager
+def _run_server(binary_name: str, listen_addr: str):
+ binary = ROOT / 'bin' / binary_name
+ if not binary.exists():
+ pytest.skip(f'{binary} is not built')
+
+ process = subprocess.Popen(
+ [str(binary), '-listen', listen_addr],
+ cwd=str(ROOT),
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+ try:
+ time.sleep(0.2)
+ yield process
+ finally:
+ process.terminate()
+ try:
+ process.wait(timeout=2.0)
+ except subprocess.TimeoutExpired:
+ process.kill()
+ process.wait(timeout=2.0)
+
+
+def _connect_with_retry(session_cls, *, transport: str, server_addr: str, peer_id: str):
+ deadline = time.monotonic() + 3.0
+ last_error: Exception | None = None
+
+ while time.monotonic() < deadline:
+ session = session_cls()
+ try:
+ kwargs: dict[str, object] = {
+ 'server_addr': server_addr,
+ 'peer_id': peer_id,
+ }
+ if transport == 'kcp':
+ kwargs.update(CONTROL_DEFAULTS)
+ else:
+ kwargs['enable_timestamping'] = False
+ session.connect(**kwargs)
+ return session
+ except OSError as exc:
+ last_error = exc
+ time.sleep(0.1)
+
+ raise AssertionError(f'failed to connect {peer_id} to {server_addr}: {last_error}')
+
+
+@pytest.mark.parametrize(
+ ('transport', 'binary_name', 'session_cls'),
+ [
+ ('udp', 'udpserver', UdpSession),
+ ('kcp', 'kcpserver', Session),
+ ],
+)
+def test_control_sessions_smoke(transport: str, binary_name: str, session_cls) -> None:
+ port = _reserve_port()
+ listen_addr = f'127.0.0.1:{port}'
+ sender_id = f'pytest-{transport}-sender'
+ receiver_id = f'pytest-{transport}-receiver'
+
+ with _run_server(binary_name, listen_addr):
+ sender = _connect_with_retry(session_cls, transport=transport, server_addr=listen_addr, peer_id=sender_id)
+ receiver = _connect_with_retry(session_cls, transport=transport, server_addr=listen_addr, peer_id=receiver_id)
+
+ try:
+ assert receiver.recv(timeout_ms=20) is None
+
+ payload = b'control-packet-1'
+ sender.send(to=receiver_id, data=payload)
+ from_peer, msg_type, recv_payload = receiver.recv(timeout_ms=1000)
+ assert from_peer == sender_id
+ assert msg_type == MSG_TYPE_BINARY
+ assert recv_payload == payload
+
+ payload2 = b'control-packet-2'
+ sender.send(to=receiver_id, data=payload2)
+ recv_buffer = bytearray(128)
+ meta = receiver.recv_into(buffer=recv_buffer, timeout_ms=1000)
+ assert meta is not None
+ assert meta['from'] == sender_id
+ assert meta['msg_type'] == MSG_TYPE_BINARY
+ assert meta['body_len'] == len(payload2)
+ assert bytes(recv_buffer[: meta['body_len']]) == payload2
+
+ sender_stats = sender.stats()
+ receiver_stats = receiver.stats()
+ assert sender_stats['connected'] == 1
+ assert receiver_stats['connected'] == 1
+ assert sender_stats['send_calls'] >= 2
+ assert receiver_stats['recv_calls'] >= 2
+ finally:
+ sender.close()
+ receiver.close()
+
+
+def test_udp_session_close_interrupts_blocking_recv() -> None:
+ port = _reserve_port()
+ listen_addr = f'127.0.0.1:{port}'
+ receiver_id = 'pytest-udp-blocking-recv'
+
+ with _run_server('udpserver', listen_addr):
+ receiver = _connect_with_retry(
+ UdpSession,
+ transport='udp',
+ server_addr=listen_addr,
+ peer_id=receiver_id,
+ )
+
+ recv_error: list[BaseException] = []
+ close_error: list[BaseException] = []
+ recv_started = threading.Event()
+ recv_done = threading.Event()
+ close_done = threading.Event()
+
+ def recv_worker() -> None:
+ recv_started.set()
+ try:
+ receiver.recv()
+ except BaseException as exc: # pragma: no cover - assertion is on thread completion
+ recv_error.append(exc)
+ finally:
+ recv_done.set()
+
+ def close_worker() -> None:
+ try:
+ receiver.close()
+ except BaseException as exc: # pragma: no cover - assertion is on thread completion
+ close_error.append(exc)
+ finally:
+ close_done.set()
+
+ recv_thread = threading.Thread(target=recv_worker, daemon=True)
+ recv_thread.start()
+ assert recv_started.wait(timeout=1.0)
+ time.sleep(0.05)
+
+ close_thread = threading.Thread(target=close_worker, daemon=True)
+ close_thread.start()
+
+ assert close_done.wait(timeout=1.0), 'UdpSession.close() blocked while recv() was waiting'
+ assert recv_done.wait(timeout=1.0), 'UdpSession.recv() stayed blocked after close()'
+ assert not close_thread.is_alive()
+ assert not recv_thread.is_alive()
+ assert not close_error
+ assert not recv_error or isinstance(recv_error[0], OSError)
diff --git a/ros-control-c/README.md b/ros-control-c/README.md
index 98b4726..b280639 100644
--- a/ros-control-c/README.md
+++ b/ros-control-c/README.md
@@ -5,6 +5,11 @@
- `udp` (default): unchanged from the original implementation
- `kcp`: sent through OmniSocket using `MSG_TYPE_BINARY`
+Note:
+
+- This README documents the `ros-control-c` path only.
+- `ros-control-py` now uses OmniSocket for both `transport:=udp` and `transport:=kcp`; its `udp` mode is no longer raw socket UDP.
+
## Build
On Linux:
diff --git a/ros-control-py/README.md b/ros-control-py/README.md
new file mode 100644
index 0000000..1e95736
--- /dev/null
+++ b/ros-control-py/README.md
@@ -0,0 +1,246 @@
+# ROS2 Teleop over OmniSocket UDP/KCP
+
+`ros-control-py/udp_teleop_bridge` 现在把 teleop 控制流统一接到 OmniSocket peer 传输上。
+
+- `transport:=udp` 表示 OmniSocket UDP,经 `udpserver/udppeer` 的消息协议传输
+- `transport:=kcp` 表示 OmniSocket KCP,经 `kcpserver/kcppeer` 的消息协议传输
+- 不再使用原来的裸 `socket.sendto()/recvfrom()` UDP 路径
+
+机器人最终接收的话题保持不变:
+
+- topic: `/hric/robot/cmd_vel`
+- type: `geometry_msgs/msg/TwistStamped`
+- frame_id: `pelvis`
+
+控制负载也保持不变:
+
+- fixed payload: 24-byte little-endian `<6f>`
+- order: `lx, ly, lz, ax, ay, az`
+
+## 目录
+
+- `udp_teleop_bridge/udp_teleop_bridge/cmd_vel_udp_sender.py`: 订阅 `TwistStamped`,经 OmniSocket 发送 24 字节控制包
+- `udp_teleop_bridge/udp_teleop_bridge/udp_cmd_vel_receiver.py`: 从 OmniSocket 接收控制包,补时间戳并发布到机器人 ROS2 topic
+- `udp_teleop_bridge/udp_teleop_bridge/omni_transport.py`: 统一封装 OmniSocket UDP/KCP session
+- `udp_teleop_bridge/config/xbox_twist_joy.yaml`: Xbox 手柄映射
+- `udp_teleop_bridge/launch/*.launch.py`: Linux 启动入口
+
+## Linux 构建
+
+先安装 ROS 2 官方 teleop 依赖:
+
+```bash
+sudo apt install ros-${ROS_DISTRO}-joy ros-${ROS_DISTRO}-teleop-twist-joy ros-${ROS_DISTRO}-teleop-twist-keyboard
+```
+
+再构建并安装 OmniSocket Python 扩展:
+
+```bash
+make python-ext
+make python-install
+```
+
+最后构建 ROS 包:
+
+```bash
+colcon build --packages-select udp_teleop_bridge
+source install/setup.bash
+```
+
+如果 `omnisocket` 没有安装到当前 ROS Python 环境,sender/receiver 会直接报错退出。
+
+## 先验证机器人控制语义
+
+在机器人本机先直接低速发布 `/hric/robot/cmd_vel`,确认 `linear.x`、`linear.y`、`angular.z` 的物理方向符合预期:
+
+```bash
+ros2 topic pub /hric/robot/cmd_vel geometry_msgs/msg/TwistStamped \
+ "{header: {frame_id: pelvis}, twist: {linear: {x: 0.10, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}}" \
+ -r 20
+```
+
+```bash
+ros2 topic pub /hric/robot/cmd_vel geometry_msgs/msg/TwistStamped \
+ "{header: {frame_id: pelvis}, twist: {linear: {x: 0.0, y: 0.10, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}}" \
+ -r 20
+```
+
+```bash
+ros2 topic pub /hric/robot/cmd_vel geometry_msgs/msg/TwistStamped \
+ "{header: {frame_id: pelvis}, twist: {linear: {x: 0.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.30}}}" \
+ -r 20
+```
+
+停止:
+
+```bash
+ros2 topic pub --once /hric/robot/cmd_vel geometry_msgs/msg/TwistStamped \
+ "{header: {frame_id: pelvis}, twist: {linear: {x: 0.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}}"
+```
+
+## 启动 OmniSocket Hub
+
+OmniSocket UDP:
+
+```bash
+./bin/udpserver -listen :9001
+```
+
+OmniSocket KCP:
+
+```bash
+./bin/kcpserver -listen :9002
+```
+
+`server_addr` 不传时,节点会按 `transport` 自动选择默认值:
+
+- `udp` -> `127.0.0.1:9001`
+- `kcp` -> `127.0.0.1:9002`
+
+`relay_via` 只在 `transport:=kcp` 时生效。
+
+## 机器人端运行
+
+UDP:
+
+```bash
+ros2 launch udp_teleop_bridge robot_udp_receiver.launch.py \
+ transport:=udp \
+ server_addr:=127.0.0.1:9001 \
+ peer_id:=ros-bridge-ctrl \
+ output_topic:=/hric/robot/cmd_vel \
+ frame_id:=pelvis \
+ watchdog_timeout:=0.5
+```
+
+KCP:
+
+```bash
+ros2 launch udp_teleop_bridge robot_udp_receiver.launch.py \
+ transport:=kcp \
+ server_addr:=127.0.0.1:9002 \
+ peer_id:=ros-bridge-ctrl \
+ output_topic:=/hric/robot/cmd_vel \
+ frame_id:=pelvis \
+ watchdog_timeout:=0.5
+```
+
+如果只允许某个 sender 控制,可以加:
+
+```bash
+expected_sender:=ros-keyboard-ctrl
+```
+
+## 控制端键盘运行
+
+终端 A,启动 sender:
+
+```bash
+ros2 launch udp_teleop_bridge keyboard_sender.launch.py \
+ transport:=udp \
+ server_addr:=127.0.0.1:9001 \
+ peer_id:=ros-keyboard-ctrl \
+ target_peer:=ros-bridge-ctrl
+```
+
+如果走 KCP:
+
+```bash
+ros2 launch udp_teleop_bridge keyboard_sender.launch.py \
+ transport:=kcp \
+ server_addr:=127.0.0.1:9002 \
+ peer_id:=ros-keyboard-ctrl \
+ target_peer:=ros-bridge-ctrl
+```
+
+终端 B,启动官方键盘 teleop:
+
+```bash
+ros2 run teleop_twist_keyboard teleop_twist_keyboard --ros-args \
+ --remap cmd_vel:=/teleop/cmd_vel \
+ -p stamped:=true \
+ -p frame_id:=pelvis \
+ -p speed:=0.20 \
+ -p turn:=0.60
+```
+
+键盘默认键位(`teleop_twist_keyboard`,建议使用 US 键盘布局):
+
+- `i`: 前进(`linear.x > 0`)
+- `,`: 后退(`linear.x < 0`)
+- `j`: 左转(`angular.z > 0`)
+- `l`: 右转(`angular.z < 0`)
+- `Shift + J`: 左平移(`linear.y > 0`)
+- `Shift + L`: 右平移(`linear.y < 0`)
+- `u` / `o` / `m` / `.`: 组合前进或后退加转向
+- `k` 或其他未映射按键: 停止
+- `q` / `z`: 整体速度增加 / 降低 10%
+- `w` / `x`: 仅线速度增加 / 降低 10%
+- `e` / `c`: 仅角速度增加 / 降低 10%
+- `Ctrl-C`: 退出键盘 teleop
+
+## 控制端 Xbox 手柄运行
+
+UDP:
+
+```bash
+ros2 launch udp_teleop_bridge xbox_to_udp.launch.py \
+ transport:=udp \
+ server_addr:=127.0.0.1:9001 \
+ peer_id:=ros-gamepad-ctrl \
+ target_peer:=ros-bridge-ctrl \
+ joy_dev:=/dev/input/js0 \
+ frame_id:=pelvis
+```
+
+KCP:
+
+```bash
+ros2 launch udp_teleop_bridge xbox_to_udp.launch.py \
+ transport:=kcp \
+ server_addr:=127.0.0.1:9002 \
+ peer_id:=ros-gamepad-ctrl \
+ target_peer:=ros-bridge-ctrl \
+ joy_dev:=/dev/input/js0 \
+ frame_id:=pelvis
+```
+
+当前默认手柄映射:
+
+- 左摇杆上下 -> `linear.x`
+- 左摇杆左右 -> `linear.y`
+- 右摇杆左右 -> `angular.z`
+- `RB` 按住才允许运动
+- `LB` 为 turbo
+
+手柄实际操控含义(基于 `config/xbox_twist_joy.yaml` 的 Xbox 默认映射):
+
+- 左摇杆向前 / 向后: 前进 / 后退
+- 左摇杆向左 / 向右: 左平移 / 右平移
+- 右摇杆向左 / 向右: 左转 / 右转
+- 按住 `RB`: 以常速启用运动输出
+- 同时按住 `LB` + `RB`: 启用 turbo,更高的线速度和角速度
+- 松开 `RB` 或将摇杆回中: 输出回到零速
+
+## 数据流
+
+键盘链路:
+
+```text
+teleop_twist_keyboard -> /teleop/cmd_vel (TwistStamped) -> cmd_vel_udp_sender -> OmniSocket UDP/KCP -> udp_cmd_vel_receiver -> /hric/robot/cmd_vel
+```
+
+手柄链路:
+
+```text
+joy_node -> teleop_twist_joy -> /teleop/cmd_vel (TwistStamped) -> cmd_vel_udp_sender -> OmniSocket UDP/KCP -> udp_cmd_vel_receiver -> /hric/robot/cmd_vel
+```
+
+## 安全行为
+
+- sender 默认按 20 Hz 重发最新命令
+- sender 输入超时后会改发零速
+- sender 退出时会主动发送数个零速控制包
+- receiver 超时后会在 ROS 主线程发布零速 stop
+- receiver 只接受 `MSG_TYPE_BINARY` 且长度为 24 字节的负载
+- 非预期 sender、非 binary 消息、错误长度消息都会被丢弃并记录日志
diff --git a/ros-control-py/ROS2 Teleop over UDP.md b/ros-control-py/ROS2 Teleop over UDP.md
new file mode 100644
index 0000000..6f10d33
--- /dev/null
+++ b/ros-control-py/ROS2 Teleop over UDP.md
@@ -0,0 +1,147 @@
+## ROS2 Teleop over OmniSocket UDP/KCP
+
+这个文档对应 `ros-control-py/udp_teleop_bridge` 的当前实现。
+
+核心变化:
+
+- `transport:=udp` 现在表示 OmniSocket UDP
+- `transport:=kcp` 表示 OmniSocket KCP
+- 不再使用原来的裸 `socket` UDP 实现
+
+控制接口保持不变:
+
+- topic: `/hric/robot/cmd_vel`
+- type: `geometry_msgs/msg/TwistStamped`
+- frame_id: `pelvis`
+- payload: fixed 24-byte little-endian `<6f>`
+
+负载顺序:
+
+`lx, ly, lz, ax, ay, az`
+
+### 构建顺序
+
+```bash
+make python-ext
+make python-install
+```
+
+```bash
+colcon build --packages-select udp_teleop_bridge
+source install/setup.bash
+```
+
+### 启动 Hub
+
+OmniSocket UDP:
+
+```bash
+./bin/udpserver -listen :9001
+```
+
+OmniSocket KCP:
+
+```bash
+./bin/kcpserver -listen :9002
+```
+
+### 机器人端 Receiver
+
+```bash
+ros2 launch udp_teleop_bridge robot_udp_receiver.launch.py \
+ transport:=udp \
+ server_addr:=127.0.0.1:9001 \
+ peer_id:=ros-bridge-ctrl \
+ output_topic:=/hric/robot/cmd_vel \
+ frame_id:=pelvis \
+ watchdog_timeout:=0.5
+```
+
+KCP 只需把 `transport` 和 `server_addr` 改成:
+
+```bash
+transport:=kcp server_addr:=127.0.0.1:9002
+```
+
+只接受指定 sender:
+
+```bash
+expected_sender:=ros-keyboard-ctrl
+```
+
+### 键盘 Sender
+
+```bash
+ros2 launch udp_teleop_bridge keyboard_sender.launch.py \
+ transport:=udp \
+ server_addr:=127.0.0.1:9001 \
+ peer_id:=ros-keyboard-ctrl \
+ target_peer:=ros-bridge-ctrl
+```
+
+```bash
+ros2 run teleop_twist_keyboard teleop_twist_keyboard --ros-args \
+ --remap cmd_vel:=/teleop/cmd_vel \
+ -p stamped:=true \
+ -p frame_id:=pelvis \
+ -p speed:=0.20 \
+ -p turn:=0.60
+```
+
+### Xbox Sender
+
+```bash
+ros2 launch udp_teleop_bridge xbox_to_udp.launch.py \
+ transport:=udp \
+ server_addr:=127.0.0.1:9001 \
+ peer_id:=ros-gamepad-ctrl \
+ target_peer:=ros-bridge-ctrl \
+ joy_dev:=/dev/input/js0 \
+ frame_id:=pelvis
+```
+
+### 参数语义
+
+- sender:
+ - `transport`
+ - `server_addr`
+ - `relay_via`
+ - `peer_id`
+ - `target_peer`
+ - `input_topic`
+ - `send_rate_hz`
+ - `input_timeout`
+- receiver:
+ - `transport`
+ - `server_addr`
+ - `relay_via`
+ - `peer_id`
+ - `expected_sender`
+ - `output_topic`
+ - `frame_id`
+ - `watchdog_timeout`
+ - `publish_rate_hz`
+
+`server_addr` 省略时,会按 transport 自动选择:
+
+- `udp` -> `127.0.0.1:9001`
+- `kcp` -> `127.0.0.1:9002`
+
+### 数据流
+
+```text
+teleop_twist_keyboard / teleop_twist_joy
+ -> /teleop/cmd_vel (TwistStamped)
+ -> cmd_vel_udp_sender
+ -> OmniSocket UDP/KCP binary message
+ -> udp_cmd_vel_receiver
+ -> /hric/robot/cmd_vel
+```
+
+### 安全与约束
+
+- sender 默认 20 Hz 重发
+- sender 输入超时后改发零速
+- receiver watchdog 超时后发零速 stop
+- receiver 只接受 24 字节 binary 负载
+- `relay_via` 只在 KCP 模式有效
diff --git a/ros-control-py/udp_teleop_bridge/config/xbox_twist_joy.yaml b/ros-control-py/udp_teleop_bridge/config/xbox_twist_joy.yaml
new file mode 100644
index 0000000..c48735e
--- /dev/null
+++ b/ros-control-py/udp_teleop_bridge/config/xbox_twist_joy.yaml
@@ -0,0 +1,32 @@
+/**:
+ ros__parameters:
+ require_enable_button: true
+ enable_button: 5
+ enable_turbo_button: 4
+ axis_linear:
+ x: 1
+ y: 0
+ z: -1
+ scale_linear:
+ x: -0.30
+ y: -0.25
+ z: 0.0
+ scale_linear_turbo:
+ x: -0.60
+ y: -0.45
+ z: 0.0
+ axis_angular:
+ yaw: 3
+ pitch: -1
+ roll: -1
+ scale_angular:
+ yaw: -0.80
+ pitch: 0.0
+ roll: 0.0
+ scale_angular_turbo:
+ yaw: -1.20
+ pitch: 0.0
+ roll: 0.0
+ inverted_reverse: false
+ publish_stamped_twist: true
+ frame: pelvis
diff --git a/ros-control-py/udp_teleop_bridge/launch/keyboard_sender.launch.py b/ros-control-py/udp_teleop_bridge/launch/keyboard_sender.launch.py
new file mode 100644
index 0000000..49d7667
--- /dev/null
+++ b/ros-control-py/udp_teleop_bridge/launch/keyboard_sender.launch.py
@@ -0,0 +1,34 @@
+from launch import LaunchDescription
+from launch.actions import DeclareLaunchArgument
+from launch.substitutions import LaunchConfiguration
+from launch_ros.actions import Node
+from launch_ros.parameter_descriptions import ParameterValue
+
+
+def generate_launch_description() -> LaunchDescription:
+ return LaunchDescription([
+ DeclareLaunchArgument('transport', default_value='udp'),
+ DeclareLaunchArgument('server_addr', default_value=''),
+ DeclareLaunchArgument('relay_via', default_value=''),
+ DeclareLaunchArgument('peer_id', default_value='ros-keyboard-ctrl'),
+ DeclareLaunchArgument('target_peer', default_value='ros-bridge-ctrl'),
+ DeclareLaunchArgument('input_topic', default_value='/teleop/cmd_vel'),
+ DeclareLaunchArgument('send_rate_hz', default_value='20.0'),
+ DeclareLaunchArgument('input_timeout', default_value='0.75'),
+ Node(
+ package='udp_teleop_bridge',
+ executable='cmd_vel_udp_sender',
+ name='cmd_vel_udp_sender',
+ output='screen',
+ parameters=[{
+ 'transport': LaunchConfiguration('transport'),
+ 'server_addr': LaunchConfiguration('server_addr'),
+ 'relay_via': LaunchConfiguration('relay_via'),
+ 'peer_id': LaunchConfiguration('peer_id'),
+ 'target_peer': LaunchConfiguration('target_peer'),
+ 'input_topic': LaunchConfiguration('input_topic'),
+ 'send_rate_hz': ParameterValue(LaunchConfiguration('send_rate_hz'), value_type=float),
+ 'input_timeout': ParameterValue(LaunchConfiguration('input_timeout'), value_type=float),
+ }],
+ ),
+ ])
diff --git a/ros-control-py/udp_teleop_bridge/launch/robot_udp_receiver.launch.py b/ros-control-py/udp_teleop_bridge/launch/robot_udp_receiver.launch.py
new file mode 100644
index 0000000..c27acc4
--- /dev/null
+++ b/ros-control-py/udp_teleop_bridge/launch/robot_udp_receiver.launch.py
@@ -0,0 +1,36 @@
+from launch import LaunchDescription
+from launch.actions import DeclareLaunchArgument
+from launch.substitutions import LaunchConfiguration
+from launch_ros.actions import Node
+from launch_ros.parameter_descriptions import ParameterValue
+
+
+def generate_launch_description() -> LaunchDescription:
+ return LaunchDescription([
+ DeclareLaunchArgument('transport', default_value='udp'),
+ DeclareLaunchArgument('server_addr', default_value=''),
+ DeclareLaunchArgument('relay_via', default_value=''),
+ DeclareLaunchArgument('peer_id', default_value='ros-bridge-ctrl'),
+ DeclareLaunchArgument('expected_sender', default_value=''),
+ DeclareLaunchArgument('output_topic', default_value='/hric/robot/cmd_vel'),
+ DeclareLaunchArgument('frame_id', default_value='pelvis'),
+ DeclareLaunchArgument('watchdog_timeout', default_value='0.5'),
+ DeclareLaunchArgument('publish_rate_hz', default_value='100.0'),
+ Node(
+ package='udp_teleop_bridge',
+ executable='udp_cmd_vel_receiver',
+ name='udp_cmd_vel_receiver',
+ output='screen',
+ parameters=[{
+ 'transport': LaunchConfiguration('transport'),
+ 'server_addr': LaunchConfiguration('server_addr'),
+ 'relay_via': LaunchConfiguration('relay_via'),
+ 'peer_id': LaunchConfiguration('peer_id'),
+ 'expected_sender': LaunchConfiguration('expected_sender'),
+ 'output_topic': LaunchConfiguration('output_topic'),
+ 'frame_id': LaunchConfiguration('frame_id'),
+ 'watchdog_timeout': ParameterValue(LaunchConfiguration('watchdog_timeout'), value_type=float),
+ 'publish_rate_hz': ParameterValue(LaunchConfiguration('publish_rate_hz'), value_type=float),
+ }],
+ ),
+ ])
diff --git a/ros-control-py/udp_teleop_bridge/launch/xbox_to_udp.launch.py b/ros-control-py/udp_teleop_bridge/launch/xbox_to_udp.launch.py
new file mode 100644
index 0000000..d9038d0
--- /dev/null
+++ b/ros-control-py/udp_teleop_bridge/launch/xbox_to_udp.launch.py
@@ -0,0 +1,74 @@
+from launch import LaunchDescription
+from launch.actions import DeclareLaunchArgument
+from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
+from launch_ros.actions import Node
+from launch_ros.parameter_descriptions import ParameterValue
+from launch_ros.substitutions import FindPackageShare
+
+
+def generate_launch_description() -> LaunchDescription:
+ teleop_config = PathJoinSubstitution([
+ FindPackageShare('udp_teleop_bridge'),
+ 'config',
+ 'xbox_twist_joy.yaml',
+ ])
+
+ teleop_topic = LaunchConfiguration('teleop_topic')
+
+ return LaunchDescription([
+ DeclareLaunchArgument('transport', default_value='udp'),
+ DeclareLaunchArgument('server_addr', default_value=''),
+ DeclareLaunchArgument('relay_via', default_value=''),
+ DeclareLaunchArgument('peer_id', default_value='ros-gamepad-ctrl'),
+ DeclareLaunchArgument('target_peer', default_value='ros-bridge-ctrl'),
+ DeclareLaunchArgument('joy_dev', default_value='/dev/input/js0'),
+ DeclareLaunchArgument('deadzone', default_value='0.10'),
+ DeclareLaunchArgument('autorepeat_rate', default_value='20.0'),
+ DeclareLaunchArgument('frame_id', default_value='pelvis'),
+ DeclareLaunchArgument('teleop_topic', default_value='/teleop/cmd_vel'),
+ DeclareLaunchArgument('send_rate_hz', default_value='20.0'),
+ DeclareLaunchArgument('input_timeout', default_value='0.30'),
+ Node(
+ package='joy',
+ executable='joy_node',
+ name='joy_node',
+ output='screen',
+ parameters=[{
+ 'dev': LaunchConfiguration('joy_dev'),
+ 'deadzone': ParameterValue(LaunchConfiguration('deadzone'), value_type=float),
+ 'autorepeat_rate': ParameterValue(LaunchConfiguration('autorepeat_rate'), value_type=float),
+ }],
+ ),
+ Node(
+ package='teleop_twist_joy',
+ executable='teleop_node',
+ name='teleop_twist_joy',
+ output='screen',
+ parameters=[
+ teleop_config,
+ {
+ 'publish_stamped_twist': True,
+ 'frame': LaunchConfiguration('frame_id'),
+ },
+ ],
+ remappings=[
+ ('cmd_vel', teleop_topic),
+ ],
+ ),
+ Node(
+ package='udp_teleop_bridge',
+ executable='cmd_vel_udp_sender',
+ name='cmd_vel_udp_sender',
+ output='screen',
+ parameters=[{
+ 'transport': LaunchConfiguration('transport'),
+ 'server_addr': LaunchConfiguration('server_addr'),
+ 'relay_via': LaunchConfiguration('relay_via'),
+ 'peer_id': LaunchConfiguration('peer_id'),
+ 'target_peer': LaunchConfiguration('target_peer'),
+ 'input_topic': teleop_topic,
+ 'send_rate_hz': ParameterValue(LaunchConfiguration('send_rate_hz'), value_type=float),
+ 'input_timeout': ParameterValue(LaunchConfiguration('input_timeout'), value_type=float),
+ }],
+ ),
+ ])
diff --git a/ros-control-py/udp_teleop_bridge/package.xml b/ros-control-py/udp_teleop_bridge/package.xml
new file mode 100644
index 0000000..28262bd
--- /dev/null
+++ b/ros-control-py/udp_teleop_bridge/package.xml
@@ -0,0 +1,24 @@
+
+
+ udp_teleop_bridge
+ 0.1.0
+ ROS 2 OmniSocket UDP/KCP bridge for teleop TwistStamped commands.
+
+ Codex
+ MIT
+
+ ament_python
+
+ ament_index_python
+ geometry_msgs
+ joy
+ launch
+ launch_ros
+ rclpy
+ teleop_twist_joy
+ teleop_twist_keyboard
+
+
+ ament_python
+
+
diff --git a/ros-control-py/udp_teleop_bridge/resource/udp_teleop_bridge b/ros-control-py/udp_teleop_bridge/resource/udp_teleop_bridge
new file mode 100644
index 0000000..9cc185f
--- /dev/null
+++ b/ros-control-py/udp_teleop_bridge/resource/udp_teleop_bridge
@@ -0,0 +1 @@
+udp_teleop_bridge
diff --git a/ros-control-py/udp_teleop_bridge/setup.cfg b/ros-control-py/udp_teleop_bridge/setup.cfg
new file mode 100644
index 0000000..8f79a94
--- /dev/null
+++ b/ros-control-py/udp_teleop_bridge/setup.cfg
@@ -0,0 +1,5 @@
+[develop]
+script_dir=$base/lib/udp_teleop_bridge
+
+[install]
+install_scripts=$base/lib/udp_teleop_bridge
diff --git a/ros-control-py/udp_teleop_bridge/setup.py b/ros-control-py/udp_teleop_bridge/setup.py
new file mode 100644
index 0000000..4957d6d
--- /dev/null
+++ b/ros-control-py/udp_teleop_bridge/setup.py
@@ -0,0 +1,34 @@
+from setuptools import find_packages, setup
+
+
+package_name = 'udp_teleop_bridge'
+
+
+setup(
+ name=package_name,
+ version='0.1.0',
+ packages=find_packages(exclude=['test']),
+ data_files=[
+ ('share/ament_index/resource_index/packages', [f'resource/{package_name}']),
+ (f'share/{package_name}', ['package.xml']),
+ (f'share/{package_name}/launch', [
+ 'launch/keyboard_sender.launch.py',
+ 'launch/robot_udp_receiver.launch.py',
+ 'launch/xbox_to_udp.launch.py',
+ ]),
+ (f'share/{package_name}/config', ['config/xbox_twist_joy.yaml']),
+ ],
+ install_requires=['setuptools'],
+ zip_safe=True,
+ maintainer='Codex',
+ maintainer_email='codex@example.com',
+ description='ROS 2 OmniSocket UDP/KCP bridge for teleop TwistStamped commands.',
+ license='MIT',
+ tests_require=['pytest'],
+ entry_points={
+ 'console_scripts': [
+ 'cmd_vel_udp_sender = udp_teleop_bridge.cmd_vel_udp_sender:main',
+ 'udp_cmd_vel_receiver = udp_teleop_bridge.udp_cmd_vel_receiver:main',
+ ],
+ },
+)
diff --git a/ros-control-py/udp_teleop_bridge/test/test_protocol.py b/ros-control-py/udp_teleop_bridge/test/test_protocol.py
new file mode 100644
index 0000000..87cfbe1
--- /dev/null
+++ b/ros-control-py/udp_teleop_bridge/test/test_protocol.py
@@ -0,0 +1,54 @@
+from pathlib import Path
+import sys
+
+import pytest
+
+
+ROOT = Path(__file__).resolve().parents[1]
+if str(ROOT) not in sys.path:
+ sys.path.insert(0, str(ROOT))
+
+from udp_teleop_bridge.protocol import ( # noqa: E402
+ PACKET_SIZE,
+ default_server_addr_for_transport,
+ normalize_command,
+ normalize_transport,
+ pack_command,
+ unpack_command,
+)
+
+
+def test_pack_unpack_round_trip() -> None:
+ command = (0.1, -0.2, 0.3, -0.4, 0.5, -0.6)
+
+ payload = pack_command(command)
+
+ assert len(payload) == PACKET_SIZE
+ assert unpack_command(payload) == pytest.approx(command)
+
+
+@pytest.mark.parametrize('value', [float('nan'), float('inf'), float('-inf')])
+def test_normalize_command_rejects_non_finite_values(value: float) -> None:
+ with pytest.raises(ValueError, match='non-finite'):
+ normalize_command((0.0, 0.0, value, 0.0, 0.0, 0.0))
+
+
+def test_unpack_command_rejects_wrong_length() -> None:
+ with pytest.raises(ValueError, match='Expected'):
+ unpack_command(b'\x00' * (PACKET_SIZE - 1))
+
+
+@pytest.mark.parametrize(
+ ('transport', 'expected'),
+ [
+ ('udp', '127.0.0.1:9001'),
+ ('kcp', '127.0.0.1:9002'),
+ ],
+)
+def test_default_server_addr_for_transport(transport: str, expected: str) -> None:
+ assert default_server_addr_for_transport(transport) == expected
+
+
+def test_normalize_transport_rejects_unknown_value() -> None:
+ with pytest.raises(ValueError, match='Unsupported transport'):
+ normalize_transport('sctp')
diff --git a/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/__init__.py b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/__init__.py
new file mode 100644
index 0000000..094feaf
--- /dev/null
+++ b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/__init__.py
@@ -0,0 +1 @@
+"""OmniSocket teleop bridge package."""
diff --git a/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/cmd_vel_udp_sender.py b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/cmd_vel_udp_sender.py
new file mode 100644
index 0000000..34a5ce1
--- /dev/null
+++ b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/cmd_vel_udp_sender.py
@@ -0,0 +1,207 @@
+"""ROS 2 node that forwards TwistStamped teleop commands over OmniSocket."""
+
+from __future__ import annotations
+
+import threading
+import time
+from typing import Dict, Optional, Tuple
+
+import rclpy
+from geometry_msgs.msg import TwistStamped
+from rclpy.node import Node
+
+from .omni_transport import MSG_TYPE_ERROR, OmniTransport
+from .protocol import (
+ DEFAULT_EXIT_ZERO_PACKETS,
+ DEFAULT_INPUT_TIMEOUT,
+ DEFAULT_INPUT_TOPIC,
+ DEFAULT_KEYBOARD_PEER_ID,
+ DEFAULT_QUEUE_DEPTH,
+ DEFAULT_SEND_RATE_HZ,
+ DEFAULT_TARGET_PEER,
+ DEFAULT_TRANSPORT,
+ ZERO_COMMAND,
+ pack_command,
+)
+
+
+CommandTuple = Tuple[float, float, float, float, float, float]
+
+
+class CmdVelUdpSender(Node):
+ """Forward TwistStamped messages to a remote OmniSocket peer."""
+
+ def __init__(self) -> None:
+ super().__init__('cmd_vel_udp_sender')
+
+ self.declare_parameter('transport', DEFAULT_TRANSPORT)
+ self.declare_parameter('server_addr', '')
+ self.declare_parameter('relay_via', '')
+ self.declare_parameter('peer_id', DEFAULT_KEYBOARD_PEER_ID)
+ self.declare_parameter('target_peer', DEFAULT_TARGET_PEER)
+ self.declare_parameter('input_topic', DEFAULT_INPUT_TOPIC)
+ self.declare_parameter('send_rate_hz', DEFAULT_SEND_RATE_HZ)
+ self.declare_parameter('input_timeout', DEFAULT_INPUT_TIMEOUT)
+ self.declare_parameter('queue_depth', DEFAULT_QUEUE_DEPTH)
+ self.declare_parameter('exit_zero_packets', DEFAULT_EXIT_ZERO_PACKETS)
+
+ self._transport_name = str(self.get_parameter('transport').value)
+ self._server_addr = str(self.get_parameter('server_addr').value)
+ self._relay_via = str(self.get_parameter('relay_via').value)
+ self._peer_id = str(self.get_parameter('peer_id').value)
+ self._target_peer = str(self.get_parameter('target_peer').value).strip()
+ self._input_topic = str(self.get_parameter('input_topic').value)
+ self._send_rate_hz = float(self.get_parameter('send_rate_hz').value)
+ self._input_timeout = float(self.get_parameter('input_timeout').value)
+ self._queue_depth = int(self.get_parameter('queue_depth').value)
+ self._exit_zero_packets = int(self.get_parameter('exit_zero_packets').value)
+
+ if self._send_rate_hz <= 0.0:
+ raise ValueError('send_rate_hz must be > 0')
+ if self._input_timeout < 0.0:
+ raise ValueError('input_timeout must be >= 0')
+ if self._queue_depth <= 0:
+ raise ValueError('queue_depth must be > 0')
+ if not self._target_peer:
+ raise ValueError('target_peer must not be empty')
+
+ self._transport = OmniTransport(
+ transport=self._transport_name,
+ server_addr=self._server_addr,
+ relay_via=self._relay_via,
+ peer_id=self._peer_id,
+ )
+ self._last_log_times: Dict[str, float] = {}
+ self._latest_command: CommandTuple = ZERO_COMMAND
+ self._last_input_monotonic: Optional[float] = None
+ self._last_sent_command: Optional[CommandTuple] = None
+ self._closing = threading.Event()
+
+ self.create_subscription(
+ TwistStamped,
+ self._input_topic,
+ self._handle_twist,
+ self._queue_depth,
+ )
+ self.create_timer(1.0 / self._send_rate_hz, self._send_latest_command)
+
+ self._drain_thread = threading.Thread(target=self._drain_incoming, daemon=True)
+ self._drain_thread.start()
+
+ self.get_logger().info(
+ 'Forwarding TwistStamped from %s via %s://%s as %s -> %s at %.1f Hz '
+ '(input timeout %.2f s)'
+ % (
+ self._input_topic,
+ self._transport.transport,
+ self._transport.server_addr,
+ self._peer_id,
+ self._target_peer,
+ self._send_rate_hz,
+ self._input_timeout,
+ )
+ )
+
+ def _should_log(self, key: str, throttle_sec: float) -> bool:
+ now = time.monotonic()
+ previous = self._last_log_times.get(key)
+ if previous is None or (now - previous) >= throttle_sec:
+ self._last_log_times[key] = now
+ return True
+ return False
+
+ def _handle_twist(self, msg: TwistStamped) -> None:
+ self._latest_command = (
+ float(msg.twist.linear.x),
+ float(msg.twist.linear.y),
+ float(msg.twist.linear.z),
+ float(msg.twist.angular.x),
+ float(msg.twist.angular.y),
+ float(msg.twist.angular.z),
+ )
+ self._last_input_monotonic = time.monotonic()
+
+ def _command_for_current_tick(self) -> CommandTuple:
+ if self._last_input_monotonic is None:
+ return ZERO_COMMAND
+ if self._input_timeout == 0.0:
+ return self._latest_command
+ age = time.monotonic() - self._last_input_monotonic
+ if age > self._input_timeout:
+ return ZERO_COMMAND
+ return self._latest_command
+
+ def _send_command(self, command: CommandTuple) -> None:
+ payload = pack_command(command)
+ try:
+ self._transport.send(to=self._target_peer, data=payload)
+ self._last_sent_command = command
+ except OSError as exc:
+ if self._should_log('send_error', 2.0):
+ self.get_logger().error(f'OmniSocket send failed: {exc}')
+
+ def _send_latest_command(self) -> None:
+ self._send_command(self._command_for_current_tick())
+
+ def _log_inbound_message(self, from_peer: str, msg_type: int, payload: bytes) -> None:
+ if msg_type == MSG_TYPE_ERROR:
+ if self._should_log('server_error', 1.0):
+ text = payload.decode('utf-8', errors='replace')
+ self.get_logger().error(f'OmniSocket server error from {from_peer}: {text}')
+ return
+
+ if self._should_log('unexpected_inbound', 2.0):
+ self.get_logger().warning(
+ 'Ignoring unexpected inbound message type %d from %s (%d bytes)'
+ % (msg_type, from_peer, len(payload))
+ )
+
+ def _drain_incoming(self) -> None:
+ while not self._closing.is_set() and rclpy.ok():
+ try:
+ result = self._transport.recv(timeout_ms=100)
+ except OSError as exc:
+ if not self._closing.is_set() and self._should_log('drain_error', 2.0):
+ self.get_logger().error(f'OmniSocket receive loop stopped: {exc}')
+ return
+
+ if result is None:
+ continue
+
+ from_peer, msg_type, payload = result
+ self._log_inbound_message(from_peer, msg_type, payload)
+
+ def send_zero_burst(self) -> None:
+ """Best-effort stop command sent during shutdown."""
+ for _ in range(max(1, self._exit_zero_packets)):
+ self._send_command(ZERO_COMMAND)
+ time.sleep(0.02)
+
+ def close(self) -> None:
+ self._closing.set()
+ if hasattr(self, '_transport') and self._transport is not None:
+ try:
+ self._transport.close()
+ except OSError as exc:
+ if self._should_log('close_error', 2.0):
+ self.get_logger().warning(f'Closing OmniSocket transport failed: {exc}')
+ self._transport = None
+ if hasattr(self, '_drain_thread') and self._drain_thread.is_alive():
+ self._drain_thread.join(timeout=0.5)
+
+ def destroy_node(self) -> bool:
+ self.close()
+ return super().destroy_node()
+
+
+def main(args: Optional[list[str]] = None) -> None:
+ rclpy.init(args=args)
+ node = CmdVelUdpSender()
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ node.send_zero_burst()
+ node.destroy_node()
+ rclpy.shutdown()
diff --git a/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/omni_transport.py b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/omni_transport.py
new file mode 100644
index 0000000..6c0353c
--- /dev/null
+++ b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/omni_transport.py
@@ -0,0 +1,95 @@
+"""Helpers for working with OmniSocket transport sessions."""
+
+from __future__ import annotations
+
+from .protocol import default_server_addr_for_transport, normalize_transport
+
+
+try:
+ from omnisocket import (
+ CONTROL_DEFAULTS,
+ MSG_TYPE_BINARY,
+ MSG_TYPE_ERROR,
+ Session,
+ UdpSession,
+ )
+except ImportError as exc: # pragma: no cover - depends on external build/install
+ raise RuntimeError(
+ 'omnisocket is not installed for this Python environment; run '
+ '`make python-ext && make python-install` on a Linux host first'
+ ) from exc
+
+
+def _normalize_optional(value: object) -> str:
+ return str(value).strip()
+
+
+class OmniTransport:
+ """Small wrapper that normalizes OmniSocket UDP/KCP session setup."""
+
+ def __init__(
+ self,
+ *,
+ transport: object,
+ server_addr: object,
+ peer_id: object,
+ relay_via: object = '',
+ bind_ip: object = '',
+ bind_device: object = '',
+ enable_timestamping: bool = False,
+ ) -> None:
+ self.transport = normalize_transport(transport)
+ self.server_addr = _normalize_optional(server_addr) or default_server_addr_for_transport(self.transport)
+ self.peer_id = _normalize_optional(peer_id)
+ self.relay_via = _normalize_optional(relay_via)
+ self.bind_ip = _normalize_optional(bind_ip)
+ self.bind_device = _normalize_optional(bind_device)
+
+ if not self.peer_id:
+ raise ValueError('peer_id must not be empty')
+
+ session_cls = Session if self.transport == 'kcp' else UdpSession
+ self._session = session_cls()
+
+ connect_kwargs: dict[str, object] = {
+ 'server_addr': self.server_addr,
+ 'peer_id': self.peer_id,
+ }
+ if self.bind_ip:
+ connect_kwargs['bind_ip'] = self.bind_ip
+ if self.bind_device:
+ connect_kwargs['bind_device'] = self.bind_device
+
+ if self.transport == 'kcp':
+ if self.relay_via:
+ connect_kwargs['relay_via'] = self.relay_via
+ connect_kwargs.update(CONTROL_DEFAULTS)
+ else:
+ connect_kwargs['enable_timestamping'] = bool(enable_timestamping)
+
+ self._session.connect(**connect_kwargs)
+
+ def send(self, *, to: str, data: bytes) -> None:
+ self._session.send(to=to, data=data)
+
+ def recv(self, *, timeout_ms: int = -1):
+ return self._session.recv(timeout_ms=timeout_ms)
+
+ def recv_into(self, *, buffer, timeout_ms: int = -1):
+ return self._session.recv_into(buffer=buffer, timeout_ms=timeout_ms)
+
+ def close(self) -> None:
+ self._session.close()
+
+ def stats(self) -> dict[str, int]:
+ return self._session.stats()
+
+
+__all__ = [
+ 'CONTROL_DEFAULTS',
+ 'MSG_TYPE_BINARY',
+ 'MSG_TYPE_ERROR',
+ 'OmniTransport',
+ 'Session',
+ 'UdpSession',
+]
diff --git a/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/protocol.py b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/protocol.py
new file mode 100644
index 0000000..44f4641
--- /dev/null
+++ b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/protocol.py
@@ -0,0 +1,74 @@
+"""Shared teleop protocol helpers and transport defaults."""
+
+from __future__ import annotations
+
+import math
+import struct
+from typing import Iterable, Tuple
+
+
+COMMAND_STRUCT = struct.Struct('<6f')
+PACKET_SIZE = COMMAND_STRUCT.size
+
+SUPPORTED_TRANSPORTS = ('udp', 'kcp')
+DEFAULT_TRANSPORT = 'udp'
+
+DEFAULT_OMNI_UDP_SERVER_ADDR = '127.0.0.1:9001'
+DEFAULT_OMNI_KCP_SERVER_ADDR = '127.0.0.1:9002'
+
+DEFAULT_KEYBOARD_PEER_ID = 'ros-keyboard-ctrl'
+DEFAULT_GAMEPAD_PEER_ID = 'ros-gamepad-ctrl'
+DEFAULT_BRIDGE_PEER_ID = 'ros-bridge-ctrl'
+DEFAULT_TARGET_PEER = DEFAULT_BRIDGE_PEER_ID
+
+DEFAULT_FRAME_ID = 'pelvis'
+DEFAULT_INPUT_TOPIC = '/teleop/cmd_vel'
+DEFAULT_OUTPUT_TOPIC = '/hric/robot/cmd_vel'
+DEFAULT_SEND_RATE_HZ = 20.0
+DEFAULT_INPUT_TIMEOUT = 0.75
+DEFAULT_WATCHDOG_TIMEOUT = 0.5
+DEFAULT_PUBLISH_RATE_HZ = 100.0
+DEFAULT_QUEUE_DEPTH = 10
+DEFAULT_EXIT_ZERO_PACKETS = 3
+DEFAULT_RECV_BUFFER_BYTES = 2048
+
+ZERO_COMMAND = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
+
+
+def normalize_transport(value: object) -> str:
+ """Return a supported transport name."""
+ transport = str(value).strip().lower()
+ if transport not in SUPPORTED_TRANSPORTS:
+ supported = ', '.join(SUPPORTED_TRANSPORTS)
+ raise ValueError(f"Unsupported transport '{transport}', expected one of: {supported}")
+ return transport
+
+
+def default_server_addr_for_transport(transport: str) -> str:
+ """Return the default OmniSocket server for the chosen transport."""
+ transport = normalize_transport(transport)
+ if transport == 'udp':
+ return DEFAULT_OMNI_UDP_SERVER_ADDR
+ return DEFAULT_OMNI_KCP_SERVER_ADDR
+
+
+def normalize_command(values: Iterable[float]) -> Tuple[float, float, float, float, float, float]:
+ """Return a finite six-float command tuple."""
+ command = tuple(float(value) for value in values)
+ if len(command) != 6:
+ raise ValueError(f'Expected 6 command values, got {len(command)}')
+ if any(not math.isfinite(value) for value in command):
+ raise ValueError('Command contains a non-finite value')
+ return command
+
+
+def pack_command(values: Iterable[float]) -> bytes:
+ """Pack six floats into the wire format."""
+ return COMMAND_STRUCT.pack(*normalize_command(values))
+
+
+def unpack_command(payload: bytes) -> Tuple[float, float, float, float, float, float]:
+ """Decode a control packet into a six-float command tuple."""
+ if len(payload) != PACKET_SIZE:
+ raise ValueError(f'Expected {PACKET_SIZE} bytes, got {len(payload)}')
+ return normalize_command(COMMAND_STRUCT.unpack(payload))
diff --git a/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/udp_cmd_vel_receiver.py b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/udp_cmd_vel_receiver.py
new file mode 100644
index 0000000..5ead458
--- /dev/null
+++ b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/udp_cmd_vel_receiver.py
@@ -0,0 +1,238 @@
+"""ROS 2 node that receives OmniSocket teleop packets and republishes TwistStamped."""
+
+from __future__ import annotations
+
+import threading
+import time
+from typing import Dict, Optional, Tuple
+
+import rclpy
+from geometry_msgs.msg import TwistStamped
+from rclpy.node import Node
+
+from .omni_transport import MSG_TYPE_BINARY, MSG_TYPE_ERROR, OmniTransport
+from .protocol import (
+ DEFAULT_BRIDGE_PEER_ID,
+ DEFAULT_FRAME_ID,
+ DEFAULT_OUTPUT_TOPIC,
+ DEFAULT_PUBLISH_RATE_HZ,
+ DEFAULT_QUEUE_DEPTH,
+ DEFAULT_RECV_BUFFER_BYTES,
+ DEFAULT_TRANSPORT,
+ DEFAULT_WATCHDOG_TIMEOUT,
+ PACKET_SIZE,
+ ZERO_COMMAND,
+ unpack_command,
+)
+
+
+CommandTuple = Tuple[float, float, float, float, float, float]
+
+
+class UdpCmdVelReceiver(Node):
+ """Publish TwistStamped commands from the OmniSocket control wire format."""
+
+ def __init__(self) -> None:
+ super().__init__('udp_cmd_vel_receiver')
+
+ self.declare_parameter('transport', DEFAULT_TRANSPORT)
+ self.declare_parameter('server_addr', '')
+ self.declare_parameter('relay_via', '')
+ self.declare_parameter('peer_id', DEFAULT_BRIDGE_PEER_ID)
+ self.declare_parameter('expected_sender', '')
+ self.declare_parameter('output_topic', DEFAULT_OUTPUT_TOPIC)
+ self.declare_parameter('frame_id', DEFAULT_FRAME_ID)
+ self.declare_parameter('watchdog_timeout', DEFAULT_WATCHDOG_TIMEOUT)
+ self.declare_parameter('publish_rate_hz', DEFAULT_PUBLISH_RATE_HZ)
+ self.declare_parameter('queue_depth', DEFAULT_QUEUE_DEPTH)
+
+ self._transport_name = str(self.get_parameter('transport').value)
+ self._server_addr = str(self.get_parameter('server_addr').value)
+ self._relay_via = str(self.get_parameter('relay_via').value)
+ self._peer_id = str(self.get_parameter('peer_id').value)
+ self._expected_sender = str(self.get_parameter('expected_sender').value).strip()
+ self._output_topic = str(self.get_parameter('output_topic').value)
+ self._frame_id = str(self.get_parameter('frame_id').value)
+ self._watchdog_timeout = float(self.get_parameter('watchdog_timeout').value)
+ self._publish_rate_hz = float(self.get_parameter('publish_rate_hz').value)
+ self._queue_depth = int(self.get_parameter('queue_depth').value)
+
+ if self._watchdog_timeout <= 0.0:
+ raise ValueError('watchdog_timeout must be > 0')
+ if self._publish_rate_hz <= 0.0:
+ raise ValueError('publish_rate_hz must be > 0')
+ if self._queue_depth <= 0:
+ raise ValueError('queue_depth must be > 0')
+
+ self._publisher = self.create_publisher(TwistStamped, self._output_topic, self._queue_depth)
+ self._transport = OmniTransport(
+ transport=self._transport_name,
+ server_addr=self._server_addr,
+ relay_via=self._relay_via,
+ peer_id=self._peer_id,
+ )
+
+ self._lock = threading.Lock()
+ self._last_log_times: Dict[str, float] = {}
+ self._latest_command: CommandTuple = ZERO_COMMAND
+ self._last_packet_monotonic: Optional[float] = None
+ self._last_published_command: CommandTuple = ZERO_COMMAND
+ self._closing = threading.Event()
+ self._recv_buffer = bytearray(DEFAULT_RECV_BUFFER_BYTES)
+
+ self.create_timer(1.0 / self._publish_rate_hz, self._publish_tick)
+
+ self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
+ self._recv_thread.start()
+
+ self.get_logger().info(
+ 'Receiving teleop commands via %s://%s as %s and publishing TwistStamped to %s '
+ 'at %.1f Hz (frame_id=%s, watchdog %.2f s)'
+ % (
+ self._transport.transport,
+ self._transport.server_addr,
+ self._peer_id,
+ self._output_topic,
+ self._publish_rate_hz,
+ self._frame_id,
+ self._watchdog_timeout,
+ )
+ )
+
+ def _should_log(self, key: str, throttle_sec: float) -> bool:
+ now = time.monotonic()
+ previous = self._last_log_times.get(key)
+ if previous is None or (now - previous) >= throttle_sec:
+ self._last_log_times[key] = now
+ return True
+ return False
+
+ def _publish_command(self, command: CommandTuple) -> None:
+ msg = TwistStamped()
+ msg.header.stamp = self.get_clock().now().to_msg()
+ msg.header.frame_id = self._frame_id
+ msg.twist.linear.x = command[0]
+ msg.twist.linear.y = command[1]
+ msg.twist.linear.z = command[2]
+ msg.twist.angular.x = command[3]
+ msg.twist.angular.y = command[4]
+ msg.twist.angular.z = command[5]
+ self._publisher.publish(msg)
+ self._last_published_command = command
+
+ def _handle_error_message(self, from_peer: str, body_len: int) -> None:
+ if self._should_log('server_error', 1.0):
+ text = bytes(self._recv_buffer[:body_len]).decode('utf-8', errors='replace')
+ self.get_logger().error(f'OmniSocket server error from {from_peer}: {text}')
+
+ def _recv_loop(self) -> None:
+ while not self._closing.is_set() and rclpy.ok():
+ try:
+ meta = self._transport.recv_into(buffer=self._recv_buffer, timeout_ms=100)
+ except BufferError as exc:
+ if self._should_log('buffer_error', 2.0):
+ self.get_logger().warning(f'Dropped oversized OmniSocket frame: {exc}')
+ continue
+ except OSError as exc:
+ if not self._closing.is_set() and self._should_log('recv_error', 2.0):
+ self.get_logger().error(f'OmniSocket receive loop stopped: {exc}')
+ return
+
+ if meta is None:
+ continue
+
+ from_peer = str(meta['from'])
+ msg_type = int(meta['msg_type'])
+ body_len = int(meta['body_len'])
+
+ if msg_type == MSG_TYPE_ERROR:
+ self._handle_error_message(from_peer, body_len)
+ continue
+
+ if self._expected_sender and from_peer != self._expected_sender:
+ if self._should_log('unexpected_sender', 2.0):
+ self.get_logger().warning(
+ 'Ignoring message from unexpected sender %s (expected %s)'
+ % (from_peer, self._expected_sender)
+ )
+ continue
+
+ if msg_type != MSG_TYPE_BINARY:
+ if self._should_log('unexpected_type', 2.0):
+ self.get_logger().warning(
+ 'Ignoring unexpected message type %d from %s (%d bytes)'
+ % (msg_type, from_peer, body_len)
+ )
+ continue
+
+ if body_len != PACKET_SIZE:
+ if self._should_log('packet_size', 2.0):
+ self.get_logger().warning(
+ 'Dropped binary payload from %s with invalid size %d (expected %d)'
+ % (from_peer, body_len, PACKET_SIZE)
+ )
+ continue
+
+ try:
+ command = unpack_command(self._recv_buffer[:PACKET_SIZE])
+ except ValueError as exc:
+ if self._should_log('decode_error', 2.0):
+ self.get_logger().warning(f'Dropped malformed command payload: {exc}')
+ continue
+
+ with self._lock:
+ self._latest_command = command
+ self._last_packet_monotonic = time.monotonic()
+
+ def _command_for_publish_tick(self) -> tuple[CommandTuple, Optional[float], bool]:
+ with self._lock:
+ latest_command = self._latest_command
+ last_packet_monotonic = self._last_packet_monotonic
+
+ if last_packet_monotonic is None:
+ return ZERO_COMMAND, None, False
+
+ age = time.monotonic() - last_packet_monotonic
+ if age > self._watchdog_timeout:
+ return ZERO_COMMAND, age, True
+ return latest_command, age, False
+
+ def _publish_tick(self) -> None:
+ publish_command, age, timed_out = self._command_for_publish_tick()
+
+ if timed_out and self._last_published_command != ZERO_COMMAND:
+ if self._should_log('watchdog_stop', 2.0):
+ self.get_logger().warning(
+ 'Command stream timed out after %.2f s, publishing zero velocity stop'
+ % age
+ )
+
+ self._publish_command(publish_command)
+
+ def close(self) -> None:
+ self._closing.set()
+ if hasattr(self, '_transport') and self._transport is not None:
+ try:
+ self._transport.close()
+ except OSError as exc:
+ if self._should_log('close_error', 2.0):
+ self.get_logger().warning(f'Closing OmniSocket transport failed: {exc}')
+ self._transport = None
+ if hasattr(self, '_recv_thread') and self._recv_thread.is_alive():
+ self._recv_thread.join(timeout=0.5)
+
+ def destroy_node(self) -> bool:
+ self.close()
+ return super().destroy_node()
+
+
+def main(args: Optional[list[str]] = None) -> None:
+ rclpy.init(args=args)
+ node = UdpCmdVelReceiver()
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ node.destroy_node()
+ rclpy.shutdown()
diff --git a/src/peer_udp_client.c b/src/peer_udp_client.c
index 4fc3bb5..9e617ab 100644
--- a/src/peer_udp_client.c
+++ b/src/peer_udp_client.c
@@ -1,6 +1,8 @@
#include "peer_udp_client.h"
+#include
#include
+#include
struct udp_client {
char id[OMNI_MAX_PEER_ID];
@@ -17,6 +19,19 @@ static int client_next_message_id(udp_client_t *client, uint64_t *out_id) {
return 0;
}
+static void udp_client_fill_recv_meta(udp_client_recv_meta_t *meta, const message_t *msg) {
+ if (meta == NULL || msg == NULL) {
+ return;
+ }
+ memset(meta, 0, sizeof(*meta));
+ meta->type = msg->type;
+ meta->id = msg->id;
+ meta->body_len = msg->body_len;
+ snprintf(meta->from, sizeof(meta->from), "%s", msg->from);
+ snprintf(meta->to, sizeof(meta->to), "%s", msg->to);
+ snprintf(meta->file_name, sizeof(meta->file_name), "%s", msg->file_name);
+}
+
static int client_persist_message_to_disk(const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len) {
char path[512];
if (omni_ensure_dir(inbox_dir) != 0) {
@@ -70,7 +85,7 @@ static int client_persist_message_to_disk(const message_t *msg, const char *inbo
return 0;
}
-udp_client_t *udp_client_dial(const char *server_addr, const char *peer_id, const char *bind_ip, latency_logger_t *logger, tx_timestamp_debug_logger_t *debug_logger, int enable_timestamping) {
+udp_client_t *udp_client_dial_with_options(const char *server_addr, const char *peer_id, const char *bind_ip, const char *bind_device, latency_logger_t *logger, tx_timestamp_debug_logger_t *debug_logger, int enable_timestamping) {
udp_client_t *client;
message_t register_msg;
client = (udp_client_t *) calloc(1, sizeof(*client));
@@ -80,7 +95,7 @@ udp_client_t *udp_client_dial(const char *server_addr, const char *peer_id, cons
snprintf(client->id, sizeof(client->id), "%s", peer_id);
pthread_mutex_init(&client->id_mu, NULL);
client->logger = logger;
- client->conn = udp_conn_dial(server_addr, bind_ip, NULL, enable_timestamping, logger, OMNI_NODE_ROLE_PEER, peer_id, debug_logger);
+ client->conn = udp_conn_dial(server_addr, bind_ip, bind_device, enable_timestamping, logger, OMNI_NODE_ROLE_PEER, peer_id, debug_logger);
if (client->conn == NULL) {
udp_client_free(client);
return NULL;
@@ -97,6 +112,10 @@ udp_client_t *udp_client_dial(const char *server_addr, const char *peer_id, cons
return client;
}
+udp_client_t *udp_client_dial(const char *server_addr, const char *peer_id, const char *bind_ip, latency_logger_t *logger, tx_timestamp_debug_logger_t *debug_logger, int enable_timestamping) {
+ return udp_client_dial_with_options(server_addr, peer_id, bind_ip, NULL, logger, debug_logger, enable_timestamping);
+}
+
const char *udp_client_id(const udp_client_t *client) {
return client == NULL ? "" : client->id;
}
@@ -124,6 +143,38 @@ int udp_client_send_text(udp_client_t *client, const char *to, const char *text)
return 0;
}
+int udp_client_send_binary(udp_client_t *client, const char *to, const void *data, size_t data_len) {
+ message_t msg;
+ uint64_t id;
+
+ if (client == NULL || to == NULL || (data == NULL && data_len > 0)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ protocol_message_init(&msg);
+ client_next_message_id(client, &id);
+ msg.type = MSG_TYPE_BINARY;
+ msg.id = id;
+ snprintf(msg.from, sizeof(msg.from), "%s", client->id);
+ snprintf(msg.to, sizeof(msg.to), "%s", to);
+ if (data_len > 0) {
+ msg.body = (uint8_t *) malloc(data_len);
+ if (msg.body == NULL) {
+ return -1;
+ }
+ memcpy(msg.body, data, data_len);
+ }
+ msg.body_len = data_len;
+ latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_A_APP_PREP_BEGIN, &msg);
+ if (udp_conn_send(client->conn, &msg) != 0) {
+ protocol_message_clear(&msg);
+ return -1;
+ }
+ protocol_message_clear(&msg);
+ return 0;
+}
+
int udp_client_send_file_path(udp_client_t *client, const char *to, const char *path) {
message_t msg;
uint64_t id;
@@ -151,7 +202,34 @@ int udp_client_send_file_path(udp_client_t *client, const char *to, const char *
return 0;
}
-int udp_client_receive(udp_client_t *client, message_t *out_msg) {
+int udp_client_receive_timed(udp_client_t *client, message_t *out_msg, int timeout_ms) {
+ if (client == NULL || out_msg == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (timeout_ms >= 0) {
+ struct pollfd pfd;
+ int rc;
+
+ memset(&pfd, 0, sizeof(pfd));
+ pfd.fd = udp_conn_fd(client->conn);
+ pfd.events = POLLIN | POLLERR | POLLHUP;
+ do {
+ rc = poll(&pfd, 1, timeout_ms);
+ } while (rc < 0 && errno == EINTR);
+ if (rc == 0) {
+ return 1;
+ }
+ if (rc < 0) {
+ return -1;
+ }
+ if ((pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) != 0 && (pfd.revents & POLLIN) == 0) {
+ errno = ECONNRESET;
+ return -1;
+ }
+ }
+
if (udp_conn_receive(client->conn, out_msg, NULL, NULL) != 0) {
return -1;
}
@@ -159,6 +237,39 @@ int udp_client_receive(udp_client_t *client, message_t *out_msg) {
return 0;
}
+int udp_client_receive(udp_client_t *client, message_t *out_msg) {
+ return udp_client_receive_timed(client, out_msg, -1);
+}
+
+int udp_client_receive_into(udp_client_t *client, void *buffer, size_t buffer_len, udp_client_recv_meta_t *out_meta, int timeout_ms) {
+ message_t msg;
+ int rc;
+
+ if (client == NULL || (buffer == NULL && buffer_len > 0) || out_meta == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ protocol_message_init(&msg);
+ rc = udp_client_receive_timed(client, &msg, timeout_ms);
+ if (rc != 0) {
+ return rc;
+ }
+
+ udp_client_fill_recv_meta(out_meta, &msg);
+ if (msg.body_len > buffer_len) {
+ protocol_message_clear(&msg);
+ errno = EMSGSIZE;
+ return 2;
+ }
+
+ if (msg.body_len > 0) {
+ memcpy(buffer, msg.body, msg.body_len);
+ }
+ protocol_message_clear(&msg);
+ return 0;
+}
+
int udp_client_persist_message(udp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len) {
if (!latencylog_is_business_message(msg)) {
errno = EINVAL;
diff --git a/src/transport_kcp.c b/src/transport_kcp.c
index 30778ac..5c473fb 100644
--- a/src/transport_kcp.c
+++ b/src/transport_kcp.c
@@ -1799,6 +1799,8 @@ int kcp_conn_close(kcp_conn_t *conn) {
pthread_mutex_lock(&conn->kcp_mu);
atomic_store(&conn->closed, 1);
if (conn->owns_socket && !conn->socket_closed) {
+ /* Wake the blocking recv thread before closing the shared UDP socket. */
+ (void) shutdown(conn->fd, SHUT_RDWR);
close(conn->fd);
conn->socket_closed = 1;
}
diff --git a/src/transport_udp.c b/src/transport_udp.c
index c296082..49a0e18 100644
--- a/src/transport_udp.c
+++ b/src/transport_udp.c
@@ -3,6 +3,7 @@
#include
#include
#include
+#include
#include
typedef struct udp_pending_tx {
@@ -413,6 +414,13 @@ int udp_conn_receive(udp_conn_t *conn, message_t *out_msg, struct sockaddr_stora
}
n = recvmsg(conn->fd, &msg, 0);
if (n < 0) {
+ if (conn->closed) {
+ errno = ECANCELED;
+ }
+ return -1;
+ }
+ if (n == 0 && conn->closed) {
+ errno = ECANCELED;
return -1;
}
if (conn->timestamping_enabled) {
@@ -454,6 +462,8 @@ int udp_conn_close(udp_conn_t *conn) {
}
if (!conn->closed) {
conn->closed = 1;
+ /* Wake blocking recvmsg()/poll users before tearing down the socket. */
+ (void) shutdown(conn->fd, SHUT_RDWR);
close(conn->fd);
if (conn->errqueue_thread_started) {
pthread_join(conn->errqueue_thread, NULL);