From 9ffc36f50d500b32ef398247359fcfdd3f2c4dc2 Mon Sep 17 00:00:00 2001 From: Mock Date: Fri, 3 Apr 2026 20:00:33 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=9F=BA=E4=BA=8EPython=20ROS2?= =?UTF-8?q?=E7=9A=84=E6=8E=A7=E5=88=B6=E7=A8=8B=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/peer_udp_client.h | 12 + python/omnisocket/__init__.py | 2 + python/omnisocket/_omnisocket.c | 326 ++++++++++++++++-- python/omnisocket/omnisocket_client.c | 242 +++++++++++++ python/omnisocket/omnisocket_client.h | 33 ++ python/tests/test_sessions.py | 180 ++++++++++ ros-control-c/README.md | 5 + ros-control-py/README.md | 246 +++++++++++++ ros-control-py/ROS2 Teleop over UDP.md | 147 ++++++++ .../config/xbox_twist_joy.yaml | 32 ++ .../launch/keyboard_sender.launch.py | 34 ++ .../launch/robot_udp_receiver.launch.py | 36 ++ .../launch/xbox_to_udp.launch.py | 74 ++++ ros-control-py/udp_teleop_bridge/package.xml | 24 ++ .../resource/udp_teleop_bridge | 1 + ros-control-py/udp_teleop_bridge/setup.cfg | 5 + ros-control-py/udp_teleop_bridge/setup.py | 34 ++ .../udp_teleop_bridge/test/test_protocol.py | 54 +++ .../udp_teleop_bridge/__init__.py | 1 + .../udp_teleop_bridge/cmd_vel_udp_sender.py | 207 +++++++++++ .../udp_teleop_bridge/omni_transport.py | 95 +++++ .../udp_teleop_bridge/protocol.py | 74 ++++ .../udp_teleop_bridge/udp_cmd_vel_receiver.py | 238 +++++++++++++ src/peer_udp_client.c | 117 ++++++- src/transport_kcp.c | 2 + src/transport_udp.c | 10 + 26 files changed, 2193 insertions(+), 38 deletions(-) create mode 100644 python/tests/test_sessions.py create mode 100644 ros-control-py/README.md create mode 100644 ros-control-py/ROS2 Teleop over UDP.md create mode 100644 ros-control-py/udp_teleop_bridge/config/xbox_twist_joy.yaml create mode 100644 ros-control-py/udp_teleop_bridge/launch/keyboard_sender.launch.py create mode 100644 ros-control-py/udp_teleop_bridge/launch/robot_udp_receiver.launch.py create mode 100644 ros-control-py/udp_teleop_bridge/launch/xbox_to_udp.launch.py create mode 100644 ros-control-py/udp_teleop_bridge/package.xml create mode 100644 ros-control-py/udp_teleop_bridge/resource/udp_teleop_bridge create mode 100644 ros-control-py/udp_teleop_bridge/setup.cfg create mode 100644 ros-control-py/udp_teleop_bridge/setup.py create mode 100644 ros-control-py/udp_teleop_bridge/test/test_protocol.py create mode 100644 ros-control-py/udp_teleop_bridge/udp_teleop_bridge/__init__.py create mode 100644 ros-control-py/udp_teleop_bridge/udp_teleop_bridge/cmd_vel_udp_sender.py create mode 100644 ros-control-py/udp_teleop_bridge/udp_teleop_bridge/omni_transport.py create mode 100644 ros-control-py/udp_teleop_bridge/udp_teleop_bridge/protocol.py create mode 100644 ros-control-py/udp_teleop_bridge/udp_teleop_bridge/udp_cmd_vel_receiver.py diff --git a/include/peer_udp_client.h b/include/peer_udp_client.h index fc8f64f..937e49e 100644 --- a/include/peer_udp_client.h +++ b/include/peer_udp_client.h @@ -8,12 +8,24 @@ extern "C" { #endif typedef struct udp_client udp_client_t; +typedef struct udp_client_recv_meta { + message_type_t type; + uint64_t id; + char from[OMNI_MAX_PEER_ID]; + char to[OMNI_MAX_PEER_ID]; + char file_name[OMNI_MAX_FILE_NAME]; + size_t body_len; +} udp_client_recv_meta_t; +udp_client_t *udp_client_dial_with_options(const char *server_addr, const char *peer_id, const char *bind_ip, const char *bind_device, latency_logger_t *logger, tx_timestamp_debug_logger_t *debug_logger, int enable_timestamping); udp_client_t *udp_client_dial(const char *server_addr, const char *peer_id, const char *bind_ip, latency_logger_t *logger, tx_timestamp_debug_logger_t *debug_logger, int enable_timestamping); const char *udp_client_id(const udp_client_t *client); int udp_client_send_text(udp_client_t *client, const char *to, const char *text); +int udp_client_send_binary(udp_client_t *client, const char *to, const void *data, size_t data_len); int udp_client_send_file_path(udp_client_t *client, const char *to, const char *path); +int udp_client_receive_timed(udp_client_t *client, message_t *out_msg, int timeout_ms); int udp_client_receive(udp_client_t *client, message_t *out_msg); +int udp_client_receive_into(udp_client_t *client, void *buffer, size_t buffer_len, udp_client_recv_meta_t *out_meta, int timeout_ms); int udp_client_persist_message(udp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len); int udp_client_close(udp_client_t *client); void udp_client_free(udp_client_t *client); diff --git a/python/omnisocket/__init__.py b/python/omnisocket/__init__.py index d9244cf..47fbdd5 100644 --- a/python/omnisocket/__init__.py +++ b/python/omnisocket/__init__.py @@ -6,6 +6,7 @@ try: MSG_TYPE_REGISTER, MSG_TYPE_TEXT, Session, + UdpSession, ) except ImportError as exc: raise ImportError( @@ -41,4 +42,5 @@ __all__ = [ "MSG_TYPE_REGISTER", "MSG_TYPE_TEXT", "Session", + "UdpSession", ] diff --git a/python/omnisocket/_omnisocket.c b/python/omnisocket/_omnisocket.c index 38d852f..5536688 100644 --- a/python/omnisocket/_omnisocket.c +++ b/python/omnisocket/_omnisocket.c @@ -8,6 +8,11 @@ typedef struct PyOmniSession { omnisocket_session_t session; } PyOmniSession; +typedef struct PyOmniUdpSession { + PyObject_HEAD + omnisocket_udp_session_t session; +} PyOmniUdpSession; + PyDoc_STRVAR( PyOmniSession_recv_doc, "recv(timeout_ms=-1) -> (from_peer, msg_type, payload) | None" @@ -22,6 +27,66 @@ PyDoc_STRVAR( "current frame has already been consumed and is lost." ); +static PyObject *build_recv_result(const message_t *msg) { + PyObject *body = NULL; + PyObject *result = NULL; + + body = PyBytes_FromStringAndSize((const char *) msg->body, (Py_ssize_t) msg->body_len); + if (body == NULL) { + return NULL; + } + result = Py_BuildValue("(siO)", msg->from, (int) msg->type, body); + Py_DECREF(body); + return result; +} + +static PyObject *build_recv_meta_dict( + const char *from_peer, + const char *to_peer, + const char *file_name, + int msg_type, + unsigned long long message_id, + unsigned long long body_len +) { + return Py_BuildValue( + "{s:s,s:s,s:s,s:i,s:K,s:K}", + "from", + from_peer, + "to", + to_peer, + "file_name", + file_name, + "msg_type", + msg_type, + "message_id", + message_id, + "body_len", + body_len + ); +} + +static PyObject *build_stats_dict(const omnisocket_session_stats_t *stats) { + return Py_BuildValue( + "{s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:i}", + "send_calls", + (unsigned long long) stats->send_calls, + "send_bytes", + (unsigned long long) stats->send_bytes, + "send_errors", + (unsigned long long) stats->send_errors, + "recv_calls", + (unsigned long long) stats->recv_calls, + "recv_bytes", + (unsigned long long) stats->recv_bytes, + "recv_timeouts", + (unsigned long long) stats->recv_timeouts, + "recv_errors", + (unsigned long long) stats->recv_errors, + "connected", + stats->connected + ); +} + static PyObject *PyOmniSession_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) { PyOmniSession *self; (void) args; @@ -165,7 +230,6 @@ static PyObject *PyOmniSession_recv(PyOmniSession *self, PyObject *args, PyObjec int timeout_ms = -1; int rc; message_t msg; - PyObject *body = NULL; PyObject *result = NULL; static char *kwlist[] = {"timeout_ms", NULL}; @@ -187,13 +251,7 @@ static PyObject *PyOmniSession_recv(PyOmniSession *self, PyObject *args, PyObjec return PyErr_SetFromErrno(PyExc_OSError); } - body = PyBytes_FromStringAndSize((const char *) msg.body, (Py_ssize_t) msg.body_len); - if (body == NULL) { - protocol_message_clear(&msg); - return NULL; - } - result = Py_BuildValue("(siO)", msg.from, (int) msg.type, body); - Py_DECREF(body); + result = build_recv_result(&msg); protocol_message_clear(&msg); return result; } @@ -237,19 +295,12 @@ static PyObject *PyOmniSession_recv_into(PyOmniSession *self, PyObject *args, Py return PyErr_SetFromErrno(PyExc_OSError); } - result = Py_BuildValue( - "{s:s,s:s,s:s,s:i,s:K,s:K}", - "from", + result = build_recv_meta_dict( meta.from, - "to", meta.to, - "file_name", meta.file_name, - "msg_type", (int) meta.type, - "message_id", (unsigned long long) meta.id, - "body_len", (unsigned long long) meta.body_len ); return result; @@ -260,25 +311,7 @@ static PyObject *PyOmniSession_stats(PyOmniSession *self, PyObject *Py_UNUSED(ig memset(&stats, 0, sizeof(stats)); omnisocket_session_stats_snapshot(&self->session, &stats); - return Py_BuildValue( - "{s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:i}", - "send_calls", - (unsigned long long) stats.send_calls, - "send_bytes", - (unsigned long long) stats.send_bytes, - "send_errors", - (unsigned long long) stats.send_errors, - "recv_calls", - (unsigned long long) stats.recv_calls, - "recv_bytes", - (unsigned long long) stats.recv_bytes, - "recv_timeouts", - (unsigned long long) stats.recv_timeouts, - "recv_errors", - (unsigned long long) stats.recv_errors, - "connected", - stats.connected - ); + return build_stats_dict(&stats); } static PyMethodDef PyOmniSession_methods[] = { @@ -295,6 +328,211 @@ static PyTypeObject PyOmniSessionType = { PyVarObject_HEAD_INIT(NULL, 0) }; +static PyObject *PyOmniUdpSession_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) { + PyOmniUdpSession *self; + (void) args; + (void) kwargs; + + self = (PyOmniUdpSession *) type->tp_alloc(type, 0); + if (self == NULL) { + return NULL; + } + if (omnisocket_udp_session_init(&self->session) != 0) { + type->tp_free((PyObject *) self); + return PyErr_SetFromErrno(PyExc_OSError); + } + return (PyObject *) self; +} + +static void PyOmniUdpSession_dealloc(PyOmniUdpSession *self) { + omnisocket_udp_session_destroy(&self->session); + Py_TYPE(self)->tp_free((PyObject *) self); +} + +static PyObject *PyOmniUdpSession_connect(PyOmniUdpSession *self, PyObject *args, PyObject *kwargs) { + const char *server_addr; + const char *peer_id; + const char *bind_ip = ""; + const char *bind_device = ""; + int enable_timestamping = 0; + int rc; + + static char *kwlist[] = { + "server_addr", + "peer_id", + "bind_ip", + "bind_device", + "enable_timestamping", + NULL + }; + + if (!PyArg_ParseTupleAndKeywords( + args, + kwargs, + "ss|ssi", + kwlist, + &server_addr, + &peer_id, + &bind_ip, + &bind_device, + &enable_timestamping)) { + return NULL; + } + + Py_BEGIN_ALLOW_THREADS + rc = omnisocket_udp_session_connect( + &self->session, + server_addr, + peer_id, + bind_ip, + bind_device, + enable_timestamping + ); + Py_END_ALLOW_THREADS + + if (rc != 0) { + return PyErr_SetFromErrno(PyExc_OSError); + } + Py_RETURN_NONE; +} + +static PyObject *PyOmniUdpSession_close(PyOmniUdpSession *self, PyObject *Py_UNUSED(ignored)) { + int rc; + + Py_BEGIN_ALLOW_THREADS + rc = omnisocket_udp_session_close(&self->session); + Py_END_ALLOW_THREADS + + if (rc != 0) { + return PyErr_SetFromErrno(PyExc_OSError); + } + Py_RETURN_NONE; +} + +static PyObject *PyOmniUdpSession_send(PyOmniUdpSession *self, PyObject *args, PyObject *kwargs) { + const char *to; + Py_buffer payload; + int rc; + static char *kwlist[] = {"to", "data", NULL}; + + memset(&payload, 0, sizeof(payload)); + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sy*", kwlist, &to, &payload)) { + return NULL; + } + + Py_BEGIN_ALLOW_THREADS + rc = omnisocket_udp_session_send(&self->session, to, payload.buf, (size_t) payload.len); + Py_END_ALLOW_THREADS + + PyBuffer_Release(&payload); + if (rc != 0) { + return PyErr_SetFromErrno(PyExc_OSError); + } + Py_RETURN_NONE; +} + +static PyObject *PyOmniUdpSession_recv(PyOmniUdpSession *self, PyObject *args, PyObject *kwargs) { + int timeout_ms = -1; + int rc; + message_t msg; + PyObject *result = NULL; + static char *kwlist[] = {"timeout_ms", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist, &timeout_ms)) { + return NULL; + } + + protocol_message_init(&msg); + Py_BEGIN_ALLOW_THREADS + rc = omnisocket_udp_session_recv(&self->session, &msg, timeout_ms); + Py_END_ALLOW_THREADS + + if (rc == 1) { + protocol_message_clear(&msg); + Py_RETURN_NONE; + } + if (rc != 0) { + protocol_message_clear(&msg); + return PyErr_SetFromErrno(PyExc_OSError); + } + + result = build_recv_result(&msg); + protocol_message_clear(&msg); + return result; +} + +static PyObject *PyOmniUdpSession_recv_into(PyOmniUdpSession *self, PyObject *args, PyObject *kwargs) { + PyObject *buffer_obj; + Py_buffer view; + int timeout_ms = -1; + int rc; + udp_client_recv_meta_t meta; + PyObject *result = NULL; + static char *kwlist[] = {"buffer", "timeout_ms", NULL}; + + memset(&view, 0, sizeof(view)); + memset(&meta, 0, sizeof(meta)); + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|i", kwlist, &buffer_obj, &timeout_ms)) { + return NULL; + } + if (PyObject_GetBuffer(buffer_obj, &view, PyBUF_WRITABLE) != 0) { + return NULL; + } + + Py_BEGIN_ALLOW_THREADS + rc = omnisocket_udp_session_recv_into(&self->session, view.buf, (size_t) view.len, &meta, timeout_ms); + Py_END_ALLOW_THREADS + + PyBuffer_Release(&view); + if (rc == 1) { + Py_RETURN_NONE; + } + if (rc == 2) { + PyErr_Format( + PyExc_BufferError, + "buffer too small: need %zu bytes; current frame was already consumed and dropped", + meta.body_len + ); + return NULL; + } + if (rc != 0) { + return PyErr_SetFromErrno(PyExc_OSError); + } + + result = build_recv_meta_dict( + meta.from, + meta.to, + meta.file_name, + (int) meta.type, + (unsigned long long) meta.id, + (unsigned long long) meta.body_len + ); + return result; +} + +static PyObject *PyOmniUdpSession_stats(PyOmniUdpSession *self, PyObject *Py_UNUSED(ignored)) { + omnisocket_session_stats_t stats; + + memset(&stats, 0, sizeof(stats)); + omnisocket_udp_session_stats_snapshot(&self->session, &stats); + return build_stats_dict(&stats); +} + +static PyMethodDef PyOmniUdpSession_methods[] = { + {"connect", (PyCFunction) PyOmniUdpSession_connect, METH_VARARGS | METH_KEYWORDS, NULL}, + {"close", (PyCFunction) PyOmniUdpSession_close, METH_NOARGS, NULL}, + {"send", (PyCFunction) PyOmniUdpSession_send, METH_VARARGS | METH_KEYWORDS, NULL}, + {"recv", (PyCFunction) PyOmniUdpSession_recv, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_doc}, + {"recv_into", (PyCFunction) PyOmniUdpSession_recv_into, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_into_doc}, + {"stats", (PyCFunction) PyOmniUdpSession_stats, METH_NOARGS, NULL}, + {NULL, NULL, 0, NULL} +}; + +static PyTypeObject PyOmniUdpSessionType = { + PyVarObject_HEAD_INIT(NULL, 0) +}; + static PyModuleDef omnisocket_module = { PyModuleDef_HEAD_INIT, .m_name = "_omnisocket", @@ -315,6 +553,17 @@ PyMODINIT_FUNC PyInit__omnisocket(void) { return NULL; } + PyOmniUdpSessionType.tp_name = "omnisocket.UdpSession"; + PyOmniUdpSessionType.tp_basicsize = sizeof(PyOmniUdpSession); + PyOmniUdpSessionType.tp_flags = Py_TPFLAGS_DEFAULT; + PyOmniUdpSessionType.tp_new = PyOmniUdpSession_new; + PyOmniUdpSessionType.tp_dealloc = (destructor) PyOmniUdpSession_dealloc; + PyOmniUdpSessionType.tp_methods = PyOmniUdpSession_methods; + + if (PyType_Ready(&PyOmniUdpSessionType) < 0) { + return NULL; + } + module = PyModule_Create(&omnisocket_module); if (module == NULL) { return NULL; @@ -327,6 +576,13 @@ PyMODINIT_FUNC PyInit__omnisocket(void) { return NULL; } + Py_INCREF(&PyOmniUdpSessionType); + if (PyModule_AddObject(module, "UdpSession", (PyObject *) &PyOmniUdpSessionType) != 0) { + Py_DECREF(&PyOmniUdpSessionType); + Py_DECREF(module); + return NULL; + } + if (PyModule_AddIntConstant(module, "MSG_TYPE_TEXT", MSG_TYPE_TEXT) != 0 || PyModule_AddIntConstant(module, "MSG_TYPE_FILE", MSG_TYPE_FILE) != 0 || PyModule_AddIntConstant(module, "MSG_TYPE_REGISTER", MSG_TYPE_REGISTER) != 0 || diff --git a/python/omnisocket/omnisocket_client.c b/python/omnisocket/omnisocket_client.c index 3557780..3166476 100644 --- a/python/omnisocket/omnisocket_client.c +++ b/python/omnisocket/omnisocket_client.c @@ -246,3 +246,245 @@ void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket *out_stats = session->stats; pthread_mutex_unlock(&session->mutex); } + +int omnisocket_udp_session_init(omnisocket_udp_session_t *session) { + int rc; + + if (session == NULL) { + errno = EINVAL; + return -1; + } + memset(session, 0, sizeof(*session)); + rc = pthread_mutex_init(&session->mutex, NULL); + if (rc != 0) { + errno = rc; + return -1; + } + rc = pthread_cond_init(&session->idle_cond, NULL); + if (rc != 0) { + pthread_mutex_destroy(&session->mutex); + errno = rc; + return -1; + } + return 0; +} + +void omnisocket_udp_session_destroy(omnisocket_udp_session_t *session) { + if (session == NULL) { + return; + } + (void) omnisocket_udp_session_close(session); + pthread_cond_destroy(&session->idle_cond); + pthread_mutex_destroy(&session->mutex); +} + +static int omnisocket_udp_session_begin_client_op(omnisocket_udp_session_t *session, udp_client_t **out_client) { + if (session == NULL || out_client == NULL) { + errno = EINVAL; + return -1; + } + + pthread_mutex_lock(&session->mutex); + if (session->closing) { + pthread_mutex_unlock(&session->mutex); + errno = ECANCELED; + return -1; + } + if (session->client == NULL) { + pthread_mutex_unlock(&session->mutex); + errno = ENOTCONN; + return -1; + } + *out_client = session->client; + session->active_ops += 1; + pthread_mutex_unlock(&session->mutex); + return 0; +} + +int omnisocket_udp_session_connect( + omnisocket_udp_session_t *session, + const char *server_addr, + const char *peer_id, + const char *bind_ip, + const char *bind_device, + int enable_timestamping +) { + udp_client_t *client; + + if (session == NULL || server_addr == NULL || peer_id == NULL) { + errno = EINVAL; + return -1; + } + + pthread_mutex_lock(&session->mutex); + while (session->closing) { + pthread_cond_wait(&session->idle_cond, &session->mutex); + } + if (session->client != NULL) { + pthread_mutex_unlock(&session->mutex); + errno = EISCONN; + return -1; + } + client = udp_client_dial_with_options( + server_addr, + peer_id, + bind_ip, + bind_device, + NULL, + NULL, + enable_timestamping + ); + if (client == NULL) { + pthread_mutex_unlock(&session->mutex); + return -1; + } + session->client = client; + session->stats.connected = 1; + pthread_mutex_unlock(&session->mutex); + return 0; +} + +int omnisocket_udp_session_close(omnisocket_udp_session_t *session) { + udp_client_t *client; + + if (session == NULL) { + errno = EINVAL; + return -1; + } + + pthread_mutex_lock(&session->mutex); + while (session->closing) { + pthread_cond_wait(&session->idle_cond, &session->mutex); + } + client = session->client; + if (client != NULL) { + session->closing = 1; + session->client = NULL; + } + session->stats.connected = 0; + pthread_mutex_unlock(&session->mutex); + + if (client != NULL) { + udp_client_close(client); + pthread_mutex_lock(&session->mutex); + while (session->active_ops > 0) { + pthread_cond_wait(&session->idle_cond, &session->mutex); + } + pthread_mutex_unlock(&session->mutex); + udp_client_free(client); + pthread_mutex_lock(&session->mutex); + session->closing = 0; + pthread_cond_broadcast(&session->idle_cond); + pthread_mutex_unlock(&session->mutex); + } + return 0; +} + +int omnisocket_udp_session_send(omnisocket_udp_session_t *session, const char *to, const void *data, size_t data_len) { + udp_client_t *client; + int rc; + + if (session == NULL || to == NULL || (data == NULL && data_len > 0)) { + errno = EINVAL; + return -1; + } + + if (omnisocket_udp_session_begin_client_op(session, &client) != 0) { + return -1; + } + rc = udp_client_send_binary(client, to, data, data_len); + pthread_mutex_lock(&session->mutex); + if (rc == 0) { + session->stats.send_calls += 1; + session->stats.send_bytes += (uint64_t) data_len; + } else { + session->stats.send_errors += 1; + } + if (session->active_ops > 0) { + session->active_ops -= 1; + } + if (session->closing && session->active_ops == 0) { + pthread_cond_broadcast(&session->idle_cond); + } + pthread_mutex_unlock(&session->mutex); + return rc; +} + +int omnisocket_udp_session_recv(omnisocket_udp_session_t *session, message_t *out_msg, int timeout_ms) { + udp_client_t *client; + int rc; + + if (session == NULL || out_msg == NULL) { + errno = EINVAL; + return -1; + } + + if (omnisocket_udp_session_begin_client_op(session, &client) != 0) { + return -1; + } + rc = udp_client_receive_timed(client, out_msg, timeout_ms); + pthread_mutex_lock(&session->mutex); + if (rc == 0) { + session->stats.recv_calls += 1; + session->stats.recv_bytes += (uint64_t) out_msg->body_len; + } else if (rc == 1) { + session->stats.recv_timeouts += 1; + } else { + session->stats.recv_errors += 1; + } + if (session->active_ops > 0) { + session->active_ops -= 1; + } + if (session->closing && session->active_ops == 0) { + pthread_cond_broadcast(&session->idle_cond); + } + pthread_mutex_unlock(&session->mutex); + return rc; +} + +int omnisocket_udp_session_recv_into( + omnisocket_udp_session_t *session, + void *buffer, + size_t buffer_len, + udp_client_recv_meta_t *out_meta, + int timeout_ms +) { + udp_client_t *client; + int rc; + + if (session == NULL || out_meta == NULL || (buffer == NULL && buffer_len > 0)) { + errno = EINVAL; + return -1; + } + + if (omnisocket_udp_session_begin_client_op(session, &client) != 0) { + return -1; + } + rc = udp_client_receive_into(client, buffer, buffer_len, out_meta, timeout_ms); + pthread_mutex_lock(&session->mutex); + if (rc == 0) { + session->stats.recv_calls += 1; + session->stats.recv_bytes += (uint64_t) out_meta->body_len; + } else if (rc == 1) { + session->stats.recv_timeouts += 1; + } else { + session->stats.recv_errors += 1; + } + if (session->active_ops > 0) { + session->active_ops -= 1; + } + if (session->closing && session->active_ops == 0) { + pthread_cond_broadcast(&session->idle_cond); + } + pthread_mutex_unlock(&session->mutex); + return rc; +} + +void omnisocket_udp_session_stats_snapshot(omnisocket_udp_session_t *session, omnisocket_session_stats_t *out_stats) { + if (session == NULL || out_stats == NULL) { + return; + } + pthread_mutex_lock(&session->mutex); + *out_stats = session->stats; + pthread_mutex_unlock(&session->mutex); +} diff --git a/python/omnisocket/omnisocket_client.h b/python/omnisocket/omnisocket_client.h index be9e4b2..2ca5fa4 100644 --- a/python/omnisocket/omnisocket_client.h +++ b/python/omnisocket/omnisocket_client.h @@ -2,6 +2,7 @@ #define OMNISOCKET_PY_CLIENT_H #include "peer_kcp_client.h" +#include "peer_udp_client.h" typedef struct omnisocket_session_stats { uint64_t send_calls; @@ -23,6 +24,15 @@ typedef struct omnisocket_session { omnisocket_session_stats_t stats; } omnisocket_session_t; +typedef struct omnisocket_udp_session { + pthread_mutex_t mutex; + pthread_cond_t idle_cond; + udp_client_t *client; + size_t active_ops; + int closing; + omnisocket_session_stats_t stats; +} omnisocket_udp_session_t; + int omnisocket_session_init(omnisocket_session_t *session); void omnisocket_session_destroy(omnisocket_session_t *session); @@ -48,4 +58,27 @@ int omnisocket_session_recv_into( ); void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket_session_stats_t *out_stats); +int omnisocket_udp_session_init(omnisocket_udp_session_t *session); +void omnisocket_udp_session_destroy(omnisocket_udp_session_t *session); + +int omnisocket_udp_session_connect( + omnisocket_udp_session_t *session, + const char *server_addr, + const char *peer_id, + const char *bind_ip, + const char *bind_device, + int enable_timestamping +); +int omnisocket_udp_session_close(omnisocket_udp_session_t *session); +int omnisocket_udp_session_send(omnisocket_udp_session_t *session, const char *to, const void *data, size_t data_len); +int omnisocket_udp_session_recv(omnisocket_udp_session_t *session, message_t *out_msg, int timeout_ms); +int omnisocket_udp_session_recv_into( + omnisocket_udp_session_t *session, + void *buffer, + size_t buffer_len, + udp_client_recv_meta_t *out_meta, + int timeout_ms +); +void omnisocket_udp_session_stats_snapshot(omnisocket_udp_session_t *session, omnisocket_session_stats_t *out_stats); + #endif diff --git a/python/tests/test_sessions.py b/python/tests/test_sessions.py new file mode 100644 index 0000000..fc56919 --- /dev/null +++ b/python/tests/test_sessions.py @@ -0,0 +1,180 @@ +from __future__ import annotations + +from contextlib import contextmanager +from pathlib import Path +import socket +import subprocess +import sys +import threading +import time + +import pytest + + +pytestmark = pytest.mark.skipif(sys.platform != 'linux', reason='Linux-only OmniSocket extension') + +ROOT = Path(__file__).resolve().parents[2] +PYTHON_ROOT = ROOT / 'python' +if str(PYTHON_ROOT) not in sys.path: + sys.path.insert(0, str(PYTHON_ROOT)) + +omnisocket = pytest.importorskip('omnisocket') + +CONTROL_DEFAULTS = omnisocket.CONTROL_DEFAULTS +MSG_TYPE_BINARY = omnisocket.MSG_TYPE_BINARY +Session = omnisocket.Session +UdpSession = omnisocket.UdpSession + + +def _reserve_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(('127.0.0.1', 0)) + return int(sock.getsockname()[1]) + + +@contextmanager +def _run_server(binary_name: str, listen_addr: str): + binary = ROOT / 'bin' / binary_name + if not binary.exists(): + pytest.skip(f'{binary} is not built') + + process = subprocess.Popen( + [str(binary), '-listen', listen_addr], + cwd=str(ROOT), + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + try: + time.sleep(0.2) + yield process + finally: + process.terminate() + try: + process.wait(timeout=2.0) + except subprocess.TimeoutExpired: + process.kill() + process.wait(timeout=2.0) + + +def _connect_with_retry(session_cls, *, transport: str, server_addr: str, peer_id: str): + deadline = time.monotonic() + 3.0 + last_error: Exception | None = None + + while time.monotonic() < deadline: + session = session_cls() + try: + kwargs: dict[str, object] = { + 'server_addr': server_addr, + 'peer_id': peer_id, + } + if transport == 'kcp': + kwargs.update(CONTROL_DEFAULTS) + else: + kwargs['enable_timestamping'] = False + session.connect(**kwargs) + return session + except OSError as exc: + last_error = exc + time.sleep(0.1) + + raise AssertionError(f'failed to connect {peer_id} to {server_addr}: {last_error}') + + +@pytest.mark.parametrize( + ('transport', 'binary_name', 'session_cls'), + [ + ('udp', 'udpserver', UdpSession), + ('kcp', 'kcpserver', Session), + ], +) +def test_control_sessions_smoke(transport: str, binary_name: str, session_cls) -> None: + port = _reserve_port() + listen_addr = f'127.0.0.1:{port}' + sender_id = f'pytest-{transport}-sender' + receiver_id = f'pytest-{transport}-receiver' + + with _run_server(binary_name, listen_addr): + sender = _connect_with_retry(session_cls, transport=transport, server_addr=listen_addr, peer_id=sender_id) + receiver = _connect_with_retry(session_cls, transport=transport, server_addr=listen_addr, peer_id=receiver_id) + + try: + assert receiver.recv(timeout_ms=20) is None + + payload = b'control-packet-1' + sender.send(to=receiver_id, data=payload) + from_peer, msg_type, recv_payload = receiver.recv(timeout_ms=1000) + assert from_peer == sender_id + assert msg_type == MSG_TYPE_BINARY + assert recv_payload == payload + + payload2 = b'control-packet-2' + sender.send(to=receiver_id, data=payload2) + recv_buffer = bytearray(128) + meta = receiver.recv_into(buffer=recv_buffer, timeout_ms=1000) + assert meta is not None + assert meta['from'] == sender_id + assert meta['msg_type'] == MSG_TYPE_BINARY + assert meta['body_len'] == len(payload2) + assert bytes(recv_buffer[: meta['body_len']]) == payload2 + + sender_stats = sender.stats() + receiver_stats = receiver.stats() + assert sender_stats['connected'] == 1 + assert receiver_stats['connected'] == 1 + assert sender_stats['send_calls'] >= 2 + assert receiver_stats['recv_calls'] >= 2 + finally: + sender.close() + receiver.close() + + +def test_udp_session_close_interrupts_blocking_recv() -> None: + port = _reserve_port() + listen_addr = f'127.0.0.1:{port}' + receiver_id = 'pytest-udp-blocking-recv' + + with _run_server('udpserver', listen_addr): + receiver = _connect_with_retry( + UdpSession, + transport='udp', + server_addr=listen_addr, + peer_id=receiver_id, + ) + + recv_error: list[BaseException] = [] + close_error: list[BaseException] = [] + recv_started = threading.Event() + recv_done = threading.Event() + close_done = threading.Event() + + def recv_worker() -> None: + recv_started.set() + try: + receiver.recv() + except BaseException as exc: # pragma: no cover - assertion is on thread completion + recv_error.append(exc) + finally: + recv_done.set() + + def close_worker() -> None: + try: + receiver.close() + except BaseException as exc: # pragma: no cover - assertion is on thread completion + close_error.append(exc) + finally: + close_done.set() + + recv_thread = threading.Thread(target=recv_worker, daemon=True) + recv_thread.start() + assert recv_started.wait(timeout=1.0) + time.sleep(0.05) + + close_thread = threading.Thread(target=close_worker, daemon=True) + close_thread.start() + + assert close_done.wait(timeout=1.0), 'UdpSession.close() blocked while recv() was waiting' + assert recv_done.wait(timeout=1.0), 'UdpSession.recv() stayed blocked after close()' + assert not close_thread.is_alive() + assert not recv_thread.is_alive() + assert not close_error + assert not recv_error or isinstance(recv_error[0], OSError) diff --git a/ros-control-c/README.md b/ros-control-c/README.md index 98b4726..b280639 100644 --- a/ros-control-c/README.md +++ b/ros-control-c/README.md @@ -5,6 +5,11 @@ - `udp` (default): unchanged from the original implementation - `kcp`: sent through OmniSocket using `MSG_TYPE_BINARY` +Note: + +- This README documents the `ros-control-c` path only. +- `ros-control-py` now uses OmniSocket for both `transport:=udp` and `transport:=kcp`; its `udp` mode is no longer raw socket UDP. + ## Build On Linux: diff --git a/ros-control-py/README.md b/ros-control-py/README.md new file mode 100644 index 0000000..1e95736 --- /dev/null +++ b/ros-control-py/README.md @@ -0,0 +1,246 @@ +# ROS2 Teleop over OmniSocket UDP/KCP + +`ros-control-py/udp_teleop_bridge` 现在把 teleop 控制流统一接到 OmniSocket peer 传输上。 + +- `transport:=udp` 表示 OmniSocket UDP,经 `udpserver/udppeer` 的消息协议传输 +- `transport:=kcp` 表示 OmniSocket KCP,经 `kcpserver/kcppeer` 的消息协议传输 +- 不再使用原来的裸 `socket.sendto()/recvfrom()` UDP 路径 + +机器人最终接收的话题保持不变: + +- topic: `/hric/robot/cmd_vel` +- type: `geometry_msgs/msg/TwistStamped` +- frame_id: `pelvis` + +控制负载也保持不变: + +- fixed payload: 24-byte little-endian `<6f>` +- order: `lx, ly, lz, ax, ay, az` + +## 目录 + +- `udp_teleop_bridge/udp_teleop_bridge/cmd_vel_udp_sender.py`: 订阅 `TwistStamped`,经 OmniSocket 发送 24 字节控制包 +- `udp_teleop_bridge/udp_teleop_bridge/udp_cmd_vel_receiver.py`: 从 OmniSocket 接收控制包,补时间戳并发布到机器人 ROS2 topic +- `udp_teleop_bridge/udp_teleop_bridge/omni_transport.py`: 统一封装 OmniSocket UDP/KCP session +- `udp_teleop_bridge/config/xbox_twist_joy.yaml`: Xbox 手柄映射 +- `udp_teleop_bridge/launch/*.launch.py`: Linux 启动入口 + +## Linux 构建 + +先安装 ROS 2 官方 teleop 依赖: + +```bash +sudo apt install ros-${ROS_DISTRO}-joy ros-${ROS_DISTRO}-teleop-twist-joy ros-${ROS_DISTRO}-teleop-twist-keyboard +``` + +再构建并安装 OmniSocket Python 扩展: + +```bash +make python-ext +make python-install +``` + +最后构建 ROS 包: + +```bash +colcon build --packages-select udp_teleop_bridge +source install/setup.bash +``` + +如果 `omnisocket` 没有安装到当前 ROS Python 环境,sender/receiver 会直接报错退出。 + +## 先验证机器人控制语义 + +在机器人本机先直接低速发布 `/hric/robot/cmd_vel`,确认 `linear.x`、`linear.y`、`angular.z` 的物理方向符合预期: + +```bash +ros2 topic pub /hric/robot/cmd_vel geometry_msgs/msg/TwistStamped \ + "{header: {frame_id: pelvis}, twist: {linear: {x: 0.10, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}}" \ + -r 20 +``` + +```bash +ros2 topic pub /hric/robot/cmd_vel geometry_msgs/msg/TwistStamped \ + "{header: {frame_id: pelvis}, twist: {linear: {x: 0.0, y: 0.10, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}}" \ + -r 20 +``` + +```bash +ros2 topic pub /hric/robot/cmd_vel geometry_msgs/msg/TwistStamped \ + "{header: {frame_id: pelvis}, twist: {linear: {x: 0.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.30}}}" \ + -r 20 +``` + +停止: + +```bash +ros2 topic pub --once /hric/robot/cmd_vel geometry_msgs/msg/TwistStamped \ + "{header: {frame_id: pelvis}, twist: {linear: {x: 0.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}}" +``` + +## 启动 OmniSocket Hub + +OmniSocket UDP: + +```bash +./bin/udpserver -listen :9001 +``` + +OmniSocket KCP: + +```bash +./bin/kcpserver -listen :9002 +``` + +`server_addr` 不传时,节点会按 `transport` 自动选择默认值: + +- `udp` -> `127.0.0.1:9001` +- `kcp` -> `127.0.0.1:9002` + +`relay_via` 只在 `transport:=kcp` 时生效。 + +## 机器人端运行 + +UDP: + +```bash +ros2 launch udp_teleop_bridge robot_udp_receiver.launch.py \ + transport:=udp \ + server_addr:=127.0.0.1:9001 \ + peer_id:=ros-bridge-ctrl \ + output_topic:=/hric/robot/cmd_vel \ + frame_id:=pelvis \ + watchdog_timeout:=0.5 +``` + +KCP: + +```bash +ros2 launch udp_teleop_bridge robot_udp_receiver.launch.py \ + transport:=kcp \ + server_addr:=127.0.0.1:9002 \ + peer_id:=ros-bridge-ctrl \ + output_topic:=/hric/robot/cmd_vel \ + frame_id:=pelvis \ + watchdog_timeout:=0.5 +``` + +如果只允许某个 sender 控制,可以加: + +```bash +expected_sender:=ros-keyboard-ctrl +``` + +## 控制端键盘运行 + +终端 A,启动 sender: + +```bash +ros2 launch udp_teleop_bridge keyboard_sender.launch.py \ + transport:=udp \ + server_addr:=127.0.0.1:9001 \ + peer_id:=ros-keyboard-ctrl \ + target_peer:=ros-bridge-ctrl +``` + +如果走 KCP: + +```bash +ros2 launch udp_teleop_bridge keyboard_sender.launch.py \ + transport:=kcp \ + server_addr:=127.0.0.1:9002 \ + peer_id:=ros-keyboard-ctrl \ + target_peer:=ros-bridge-ctrl +``` + +终端 B,启动官方键盘 teleop: + +```bash +ros2 run teleop_twist_keyboard teleop_twist_keyboard --ros-args \ + --remap cmd_vel:=/teleop/cmd_vel \ + -p stamped:=true \ + -p frame_id:=pelvis \ + -p speed:=0.20 \ + -p turn:=0.60 +``` + +键盘默认键位(`teleop_twist_keyboard`,建议使用 US 键盘布局): + +- `i`: 前进(`linear.x > 0`) +- `,`: 后退(`linear.x < 0`) +- `j`: 左转(`angular.z > 0`) +- `l`: 右转(`angular.z < 0`) +- `Shift + J`: 左平移(`linear.y > 0`) +- `Shift + L`: 右平移(`linear.y < 0`) +- `u` / `o` / `m` / `.`: 组合前进或后退加转向 +- `k` 或其他未映射按键: 停止 +- `q` / `z`: 整体速度增加 / 降低 10% +- `w` / `x`: 仅线速度增加 / 降低 10% +- `e` / `c`: 仅角速度增加 / 降低 10% +- `Ctrl-C`: 退出键盘 teleop + +## 控制端 Xbox 手柄运行 + +UDP: + +```bash +ros2 launch udp_teleop_bridge xbox_to_udp.launch.py \ + transport:=udp \ + server_addr:=127.0.0.1:9001 \ + peer_id:=ros-gamepad-ctrl \ + target_peer:=ros-bridge-ctrl \ + joy_dev:=/dev/input/js0 \ + frame_id:=pelvis +``` + +KCP: + +```bash +ros2 launch udp_teleop_bridge xbox_to_udp.launch.py \ + transport:=kcp \ + server_addr:=127.0.0.1:9002 \ + peer_id:=ros-gamepad-ctrl \ + target_peer:=ros-bridge-ctrl \ + joy_dev:=/dev/input/js0 \ + frame_id:=pelvis +``` + +当前默认手柄映射: + +- 左摇杆上下 -> `linear.x` +- 左摇杆左右 -> `linear.y` +- 右摇杆左右 -> `angular.z` +- `RB` 按住才允许运动 +- `LB` 为 turbo + +手柄实际操控含义(基于 `config/xbox_twist_joy.yaml` 的 Xbox 默认映射): + +- 左摇杆向前 / 向后: 前进 / 后退 +- 左摇杆向左 / 向右: 左平移 / 右平移 +- 右摇杆向左 / 向右: 左转 / 右转 +- 按住 `RB`: 以常速启用运动输出 +- 同时按住 `LB` + `RB`: 启用 turbo,更高的线速度和角速度 +- 松开 `RB` 或将摇杆回中: 输出回到零速 + +## 数据流 + +键盘链路: + +```text +teleop_twist_keyboard -> /teleop/cmd_vel (TwistStamped) -> cmd_vel_udp_sender -> OmniSocket UDP/KCP -> udp_cmd_vel_receiver -> /hric/robot/cmd_vel +``` + +手柄链路: + +```text +joy_node -> teleop_twist_joy -> /teleop/cmd_vel (TwistStamped) -> cmd_vel_udp_sender -> OmniSocket UDP/KCP -> udp_cmd_vel_receiver -> /hric/robot/cmd_vel +``` + +## 安全行为 + +- sender 默认按 20 Hz 重发最新命令 +- sender 输入超时后会改发零速 +- sender 退出时会主动发送数个零速控制包 +- receiver 超时后会在 ROS 主线程发布零速 stop +- receiver 只接受 `MSG_TYPE_BINARY` 且长度为 24 字节的负载 +- 非预期 sender、非 binary 消息、错误长度消息都会被丢弃并记录日志 diff --git a/ros-control-py/ROS2 Teleop over UDP.md b/ros-control-py/ROS2 Teleop over UDP.md new file mode 100644 index 0000000..6f10d33 --- /dev/null +++ b/ros-control-py/ROS2 Teleop over UDP.md @@ -0,0 +1,147 @@ +## ROS2 Teleop over OmniSocket UDP/KCP + +这个文档对应 `ros-control-py/udp_teleop_bridge` 的当前实现。 + +核心变化: + +- `transport:=udp` 现在表示 OmniSocket UDP +- `transport:=kcp` 表示 OmniSocket KCP +- 不再使用原来的裸 `socket` UDP 实现 + +控制接口保持不变: + +- topic: `/hric/robot/cmd_vel` +- type: `geometry_msgs/msg/TwistStamped` +- frame_id: `pelvis` +- payload: fixed 24-byte little-endian `<6f>` + +负载顺序: + +`lx, ly, lz, ax, ay, az` + +### 构建顺序 + +```bash +make python-ext +make python-install +``` + +```bash +colcon build --packages-select udp_teleop_bridge +source install/setup.bash +``` + +### 启动 Hub + +OmniSocket UDP: + +```bash +./bin/udpserver -listen :9001 +``` + +OmniSocket KCP: + +```bash +./bin/kcpserver -listen :9002 +``` + +### 机器人端 Receiver + +```bash +ros2 launch udp_teleop_bridge robot_udp_receiver.launch.py \ + transport:=udp \ + server_addr:=127.0.0.1:9001 \ + peer_id:=ros-bridge-ctrl \ + output_topic:=/hric/robot/cmd_vel \ + frame_id:=pelvis \ + watchdog_timeout:=0.5 +``` + +KCP 只需把 `transport` 和 `server_addr` 改成: + +```bash +transport:=kcp server_addr:=127.0.0.1:9002 +``` + +只接受指定 sender: + +```bash +expected_sender:=ros-keyboard-ctrl +``` + +### 键盘 Sender + +```bash +ros2 launch udp_teleop_bridge keyboard_sender.launch.py \ + transport:=udp \ + server_addr:=127.0.0.1:9001 \ + peer_id:=ros-keyboard-ctrl \ + target_peer:=ros-bridge-ctrl +``` + +```bash +ros2 run teleop_twist_keyboard teleop_twist_keyboard --ros-args \ + --remap cmd_vel:=/teleop/cmd_vel \ + -p stamped:=true \ + -p frame_id:=pelvis \ + -p speed:=0.20 \ + -p turn:=0.60 +``` + +### Xbox Sender + +```bash +ros2 launch udp_teleop_bridge xbox_to_udp.launch.py \ + transport:=udp \ + server_addr:=127.0.0.1:9001 \ + peer_id:=ros-gamepad-ctrl \ + target_peer:=ros-bridge-ctrl \ + joy_dev:=/dev/input/js0 \ + frame_id:=pelvis +``` + +### 参数语义 + +- sender: + - `transport` + - `server_addr` + - `relay_via` + - `peer_id` + - `target_peer` + - `input_topic` + - `send_rate_hz` + - `input_timeout` +- receiver: + - `transport` + - `server_addr` + - `relay_via` + - `peer_id` + - `expected_sender` + - `output_topic` + - `frame_id` + - `watchdog_timeout` + - `publish_rate_hz` + +`server_addr` 省略时,会按 transport 自动选择: + +- `udp` -> `127.0.0.1:9001` +- `kcp` -> `127.0.0.1:9002` + +### 数据流 + +```text +teleop_twist_keyboard / teleop_twist_joy + -> /teleop/cmd_vel (TwistStamped) + -> cmd_vel_udp_sender + -> OmniSocket UDP/KCP binary message + -> udp_cmd_vel_receiver + -> /hric/robot/cmd_vel +``` + +### 安全与约束 + +- sender 默认 20 Hz 重发 +- sender 输入超时后改发零速 +- receiver watchdog 超时后发零速 stop +- receiver 只接受 24 字节 binary 负载 +- `relay_via` 只在 KCP 模式有效 diff --git a/ros-control-py/udp_teleop_bridge/config/xbox_twist_joy.yaml b/ros-control-py/udp_teleop_bridge/config/xbox_twist_joy.yaml new file mode 100644 index 0000000..c48735e --- /dev/null +++ b/ros-control-py/udp_teleop_bridge/config/xbox_twist_joy.yaml @@ -0,0 +1,32 @@ +/**: + ros__parameters: + require_enable_button: true + enable_button: 5 + enable_turbo_button: 4 + axis_linear: + x: 1 + y: 0 + z: -1 + scale_linear: + x: -0.30 + y: -0.25 + z: 0.0 + scale_linear_turbo: + x: -0.60 + y: -0.45 + z: 0.0 + axis_angular: + yaw: 3 + pitch: -1 + roll: -1 + scale_angular: + yaw: -0.80 + pitch: 0.0 + roll: 0.0 + scale_angular_turbo: + yaw: -1.20 + pitch: 0.0 + roll: 0.0 + inverted_reverse: false + publish_stamped_twist: true + frame: pelvis diff --git a/ros-control-py/udp_teleop_bridge/launch/keyboard_sender.launch.py b/ros-control-py/udp_teleop_bridge/launch/keyboard_sender.launch.py new file mode 100644 index 0000000..49d7667 --- /dev/null +++ b/ros-control-py/udp_teleop_bridge/launch/keyboard_sender.launch.py @@ -0,0 +1,34 @@ +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node +from launch_ros.parameter_descriptions import ParameterValue + + +def generate_launch_description() -> LaunchDescription: + return LaunchDescription([ + DeclareLaunchArgument('transport', default_value='udp'), + DeclareLaunchArgument('server_addr', default_value=''), + DeclareLaunchArgument('relay_via', default_value=''), + DeclareLaunchArgument('peer_id', default_value='ros-keyboard-ctrl'), + DeclareLaunchArgument('target_peer', default_value='ros-bridge-ctrl'), + DeclareLaunchArgument('input_topic', default_value='/teleop/cmd_vel'), + DeclareLaunchArgument('send_rate_hz', default_value='20.0'), + DeclareLaunchArgument('input_timeout', default_value='0.75'), + Node( + package='udp_teleop_bridge', + executable='cmd_vel_udp_sender', + name='cmd_vel_udp_sender', + output='screen', + parameters=[{ + 'transport': LaunchConfiguration('transport'), + 'server_addr': LaunchConfiguration('server_addr'), + 'relay_via': LaunchConfiguration('relay_via'), + 'peer_id': LaunchConfiguration('peer_id'), + 'target_peer': LaunchConfiguration('target_peer'), + 'input_topic': LaunchConfiguration('input_topic'), + 'send_rate_hz': ParameterValue(LaunchConfiguration('send_rate_hz'), value_type=float), + 'input_timeout': ParameterValue(LaunchConfiguration('input_timeout'), value_type=float), + }], + ), + ]) diff --git a/ros-control-py/udp_teleop_bridge/launch/robot_udp_receiver.launch.py b/ros-control-py/udp_teleop_bridge/launch/robot_udp_receiver.launch.py new file mode 100644 index 0000000..c27acc4 --- /dev/null +++ b/ros-control-py/udp_teleop_bridge/launch/robot_udp_receiver.launch.py @@ -0,0 +1,36 @@ +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node +from launch_ros.parameter_descriptions import ParameterValue + + +def generate_launch_description() -> LaunchDescription: + return LaunchDescription([ + DeclareLaunchArgument('transport', default_value='udp'), + DeclareLaunchArgument('server_addr', default_value=''), + DeclareLaunchArgument('relay_via', default_value=''), + DeclareLaunchArgument('peer_id', default_value='ros-bridge-ctrl'), + DeclareLaunchArgument('expected_sender', default_value=''), + DeclareLaunchArgument('output_topic', default_value='/hric/robot/cmd_vel'), + DeclareLaunchArgument('frame_id', default_value='pelvis'), + DeclareLaunchArgument('watchdog_timeout', default_value='0.5'), + DeclareLaunchArgument('publish_rate_hz', default_value='100.0'), + Node( + package='udp_teleop_bridge', + executable='udp_cmd_vel_receiver', + name='udp_cmd_vel_receiver', + output='screen', + parameters=[{ + 'transport': LaunchConfiguration('transport'), + 'server_addr': LaunchConfiguration('server_addr'), + 'relay_via': LaunchConfiguration('relay_via'), + 'peer_id': LaunchConfiguration('peer_id'), + 'expected_sender': LaunchConfiguration('expected_sender'), + 'output_topic': LaunchConfiguration('output_topic'), + 'frame_id': LaunchConfiguration('frame_id'), + 'watchdog_timeout': ParameterValue(LaunchConfiguration('watchdog_timeout'), value_type=float), + 'publish_rate_hz': ParameterValue(LaunchConfiguration('publish_rate_hz'), value_type=float), + }], + ), + ]) diff --git a/ros-control-py/udp_teleop_bridge/launch/xbox_to_udp.launch.py b/ros-control-py/udp_teleop_bridge/launch/xbox_to_udp.launch.py new file mode 100644 index 0000000..d9038d0 --- /dev/null +++ b/ros-control-py/udp_teleop_bridge/launch/xbox_to_udp.launch.py @@ -0,0 +1,74 @@ +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration, PathJoinSubstitution +from launch_ros.actions import Node +from launch_ros.parameter_descriptions import ParameterValue +from launch_ros.substitutions import FindPackageShare + + +def generate_launch_description() -> LaunchDescription: + teleop_config = PathJoinSubstitution([ + FindPackageShare('udp_teleop_bridge'), + 'config', + 'xbox_twist_joy.yaml', + ]) + + teleop_topic = LaunchConfiguration('teleop_topic') + + return LaunchDescription([ + DeclareLaunchArgument('transport', default_value='udp'), + DeclareLaunchArgument('server_addr', default_value=''), + DeclareLaunchArgument('relay_via', default_value=''), + DeclareLaunchArgument('peer_id', default_value='ros-gamepad-ctrl'), + DeclareLaunchArgument('target_peer', default_value='ros-bridge-ctrl'), + DeclareLaunchArgument('joy_dev', default_value='/dev/input/js0'), + DeclareLaunchArgument('deadzone', default_value='0.10'), + DeclareLaunchArgument('autorepeat_rate', default_value='20.0'), + DeclareLaunchArgument('frame_id', default_value='pelvis'), + DeclareLaunchArgument('teleop_topic', default_value='/teleop/cmd_vel'), + DeclareLaunchArgument('send_rate_hz', default_value='20.0'), + DeclareLaunchArgument('input_timeout', default_value='0.30'), + Node( + package='joy', + executable='joy_node', + name='joy_node', + output='screen', + parameters=[{ + 'dev': LaunchConfiguration('joy_dev'), + 'deadzone': ParameterValue(LaunchConfiguration('deadzone'), value_type=float), + 'autorepeat_rate': ParameterValue(LaunchConfiguration('autorepeat_rate'), value_type=float), + }], + ), + Node( + package='teleop_twist_joy', + executable='teleop_node', + name='teleop_twist_joy', + output='screen', + parameters=[ + teleop_config, + { + 'publish_stamped_twist': True, + 'frame': LaunchConfiguration('frame_id'), + }, + ], + remappings=[ + ('cmd_vel', teleop_topic), + ], + ), + Node( + package='udp_teleop_bridge', + executable='cmd_vel_udp_sender', + name='cmd_vel_udp_sender', + output='screen', + parameters=[{ + 'transport': LaunchConfiguration('transport'), + 'server_addr': LaunchConfiguration('server_addr'), + 'relay_via': LaunchConfiguration('relay_via'), + 'peer_id': LaunchConfiguration('peer_id'), + 'target_peer': LaunchConfiguration('target_peer'), + 'input_topic': teleop_topic, + 'send_rate_hz': ParameterValue(LaunchConfiguration('send_rate_hz'), value_type=float), + 'input_timeout': ParameterValue(LaunchConfiguration('input_timeout'), value_type=float), + }], + ), + ]) diff --git a/ros-control-py/udp_teleop_bridge/package.xml b/ros-control-py/udp_teleop_bridge/package.xml new file mode 100644 index 0000000..28262bd --- /dev/null +++ b/ros-control-py/udp_teleop_bridge/package.xml @@ -0,0 +1,24 @@ + + + udp_teleop_bridge + 0.1.0 + ROS 2 OmniSocket UDP/KCP bridge for teleop TwistStamped commands. + + Codex + MIT + + ament_python + + ament_index_python + geometry_msgs + joy + launch + launch_ros + rclpy + teleop_twist_joy + teleop_twist_keyboard + + + ament_python + + diff --git a/ros-control-py/udp_teleop_bridge/resource/udp_teleop_bridge b/ros-control-py/udp_teleop_bridge/resource/udp_teleop_bridge new file mode 100644 index 0000000..9cc185f --- /dev/null +++ b/ros-control-py/udp_teleop_bridge/resource/udp_teleop_bridge @@ -0,0 +1 @@ +udp_teleop_bridge diff --git a/ros-control-py/udp_teleop_bridge/setup.cfg b/ros-control-py/udp_teleop_bridge/setup.cfg new file mode 100644 index 0000000..8f79a94 --- /dev/null +++ b/ros-control-py/udp_teleop_bridge/setup.cfg @@ -0,0 +1,5 @@ +[develop] +script_dir=$base/lib/udp_teleop_bridge + +[install] +install_scripts=$base/lib/udp_teleop_bridge diff --git a/ros-control-py/udp_teleop_bridge/setup.py b/ros-control-py/udp_teleop_bridge/setup.py new file mode 100644 index 0000000..4957d6d --- /dev/null +++ b/ros-control-py/udp_teleop_bridge/setup.py @@ -0,0 +1,34 @@ +from setuptools import find_packages, setup + + +package_name = 'udp_teleop_bridge' + + +setup( + name=package_name, + version='0.1.0', + packages=find_packages(exclude=['test']), + data_files=[ + ('share/ament_index/resource_index/packages', [f'resource/{package_name}']), + (f'share/{package_name}', ['package.xml']), + (f'share/{package_name}/launch', [ + 'launch/keyboard_sender.launch.py', + 'launch/robot_udp_receiver.launch.py', + 'launch/xbox_to_udp.launch.py', + ]), + (f'share/{package_name}/config', ['config/xbox_twist_joy.yaml']), + ], + install_requires=['setuptools'], + zip_safe=True, + maintainer='Codex', + maintainer_email='codex@example.com', + description='ROS 2 OmniSocket UDP/KCP bridge for teleop TwistStamped commands.', + license='MIT', + tests_require=['pytest'], + entry_points={ + 'console_scripts': [ + 'cmd_vel_udp_sender = udp_teleop_bridge.cmd_vel_udp_sender:main', + 'udp_cmd_vel_receiver = udp_teleop_bridge.udp_cmd_vel_receiver:main', + ], + }, +) diff --git a/ros-control-py/udp_teleop_bridge/test/test_protocol.py b/ros-control-py/udp_teleop_bridge/test/test_protocol.py new file mode 100644 index 0000000..87cfbe1 --- /dev/null +++ b/ros-control-py/udp_teleop_bridge/test/test_protocol.py @@ -0,0 +1,54 @@ +from pathlib import Path +import sys + +import pytest + + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from udp_teleop_bridge.protocol import ( # noqa: E402 + PACKET_SIZE, + default_server_addr_for_transport, + normalize_command, + normalize_transport, + pack_command, + unpack_command, +) + + +def test_pack_unpack_round_trip() -> None: + command = (0.1, -0.2, 0.3, -0.4, 0.5, -0.6) + + payload = pack_command(command) + + assert len(payload) == PACKET_SIZE + assert unpack_command(payload) == pytest.approx(command) + + +@pytest.mark.parametrize('value', [float('nan'), float('inf'), float('-inf')]) +def test_normalize_command_rejects_non_finite_values(value: float) -> None: + with pytest.raises(ValueError, match='non-finite'): + normalize_command((0.0, 0.0, value, 0.0, 0.0, 0.0)) + + +def test_unpack_command_rejects_wrong_length() -> None: + with pytest.raises(ValueError, match='Expected'): + unpack_command(b'\x00' * (PACKET_SIZE - 1)) + + +@pytest.mark.parametrize( + ('transport', 'expected'), + [ + ('udp', '127.0.0.1:9001'), + ('kcp', '127.0.0.1:9002'), + ], +) +def test_default_server_addr_for_transport(transport: str, expected: str) -> None: + assert default_server_addr_for_transport(transport) == expected + + +def test_normalize_transport_rejects_unknown_value() -> None: + with pytest.raises(ValueError, match='Unsupported transport'): + normalize_transport('sctp') diff --git a/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/__init__.py b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/__init__.py new file mode 100644 index 0000000..094feaf --- /dev/null +++ b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/__init__.py @@ -0,0 +1 @@ +"""OmniSocket teleop bridge package.""" diff --git a/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/cmd_vel_udp_sender.py b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/cmd_vel_udp_sender.py new file mode 100644 index 0000000..34a5ce1 --- /dev/null +++ b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/cmd_vel_udp_sender.py @@ -0,0 +1,207 @@ +"""ROS 2 node that forwards TwistStamped teleop commands over OmniSocket.""" + +from __future__ import annotations + +import threading +import time +from typing import Dict, Optional, Tuple + +import rclpy +from geometry_msgs.msg import TwistStamped +from rclpy.node import Node + +from .omni_transport import MSG_TYPE_ERROR, OmniTransport +from .protocol import ( + DEFAULT_EXIT_ZERO_PACKETS, + DEFAULT_INPUT_TIMEOUT, + DEFAULT_INPUT_TOPIC, + DEFAULT_KEYBOARD_PEER_ID, + DEFAULT_QUEUE_DEPTH, + DEFAULT_SEND_RATE_HZ, + DEFAULT_TARGET_PEER, + DEFAULT_TRANSPORT, + ZERO_COMMAND, + pack_command, +) + + +CommandTuple = Tuple[float, float, float, float, float, float] + + +class CmdVelUdpSender(Node): + """Forward TwistStamped messages to a remote OmniSocket peer.""" + + def __init__(self) -> None: + super().__init__('cmd_vel_udp_sender') + + self.declare_parameter('transport', DEFAULT_TRANSPORT) + self.declare_parameter('server_addr', '') + self.declare_parameter('relay_via', '') + self.declare_parameter('peer_id', DEFAULT_KEYBOARD_PEER_ID) + self.declare_parameter('target_peer', DEFAULT_TARGET_PEER) + self.declare_parameter('input_topic', DEFAULT_INPUT_TOPIC) + self.declare_parameter('send_rate_hz', DEFAULT_SEND_RATE_HZ) + self.declare_parameter('input_timeout', DEFAULT_INPUT_TIMEOUT) + self.declare_parameter('queue_depth', DEFAULT_QUEUE_DEPTH) + self.declare_parameter('exit_zero_packets', DEFAULT_EXIT_ZERO_PACKETS) + + self._transport_name = str(self.get_parameter('transport').value) + self._server_addr = str(self.get_parameter('server_addr').value) + self._relay_via = str(self.get_parameter('relay_via').value) + self._peer_id = str(self.get_parameter('peer_id').value) + self._target_peer = str(self.get_parameter('target_peer').value).strip() + self._input_topic = str(self.get_parameter('input_topic').value) + self._send_rate_hz = float(self.get_parameter('send_rate_hz').value) + self._input_timeout = float(self.get_parameter('input_timeout').value) + self._queue_depth = int(self.get_parameter('queue_depth').value) + self._exit_zero_packets = int(self.get_parameter('exit_zero_packets').value) + + if self._send_rate_hz <= 0.0: + raise ValueError('send_rate_hz must be > 0') + if self._input_timeout < 0.0: + raise ValueError('input_timeout must be >= 0') + if self._queue_depth <= 0: + raise ValueError('queue_depth must be > 0') + if not self._target_peer: + raise ValueError('target_peer must not be empty') + + self._transport = OmniTransport( + transport=self._transport_name, + server_addr=self._server_addr, + relay_via=self._relay_via, + peer_id=self._peer_id, + ) + self._last_log_times: Dict[str, float] = {} + self._latest_command: CommandTuple = ZERO_COMMAND + self._last_input_monotonic: Optional[float] = None + self._last_sent_command: Optional[CommandTuple] = None + self._closing = threading.Event() + + self.create_subscription( + TwistStamped, + self._input_topic, + self._handle_twist, + self._queue_depth, + ) + self.create_timer(1.0 / self._send_rate_hz, self._send_latest_command) + + self._drain_thread = threading.Thread(target=self._drain_incoming, daemon=True) + self._drain_thread.start() + + self.get_logger().info( + 'Forwarding TwistStamped from %s via %s://%s as %s -> %s at %.1f Hz ' + '(input timeout %.2f s)' + % ( + self._input_topic, + self._transport.transport, + self._transport.server_addr, + self._peer_id, + self._target_peer, + self._send_rate_hz, + self._input_timeout, + ) + ) + + def _should_log(self, key: str, throttle_sec: float) -> bool: + now = time.monotonic() + previous = self._last_log_times.get(key) + if previous is None or (now - previous) >= throttle_sec: + self._last_log_times[key] = now + return True + return False + + def _handle_twist(self, msg: TwistStamped) -> None: + self._latest_command = ( + float(msg.twist.linear.x), + float(msg.twist.linear.y), + float(msg.twist.linear.z), + float(msg.twist.angular.x), + float(msg.twist.angular.y), + float(msg.twist.angular.z), + ) + self._last_input_monotonic = time.monotonic() + + def _command_for_current_tick(self) -> CommandTuple: + if self._last_input_monotonic is None: + return ZERO_COMMAND + if self._input_timeout == 0.0: + return self._latest_command + age = time.monotonic() - self._last_input_monotonic + if age > self._input_timeout: + return ZERO_COMMAND + return self._latest_command + + def _send_command(self, command: CommandTuple) -> None: + payload = pack_command(command) + try: + self._transport.send(to=self._target_peer, data=payload) + self._last_sent_command = command + except OSError as exc: + if self._should_log('send_error', 2.0): + self.get_logger().error(f'OmniSocket send failed: {exc}') + + def _send_latest_command(self) -> None: + self._send_command(self._command_for_current_tick()) + + def _log_inbound_message(self, from_peer: str, msg_type: int, payload: bytes) -> None: + if msg_type == MSG_TYPE_ERROR: + if self._should_log('server_error', 1.0): + text = payload.decode('utf-8', errors='replace') + self.get_logger().error(f'OmniSocket server error from {from_peer}: {text}') + return + + if self._should_log('unexpected_inbound', 2.0): + self.get_logger().warning( + 'Ignoring unexpected inbound message type %d from %s (%d bytes)' + % (msg_type, from_peer, len(payload)) + ) + + def _drain_incoming(self) -> None: + while not self._closing.is_set() and rclpy.ok(): + try: + result = self._transport.recv(timeout_ms=100) + except OSError as exc: + if not self._closing.is_set() and self._should_log('drain_error', 2.0): + self.get_logger().error(f'OmniSocket receive loop stopped: {exc}') + return + + if result is None: + continue + + from_peer, msg_type, payload = result + self._log_inbound_message(from_peer, msg_type, payload) + + def send_zero_burst(self) -> None: + """Best-effort stop command sent during shutdown.""" + for _ in range(max(1, self._exit_zero_packets)): + self._send_command(ZERO_COMMAND) + time.sleep(0.02) + + def close(self) -> None: + self._closing.set() + if hasattr(self, '_transport') and self._transport is not None: + try: + self._transport.close() + except OSError as exc: + if self._should_log('close_error', 2.0): + self.get_logger().warning(f'Closing OmniSocket transport failed: {exc}') + self._transport = None + if hasattr(self, '_drain_thread') and self._drain_thread.is_alive(): + self._drain_thread.join(timeout=0.5) + + def destroy_node(self) -> bool: + self.close() + return super().destroy_node() + + +def main(args: Optional[list[str]] = None) -> None: + rclpy.init(args=args) + node = CmdVelUdpSender() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.send_zero_burst() + node.destroy_node() + rclpy.shutdown() diff --git a/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/omni_transport.py b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/omni_transport.py new file mode 100644 index 0000000..6c0353c --- /dev/null +++ b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/omni_transport.py @@ -0,0 +1,95 @@ +"""Helpers for working with OmniSocket transport sessions.""" + +from __future__ import annotations + +from .protocol import default_server_addr_for_transport, normalize_transport + + +try: + from omnisocket import ( + CONTROL_DEFAULTS, + MSG_TYPE_BINARY, + MSG_TYPE_ERROR, + Session, + UdpSession, + ) +except ImportError as exc: # pragma: no cover - depends on external build/install + raise RuntimeError( + 'omnisocket is not installed for this Python environment; run ' + '`make python-ext && make python-install` on a Linux host first' + ) from exc + + +def _normalize_optional(value: object) -> str: + return str(value).strip() + + +class OmniTransport: + """Small wrapper that normalizes OmniSocket UDP/KCP session setup.""" + + def __init__( + self, + *, + transport: object, + server_addr: object, + peer_id: object, + relay_via: object = '', + bind_ip: object = '', + bind_device: object = '', + enable_timestamping: bool = False, + ) -> None: + self.transport = normalize_transport(transport) + self.server_addr = _normalize_optional(server_addr) or default_server_addr_for_transport(self.transport) + self.peer_id = _normalize_optional(peer_id) + self.relay_via = _normalize_optional(relay_via) + self.bind_ip = _normalize_optional(bind_ip) + self.bind_device = _normalize_optional(bind_device) + + if not self.peer_id: + raise ValueError('peer_id must not be empty') + + session_cls = Session if self.transport == 'kcp' else UdpSession + self._session = session_cls() + + connect_kwargs: dict[str, object] = { + 'server_addr': self.server_addr, + 'peer_id': self.peer_id, + } + if self.bind_ip: + connect_kwargs['bind_ip'] = self.bind_ip + if self.bind_device: + connect_kwargs['bind_device'] = self.bind_device + + if self.transport == 'kcp': + if self.relay_via: + connect_kwargs['relay_via'] = self.relay_via + connect_kwargs.update(CONTROL_DEFAULTS) + else: + connect_kwargs['enable_timestamping'] = bool(enable_timestamping) + + self._session.connect(**connect_kwargs) + + def send(self, *, to: str, data: bytes) -> None: + self._session.send(to=to, data=data) + + def recv(self, *, timeout_ms: int = -1): + return self._session.recv(timeout_ms=timeout_ms) + + def recv_into(self, *, buffer, timeout_ms: int = -1): + return self._session.recv_into(buffer=buffer, timeout_ms=timeout_ms) + + def close(self) -> None: + self._session.close() + + def stats(self) -> dict[str, int]: + return self._session.stats() + + +__all__ = [ + 'CONTROL_DEFAULTS', + 'MSG_TYPE_BINARY', + 'MSG_TYPE_ERROR', + 'OmniTransport', + 'Session', + 'UdpSession', +] diff --git a/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/protocol.py b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/protocol.py new file mode 100644 index 0000000..44f4641 --- /dev/null +++ b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/protocol.py @@ -0,0 +1,74 @@ +"""Shared teleop protocol helpers and transport defaults.""" + +from __future__ import annotations + +import math +import struct +from typing import Iterable, Tuple + + +COMMAND_STRUCT = struct.Struct('<6f') +PACKET_SIZE = COMMAND_STRUCT.size + +SUPPORTED_TRANSPORTS = ('udp', 'kcp') +DEFAULT_TRANSPORT = 'udp' + +DEFAULT_OMNI_UDP_SERVER_ADDR = '127.0.0.1:9001' +DEFAULT_OMNI_KCP_SERVER_ADDR = '127.0.0.1:9002' + +DEFAULT_KEYBOARD_PEER_ID = 'ros-keyboard-ctrl' +DEFAULT_GAMEPAD_PEER_ID = 'ros-gamepad-ctrl' +DEFAULT_BRIDGE_PEER_ID = 'ros-bridge-ctrl' +DEFAULT_TARGET_PEER = DEFAULT_BRIDGE_PEER_ID + +DEFAULT_FRAME_ID = 'pelvis' +DEFAULT_INPUT_TOPIC = '/teleop/cmd_vel' +DEFAULT_OUTPUT_TOPIC = '/hric/robot/cmd_vel' +DEFAULT_SEND_RATE_HZ = 20.0 +DEFAULT_INPUT_TIMEOUT = 0.75 +DEFAULT_WATCHDOG_TIMEOUT = 0.5 +DEFAULT_PUBLISH_RATE_HZ = 100.0 +DEFAULT_QUEUE_DEPTH = 10 +DEFAULT_EXIT_ZERO_PACKETS = 3 +DEFAULT_RECV_BUFFER_BYTES = 2048 + +ZERO_COMMAND = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0) + + +def normalize_transport(value: object) -> str: + """Return a supported transport name.""" + transport = str(value).strip().lower() + if transport not in SUPPORTED_TRANSPORTS: + supported = ', '.join(SUPPORTED_TRANSPORTS) + raise ValueError(f"Unsupported transport '{transport}', expected one of: {supported}") + return transport + + +def default_server_addr_for_transport(transport: str) -> str: + """Return the default OmniSocket server for the chosen transport.""" + transport = normalize_transport(transport) + if transport == 'udp': + return DEFAULT_OMNI_UDP_SERVER_ADDR + return DEFAULT_OMNI_KCP_SERVER_ADDR + + +def normalize_command(values: Iterable[float]) -> Tuple[float, float, float, float, float, float]: + """Return a finite six-float command tuple.""" + command = tuple(float(value) for value in values) + if len(command) != 6: + raise ValueError(f'Expected 6 command values, got {len(command)}') + if any(not math.isfinite(value) for value in command): + raise ValueError('Command contains a non-finite value') + return command + + +def pack_command(values: Iterable[float]) -> bytes: + """Pack six floats into the wire format.""" + return COMMAND_STRUCT.pack(*normalize_command(values)) + + +def unpack_command(payload: bytes) -> Tuple[float, float, float, float, float, float]: + """Decode a control packet into a six-float command tuple.""" + if len(payload) != PACKET_SIZE: + raise ValueError(f'Expected {PACKET_SIZE} bytes, got {len(payload)}') + return normalize_command(COMMAND_STRUCT.unpack(payload)) diff --git a/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/udp_cmd_vel_receiver.py b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/udp_cmd_vel_receiver.py new file mode 100644 index 0000000..5ead458 --- /dev/null +++ b/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/udp_cmd_vel_receiver.py @@ -0,0 +1,238 @@ +"""ROS 2 node that receives OmniSocket teleop packets and republishes TwistStamped.""" + +from __future__ import annotations + +import threading +import time +from typing import Dict, Optional, Tuple + +import rclpy +from geometry_msgs.msg import TwistStamped +from rclpy.node import Node + +from .omni_transport import MSG_TYPE_BINARY, MSG_TYPE_ERROR, OmniTransport +from .protocol import ( + DEFAULT_BRIDGE_PEER_ID, + DEFAULT_FRAME_ID, + DEFAULT_OUTPUT_TOPIC, + DEFAULT_PUBLISH_RATE_HZ, + DEFAULT_QUEUE_DEPTH, + DEFAULT_RECV_BUFFER_BYTES, + DEFAULT_TRANSPORT, + DEFAULT_WATCHDOG_TIMEOUT, + PACKET_SIZE, + ZERO_COMMAND, + unpack_command, +) + + +CommandTuple = Tuple[float, float, float, float, float, float] + + +class UdpCmdVelReceiver(Node): + """Publish TwistStamped commands from the OmniSocket control wire format.""" + + def __init__(self) -> None: + super().__init__('udp_cmd_vel_receiver') + + self.declare_parameter('transport', DEFAULT_TRANSPORT) + self.declare_parameter('server_addr', '') + self.declare_parameter('relay_via', '') + self.declare_parameter('peer_id', DEFAULT_BRIDGE_PEER_ID) + self.declare_parameter('expected_sender', '') + self.declare_parameter('output_topic', DEFAULT_OUTPUT_TOPIC) + self.declare_parameter('frame_id', DEFAULT_FRAME_ID) + self.declare_parameter('watchdog_timeout', DEFAULT_WATCHDOG_TIMEOUT) + self.declare_parameter('publish_rate_hz', DEFAULT_PUBLISH_RATE_HZ) + self.declare_parameter('queue_depth', DEFAULT_QUEUE_DEPTH) + + self._transport_name = str(self.get_parameter('transport').value) + self._server_addr = str(self.get_parameter('server_addr').value) + self._relay_via = str(self.get_parameter('relay_via').value) + self._peer_id = str(self.get_parameter('peer_id').value) + self._expected_sender = str(self.get_parameter('expected_sender').value).strip() + self._output_topic = str(self.get_parameter('output_topic').value) + self._frame_id = str(self.get_parameter('frame_id').value) + self._watchdog_timeout = float(self.get_parameter('watchdog_timeout').value) + self._publish_rate_hz = float(self.get_parameter('publish_rate_hz').value) + self._queue_depth = int(self.get_parameter('queue_depth').value) + + if self._watchdog_timeout <= 0.0: + raise ValueError('watchdog_timeout must be > 0') + if self._publish_rate_hz <= 0.0: + raise ValueError('publish_rate_hz must be > 0') + if self._queue_depth <= 0: + raise ValueError('queue_depth must be > 0') + + self._publisher = self.create_publisher(TwistStamped, self._output_topic, self._queue_depth) + self._transport = OmniTransport( + transport=self._transport_name, + server_addr=self._server_addr, + relay_via=self._relay_via, + peer_id=self._peer_id, + ) + + self._lock = threading.Lock() + self._last_log_times: Dict[str, float] = {} + self._latest_command: CommandTuple = ZERO_COMMAND + self._last_packet_monotonic: Optional[float] = None + self._last_published_command: CommandTuple = ZERO_COMMAND + self._closing = threading.Event() + self._recv_buffer = bytearray(DEFAULT_RECV_BUFFER_BYTES) + + self.create_timer(1.0 / self._publish_rate_hz, self._publish_tick) + + self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True) + self._recv_thread.start() + + self.get_logger().info( + 'Receiving teleop commands via %s://%s as %s and publishing TwistStamped to %s ' + 'at %.1f Hz (frame_id=%s, watchdog %.2f s)' + % ( + self._transport.transport, + self._transport.server_addr, + self._peer_id, + self._output_topic, + self._publish_rate_hz, + self._frame_id, + self._watchdog_timeout, + ) + ) + + def _should_log(self, key: str, throttle_sec: float) -> bool: + now = time.monotonic() + previous = self._last_log_times.get(key) + if previous is None or (now - previous) >= throttle_sec: + self._last_log_times[key] = now + return True + return False + + def _publish_command(self, command: CommandTuple) -> None: + msg = TwistStamped() + msg.header.stamp = self.get_clock().now().to_msg() + msg.header.frame_id = self._frame_id + msg.twist.linear.x = command[0] + msg.twist.linear.y = command[1] + msg.twist.linear.z = command[2] + msg.twist.angular.x = command[3] + msg.twist.angular.y = command[4] + msg.twist.angular.z = command[5] + self._publisher.publish(msg) + self._last_published_command = command + + def _handle_error_message(self, from_peer: str, body_len: int) -> None: + if self._should_log('server_error', 1.0): + text = bytes(self._recv_buffer[:body_len]).decode('utf-8', errors='replace') + self.get_logger().error(f'OmniSocket server error from {from_peer}: {text}') + + def _recv_loop(self) -> None: + while not self._closing.is_set() and rclpy.ok(): + try: + meta = self._transport.recv_into(buffer=self._recv_buffer, timeout_ms=100) + except BufferError as exc: + if self._should_log('buffer_error', 2.0): + self.get_logger().warning(f'Dropped oversized OmniSocket frame: {exc}') + continue + except OSError as exc: + if not self._closing.is_set() and self._should_log('recv_error', 2.0): + self.get_logger().error(f'OmniSocket receive loop stopped: {exc}') + return + + if meta is None: + continue + + from_peer = str(meta['from']) + msg_type = int(meta['msg_type']) + body_len = int(meta['body_len']) + + if msg_type == MSG_TYPE_ERROR: + self._handle_error_message(from_peer, body_len) + continue + + if self._expected_sender and from_peer != self._expected_sender: + if self._should_log('unexpected_sender', 2.0): + self.get_logger().warning( + 'Ignoring message from unexpected sender %s (expected %s)' + % (from_peer, self._expected_sender) + ) + continue + + if msg_type != MSG_TYPE_BINARY: + if self._should_log('unexpected_type', 2.0): + self.get_logger().warning( + 'Ignoring unexpected message type %d from %s (%d bytes)' + % (msg_type, from_peer, body_len) + ) + continue + + if body_len != PACKET_SIZE: + if self._should_log('packet_size', 2.0): + self.get_logger().warning( + 'Dropped binary payload from %s with invalid size %d (expected %d)' + % (from_peer, body_len, PACKET_SIZE) + ) + continue + + try: + command = unpack_command(self._recv_buffer[:PACKET_SIZE]) + except ValueError as exc: + if self._should_log('decode_error', 2.0): + self.get_logger().warning(f'Dropped malformed command payload: {exc}') + continue + + with self._lock: + self._latest_command = command + self._last_packet_monotonic = time.monotonic() + + def _command_for_publish_tick(self) -> tuple[CommandTuple, Optional[float], bool]: + with self._lock: + latest_command = self._latest_command + last_packet_monotonic = self._last_packet_monotonic + + if last_packet_monotonic is None: + return ZERO_COMMAND, None, False + + age = time.monotonic() - last_packet_monotonic + if age > self._watchdog_timeout: + return ZERO_COMMAND, age, True + return latest_command, age, False + + def _publish_tick(self) -> None: + publish_command, age, timed_out = self._command_for_publish_tick() + + if timed_out and self._last_published_command != ZERO_COMMAND: + if self._should_log('watchdog_stop', 2.0): + self.get_logger().warning( + 'Command stream timed out after %.2f s, publishing zero velocity stop' + % age + ) + + self._publish_command(publish_command) + + def close(self) -> None: + self._closing.set() + if hasattr(self, '_transport') and self._transport is not None: + try: + self._transport.close() + except OSError as exc: + if self._should_log('close_error', 2.0): + self.get_logger().warning(f'Closing OmniSocket transport failed: {exc}') + self._transport = None + if hasattr(self, '_recv_thread') and self._recv_thread.is_alive(): + self._recv_thread.join(timeout=0.5) + + def destroy_node(self) -> bool: + self.close() + return super().destroy_node() + + +def main(args: Optional[list[str]] = None) -> None: + rclpy.init(args=args) + node = UdpCmdVelReceiver() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() diff --git a/src/peer_udp_client.c b/src/peer_udp_client.c index 4fc3bb5..9e617ab 100644 --- a/src/peer_udp_client.c +++ b/src/peer_udp_client.c @@ -1,6 +1,8 @@ #include "peer_udp_client.h" +#include #include +#include struct udp_client { char id[OMNI_MAX_PEER_ID]; @@ -17,6 +19,19 @@ static int client_next_message_id(udp_client_t *client, uint64_t *out_id) { return 0; } +static void udp_client_fill_recv_meta(udp_client_recv_meta_t *meta, const message_t *msg) { + if (meta == NULL || msg == NULL) { + return; + } + memset(meta, 0, sizeof(*meta)); + meta->type = msg->type; + meta->id = msg->id; + meta->body_len = msg->body_len; + snprintf(meta->from, sizeof(meta->from), "%s", msg->from); + snprintf(meta->to, sizeof(meta->to), "%s", msg->to); + snprintf(meta->file_name, sizeof(meta->file_name), "%s", msg->file_name); +} + static int client_persist_message_to_disk(const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len) { char path[512]; if (omni_ensure_dir(inbox_dir) != 0) { @@ -70,7 +85,7 @@ static int client_persist_message_to_disk(const message_t *msg, const char *inbo return 0; } -udp_client_t *udp_client_dial(const char *server_addr, const char *peer_id, const char *bind_ip, latency_logger_t *logger, tx_timestamp_debug_logger_t *debug_logger, int enable_timestamping) { +udp_client_t *udp_client_dial_with_options(const char *server_addr, const char *peer_id, const char *bind_ip, const char *bind_device, latency_logger_t *logger, tx_timestamp_debug_logger_t *debug_logger, int enable_timestamping) { udp_client_t *client; message_t register_msg; client = (udp_client_t *) calloc(1, sizeof(*client)); @@ -80,7 +95,7 @@ udp_client_t *udp_client_dial(const char *server_addr, const char *peer_id, cons snprintf(client->id, sizeof(client->id), "%s", peer_id); pthread_mutex_init(&client->id_mu, NULL); client->logger = logger; - client->conn = udp_conn_dial(server_addr, bind_ip, NULL, enable_timestamping, logger, OMNI_NODE_ROLE_PEER, peer_id, debug_logger); + client->conn = udp_conn_dial(server_addr, bind_ip, bind_device, enable_timestamping, logger, OMNI_NODE_ROLE_PEER, peer_id, debug_logger); if (client->conn == NULL) { udp_client_free(client); return NULL; @@ -97,6 +112,10 @@ udp_client_t *udp_client_dial(const char *server_addr, const char *peer_id, cons return client; } +udp_client_t *udp_client_dial(const char *server_addr, const char *peer_id, const char *bind_ip, latency_logger_t *logger, tx_timestamp_debug_logger_t *debug_logger, int enable_timestamping) { + return udp_client_dial_with_options(server_addr, peer_id, bind_ip, NULL, logger, debug_logger, enable_timestamping); +} + const char *udp_client_id(const udp_client_t *client) { return client == NULL ? "" : client->id; } @@ -124,6 +143,38 @@ int udp_client_send_text(udp_client_t *client, const char *to, const char *text) return 0; } +int udp_client_send_binary(udp_client_t *client, const char *to, const void *data, size_t data_len) { + message_t msg; + uint64_t id; + + if (client == NULL || to == NULL || (data == NULL && data_len > 0)) { + errno = EINVAL; + return -1; + } + + protocol_message_init(&msg); + client_next_message_id(client, &id); + msg.type = MSG_TYPE_BINARY; + msg.id = id; + snprintf(msg.from, sizeof(msg.from), "%s", client->id); + snprintf(msg.to, sizeof(msg.to), "%s", to); + if (data_len > 0) { + msg.body = (uint8_t *) malloc(data_len); + if (msg.body == NULL) { + return -1; + } + memcpy(msg.body, data, data_len); + } + msg.body_len = data_len; + latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_A_APP_PREP_BEGIN, &msg); + if (udp_conn_send(client->conn, &msg) != 0) { + protocol_message_clear(&msg); + return -1; + } + protocol_message_clear(&msg); + return 0; +} + int udp_client_send_file_path(udp_client_t *client, const char *to, const char *path) { message_t msg; uint64_t id; @@ -151,7 +202,34 @@ int udp_client_send_file_path(udp_client_t *client, const char *to, const char * return 0; } -int udp_client_receive(udp_client_t *client, message_t *out_msg) { +int udp_client_receive_timed(udp_client_t *client, message_t *out_msg, int timeout_ms) { + if (client == NULL || out_msg == NULL) { + errno = EINVAL; + return -1; + } + + if (timeout_ms >= 0) { + struct pollfd pfd; + int rc; + + memset(&pfd, 0, sizeof(pfd)); + pfd.fd = udp_conn_fd(client->conn); + pfd.events = POLLIN | POLLERR | POLLHUP; + do { + rc = poll(&pfd, 1, timeout_ms); + } while (rc < 0 && errno == EINTR); + if (rc == 0) { + return 1; + } + if (rc < 0) { + return -1; + } + if ((pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) != 0 && (pfd.revents & POLLIN) == 0) { + errno = ECONNRESET; + return -1; + } + } + if (udp_conn_receive(client->conn, out_msg, NULL, NULL) != 0) { return -1; } @@ -159,6 +237,39 @@ int udp_client_receive(udp_client_t *client, message_t *out_msg) { return 0; } +int udp_client_receive(udp_client_t *client, message_t *out_msg) { + return udp_client_receive_timed(client, out_msg, -1); +} + +int udp_client_receive_into(udp_client_t *client, void *buffer, size_t buffer_len, udp_client_recv_meta_t *out_meta, int timeout_ms) { + message_t msg; + int rc; + + if (client == NULL || (buffer == NULL && buffer_len > 0) || out_meta == NULL) { + errno = EINVAL; + return -1; + } + + protocol_message_init(&msg); + rc = udp_client_receive_timed(client, &msg, timeout_ms); + if (rc != 0) { + return rc; + } + + udp_client_fill_recv_meta(out_meta, &msg); + if (msg.body_len > buffer_len) { + protocol_message_clear(&msg); + errno = EMSGSIZE; + return 2; + } + + if (msg.body_len > 0) { + memcpy(buffer, msg.body, msg.body_len); + } + protocol_message_clear(&msg); + return 0; +} + int udp_client_persist_message(udp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len) { if (!latencylog_is_business_message(msg)) { errno = EINVAL; diff --git a/src/transport_kcp.c b/src/transport_kcp.c index 30778ac..5c473fb 100644 --- a/src/transport_kcp.c +++ b/src/transport_kcp.c @@ -1799,6 +1799,8 @@ int kcp_conn_close(kcp_conn_t *conn) { pthread_mutex_lock(&conn->kcp_mu); atomic_store(&conn->closed, 1); if (conn->owns_socket && !conn->socket_closed) { + /* Wake the blocking recv thread before closing the shared UDP socket. */ + (void) shutdown(conn->fd, SHUT_RDWR); close(conn->fd); conn->socket_closed = 1; } diff --git a/src/transport_udp.c b/src/transport_udp.c index c296082..49a0e18 100644 --- a/src/transport_udp.c +++ b/src/transport_udp.c @@ -3,6 +3,7 @@ #include #include #include +#include #include typedef struct udp_pending_tx { @@ -413,6 +414,13 @@ int udp_conn_receive(udp_conn_t *conn, message_t *out_msg, struct sockaddr_stora } n = recvmsg(conn->fd, &msg, 0); if (n < 0) { + if (conn->closed) { + errno = ECANCELED; + } + return -1; + } + if (n == 0 && conn->closed) { + errno = ECANCELED; return -1; } if (conn->timestamping_enabled) { @@ -454,6 +462,8 @@ int udp_conn_close(udp_conn_t *conn) { } if (!conn->closed) { conn->closed = 1; + /* Wake blocking recvmsg()/poll users before tearing down the socket. */ + (void) shutdown(conn->fd, SHUT_RDWR); close(conn->fd); if (conn->errqueue_thread_started) { pthread_join(conn->errqueue_thread, NULL);