feat: 基于Python ROS2的控制程序

This commit is contained in:
2026-04-03 20:00:33 +08:00
parent 6ece408d9f
commit 9ffc36f50d
26 changed files with 2193 additions and 38 deletions

View File

@@ -8,12 +8,24 @@ extern "C" {
#endif
typedef struct udp_client udp_client_t;
typedef struct udp_client_recv_meta {
message_type_t type;
uint64_t id;
char from[OMNI_MAX_PEER_ID];
char to[OMNI_MAX_PEER_ID];
char file_name[OMNI_MAX_FILE_NAME];
size_t body_len;
} udp_client_recv_meta_t;
udp_client_t *udp_client_dial_with_options(const char *server_addr, const char *peer_id, const char *bind_ip, const char *bind_device, latency_logger_t *logger, tx_timestamp_debug_logger_t *debug_logger, int enable_timestamping);
udp_client_t *udp_client_dial(const char *server_addr, const char *peer_id, const char *bind_ip, latency_logger_t *logger, tx_timestamp_debug_logger_t *debug_logger, int enable_timestamping);
const char *udp_client_id(const udp_client_t *client);
int udp_client_send_text(udp_client_t *client, const char *to, const char *text);
int udp_client_send_binary(udp_client_t *client, const char *to, const void *data, size_t data_len);
int udp_client_send_file_path(udp_client_t *client, const char *to, const char *path);
int udp_client_receive_timed(udp_client_t *client, message_t *out_msg, int timeout_ms);
int udp_client_receive(udp_client_t *client, message_t *out_msg);
int udp_client_receive_into(udp_client_t *client, void *buffer, size_t buffer_len, udp_client_recv_meta_t *out_meta, int timeout_ms);
int udp_client_persist_message(udp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len);
int udp_client_close(udp_client_t *client);
void udp_client_free(udp_client_t *client);

View File

@@ -6,6 +6,7 @@ try:
MSG_TYPE_REGISTER,
MSG_TYPE_TEXT,
Session,
UdpSession,
)
except ImportError as exc:
raise ImportError(
@@ -41,4 +42,5 @@ __all__ = [
"MSG_TYPE_REGISTER",
"MSG_TYPE_TEXT",
"Session",
"UdpSession",
]

View File

@@ -8,6 +8,11 @@ typedef struct PyOmniSession {
omnisocket_session_t session;
} PyOmniSession;
typedef struct PyOmniUdpSession {
PyObject_HEAD
omnisocket_udp_session_t session;
} PyOmniUdpSession;
PyDoc_STRVAR(
PyOmniSession_recv_doc,
"recv(timeout_ms=-1) -> (from_peer, msg_type, payload) | None"
@@ -22,6 +27,66 @@ PyDoc_STRVAR(
"current frame has already been consumed and is lost."
);
static PyObject *build_recv_result(const message_t *msg) {
PyObject *body = NULL;
PyObject *result = NULL;
body = PyBytes_FromStringAndSize((const char *) msg->body, (Py_ssize_t) msg->body_len);
if (body == NULL) {
return NULL;
}
result = Py_BuildValue("(siO)", msg->from, (int) msg->type, body);
Py_DECREF(body);
return result;
}
static PyObject *build_recv_meta_dict(
const char *from_peer,
const char *to_peer,
const char *file_name,
int msg_type,
unsigned long long message_id,
unsigned long long body_len
) {
return Py_BuildValue(
"{s:s,s:s,s:s,s:i,s:K,s:K}",
"from",
from_peer,
"to",
to_peer,
"file_name",
file_name,
"msg_type",
msg_type,
"message_id",
message_id,
"body_len",
body_len
);
}
static PyObject *build_stats_dict(const omnisocket_session_stats_t *stats) {
return Py_BuildValue(
"{s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:i}",
"send_calls",
(unsigned long long) stats->send_calls,
"send_bytes",
(unsigned long long) stats->send_bytes,
"send_errors",
(unsigned long long) stats->send_errors,
"recv_calls",
(unsigned long long) stats->recv_calls,
"recv_bytes",
(unsigned long long) stats->recv_bytes,
"recv_timeouts",
(unsigned long long) stats->recv_timeouts,
"recv_errors",
(unsigned long long) stats->recv_errors,
"connected",
stats->connected
);
}
static PyObject *PyOmniSession_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) {
PyOmniSession *self;
(void) args;
@@ -165,7 +230,6 @@ static PyObject *PyOmniSession_recv(PyOmniSession *self, PyObject *args, PyObjec
int timeout_ms = -1;
int rc;
message_t msg;
PyObject *body = NULL;
PyObject *result = NULL;
static char *kwlist[] = {"timeout_ms", NULL};
@@ -187,13 +251,7 @@ static PyObject *PyOmniSession_recv(PyOmniSession *self, PyObject *args, PyObjec
return PyErr_SetFromErrno(PyExc_OSError);
}
body = PyBytes_FromStringAndSize((const char *) msg.body, (Py_ssize_t) msg.body_len);
if (body == NULL) {
protocol_message_clear(&msg);
return NULL;
}
result = Py_BuildValue("(siO)", msg.from, (int) msg.type, body);
Py_DECREF(body);
result = build_recv_result(&msg);
protocol_message_clear(&msg);
return result;
}
@@ -237,19 +295,12 @@ static PyObject *PyOmniSession_recv_into(PyOmniSession *self, PyObject *args, Py
return PyErr_SetFromErrno(PyExc_OSError);
}
result = Py_BuildValue(
"{s:s,s:s,s:s,s:i,s:K,s:K}",
"from",
result = build_recv_meta_dict(
meta.from,
"to",
meta.to,
"file_name",
meta.file_name,
"msg_type",
(int) meta.type,
"message_id",
(unsigned long long) meta.id,
"body_len",
(unsigned long long) meta.body_len
);
return result;
@@ -260,25 +311,7 @@ static PyObject *PyOmniSession_stats(PyOmniSession *self, PyObject *Py_UNUSED(ig
memset(&stats, 0, sizeof(stats));
omnisocket_session_stats_snapshot(&self->session, &stats);
return Py_BuildValue(
"{s:K,s:K,s:K,s:K,s:K,s:K,s:K,s:i}",
"send_calls",
(unsigned long long) stats.send_calls,
"send_bytes",
(unsigned long long) stats.send_bytes,
"send_errors",
(unsigned long long) stats.send_errors,
"recv_calls",
(unsigned long long) stats.recv_calls,
"recv_bytes",
(unsigned long long) stats.recv_bytes,
"recv_timeouts",
(unsigned long long) stats.recv_timeouts,
"recv_errors",
(unsigned long long) stats.recv_errors,
"connected",
stats.connected
);
return build_stats_dict(&stats);
}
static PyMethodDef PyOmniSession_methods[] = {
@@ -295,6 +328,211 @@ static PyTypeObject PyOmniSessionType = {
PyVarObject_HEAD_INIT(NULL, 0)
};
static PyObject *PyOmniUdpSession_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) {
PyOmniUdpSession *self;
(void) args;
(void) kwargs;
self = (PyOmniUdpSession *) type->tp_alloc(type, 0);
if (self == NULL) {
return NULL;
}
if (omnisocket_udp_session_init(&self->session) != 0) {
type->tp_free((PyObject *) self);
return PyErr_SetFromErrno(PyExc_OSError);
}
return (PyObject *) self;
}
static void PyOmniUdpSession_dealloc(PyOmniUdpSession *self) {
omnisocket_udp_session_destroy(&self->session);
Py_TYPE(self)->tp_free((PyObject *) self);
}
static PyObject *PyOmniUdpSession_connect(PyOmniUdpSession *self, PyObject *args, PyObject *kwargs) {
const char *server_addr;
const char *peer_id;
const char *bind_ip = "";
const char *bind_device = "";
int enable_timestamping = 0;
int rc;
static char *kwlist[] = {
"server_addr",
"peer_id",
"bind_ip",
"bind_device",
"enable_timestamping",
NULL
};
if (!PyArg_ParseTupleAndKeywords(
args,
kwargs,
"ss|ssi",
kwlist,
&server_addr,
&peer_id,
&bind_ip,
&bind_device,
&enable_timestamping)) {
return NULL;
}
Py_BEGIN_ALLOW_THREADS
rc = omnisocket_udp_session_connect(
&self->session,
server_addr,
peer_id,
bind_ip,
bind_device,
enable_timestamping
);
Py_END_ALLOW_THREADS
if (rc != 0) {
return PyErr_SetFromErrno(PyExc_OSError);
}
Py_RETURN_NONE;
}
static PyObject *PyOmniUdpSession_close(PyOmniUdpSession *self, PyObject *Py_UNUSED(ignored)) {
int rc;
Py_BEGIN_ALLOW_THREADS
rc = omnisocket_udp_session_close(&self->session);
Py_END_ALLOW_THREADS
if (rc != 0) {
return PyErr_SetFromErrno(PyExc_OSError);
}
Py_RETURN_NONE;
}
static PyObject *PyOmniUdpSession_send(PyOmniUdpSession *self, PyObject *args, PyObject *kwargs) {
const char *to;
Py_buffer payload;
int rc;
static char *kwlist[] = {"to", "data", NULL};
memset(&payload, 0, sizeof(payload));
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sy*", kwlist, &to, &payload)) {
return NULL;
}
Py_BEGIN_ALLOW_THREADS
rc = omnisocket_udp_session_send(&self->session, to, payload.buf, (size_t) payload.len);
Py_END_ALLOW_THREADS
PyBuffer_Release(&payload);
if (rc != 0) {
return PyErr_SetFromErrno(PyExc_OSError);
}
Py_RETURN_NONE;
}
static PyObject *PyOmniUdpSession_recv(PyOmniUdpSession *self, PyObject *args, PyObject *kwargs) {
int timeout_ms = -1;
int rc;
message_t msg;
PyObject *result = NULL;
static char *kwlist[] = {"timeout_ms", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist, &timeout_ms)) {
return NULL;
}
protocol_message_init(&msg);
Py_BEGIN_ALLOW_THREADS
rc = omnisocket_udp_session_recv(&self->session, &msg, timeout_ms);
Py_END_ALLOW_THREADS
if (rc == 1) {
protocol_message_clear(&msg);
Py_RETURN_NONE;
}
if (rc != 0) {
protocol_message_clear(&msg);
return PyErr_SetFromErrno(PyExc_OSError);
}
result = build_recv_result(&msg);
protocol_message_clear(&msg);
return result;
}
static PyObject *PyOmniUdpSession_recv_into(PyOmniUdpSession *self, PyObject *args, PyObject *kwargs) {
PyObject *buffer_obj;
Py_buffer view;
int timeout_ms = -1;
int rc;
udp_client_recv_meta_t meta;
PyObject *result = NULL;
static char *kwlist[] = {"buffer", "timeout_ms", NULL};
memset(&view, 0, sizeof(view));
memset(&meta, 0, sizeof(meta));
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|i", kwlist, &buffer_obj, &timeout_ms)) {
return NULL;
}
if (PyObject_GetBuffer(buffer_obj, &view, PyBUF_WRITABLE) != 0) {
return NULL;
}
Py_BEGIN_ALLOW_THREADS
rc = omnisocket_udp_session_recv_into(&self->session, view.buf, (size_t) view.len, &meta, timeout_ms);
Py_END_ALLOW_THREADS
PyBuffer_Release(&view);
if (rc == 1) {
Py_RETURN_NONE;
}
if (rc == 2) {
PyErr_Format(
PyExc_BufferError,
"buffer too small: need %zu bytes; current frame was already consumed and dropped",
meta.body_len
);
return NULL;
}
if (rc != 0) {
return PyErr_SetFromErrno(PyExc_OSError);
}
result = build_recv_meta_dict(
meta.from,
meta.to,
meta.file_name,
(int) meta.type,
(unsigned long long) meta.id,
(unsigned long long) meta.body_len
);
return result;
}
static PyObject *PyOmniUdpSession_stats(PyOmniUdpSession *self, PyObject *Py_UNUSED(ignored)) {
omnisocket_session_stats_t stats;
memset(&stats, 0, sizeof(stats));
omnisocket_udp_session_stats_snapshot(&self->session, &stats);
return build_stats_dict(&stats);
}
static PyMethodDef PyOmniUdpSession_methods[] = {
{"connect", (PyCFunction) PyOmniUdpSession_connect, METH_VARARGS | METH_KEYWORDS, NULL},
{"close", (PyCFunction) PyOmniUdpSession_close, METH_NOARGS, NULL},
{"send", (PyCFunction) PyOmniUdpSession_send, METH_VARARGS | METH_KEYWORDS, NULL},
{"recv", (PyCFunction) PyOmniUdpSession_recv, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_doc},
{"recv_into", (PyCFunction) PyOmniUdpSession_recv_into, METH_VARARGS | METH_KEYWORDS, PyOmniSession_recv_into_doc},
{"stats", (PyCFunction) PyOmniUdpSession_stats, METH_NOARGS, NULL},
{NULL, NULL, 0, NULL}
};
static PyTypeObject PyOmniUdpSessionType = {
PyVarObject_HEAD_INIT(NULL, 0)
};
static PyModuleDef omnisocket_module = {
PyModuleDef_HEAD_INIT,
.m_name = "_omnisocket",
@@ -315,6 +553,17 @@ PyMODINIT_FUNC PyInit__omnisocket(void) {
return NULL;
}
PyOmniUdpSessionType.tp_name = "omnisocket.UdpSession";
PyOmniUdpSessionType.tp_basicsize = sizeof(PyOmniUdpSession);
PyOmniUdpSessionType.tp_flags = Py_TPFLAGS_DEFAULT;
PyOmniUdpSessionType.tp_new = PyOmniUdpSession_new;
PyOmniUdpSessionType.tp_dealloc = (destructor) PyOmniUdpSession_dealloc;
PyOmniUdpSessionType.tp_methods = PyOmniUdpSession_methods;
if (PyType_Ready(&PyOmniUdpSessionType) < 0) {
return NULL;
}
module = PyModule_Create(&omnisocket_module);
if (module == NULL) {
return NULL;
@@ -327,6 +576,13 @@ PyMODINIT_FUNC PyInit__omnisocket(void) {
return NULL;
}
Py_INCREF(&PyOmniUdpSessionType);
if (PyModule_AddObject(module, "UdpSession", (PyObject *) &PyOmniUdpSessionType) != 0) {
Py_DECREF(&PyOmniUdpSessionType);
Py_DECREF(module);
return NULL;
}
if (PyModule_AddIntConstant(module, "MSG_TYPE_TEXT", MSG_TYPE_TEXT) != 0 ||
PyModule_AddIntConstant(module, "MSG_TYPE_FILE", MSG_TYPE_FILE) != 0 ||
PyModule_AddIntConstant(module, "MSG_TYPE_REGISTER", MSG_TYPE_REGISTER) != 0 ||

View File

@@ -246,3 +246,245 @@ void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket
*out_stats = session->stats;
pthread_mutex_unlock(&session->mutex);
}
int omnisocket_udp_session_init(omnisocket_udp_session_t *session) {
int rc;
if (session == NULL) {
errno = EINVAL;
return -1;
}
memset(session, 0, sizeof(*session));
rc = pthread_mutex_init(&session->mutex, NULL);
if (rc != 0) {
errno = rc;
return -1;
}
rc = pthread_cond_init(&session->idle_cond, NULL);
if (rc != 0) {
pthread_mutex_destroy(&session->mutex);
errno = rc;
return -1;
}
return 0;
}
void omnisocket_udp_session_destroy(omnisocket_udp_session_t *session) {
if (session == NULL) {
return;
}
(void) omnisocket_udp_session_close(session);
pthread_cond_destroy(&session->idle_cond);
pthread_mutex_destroy(&session->mutex);
}
static int omnisocket_udp_session_begin_client_op(omnisocket_udp_session_t *session, udp_client_t **out_client) {
if (session == NULL || out_client == NULL) {
errno = EINVAL;
return -1;
}
pthread_mutex_lock(&session->mutex);
if (session->closing) {
pthread_mutex_unlock(&session->mutex);
errno = ECANCELED;
return -1;
}
if (session->client == NULL) {
pthread_mutex_unlock(&session->mutex);
errno = ENOTCONN;
return -1;
}
*out_client = session->client;
session->active_ops += 1;
pthread_mutex_unlock(&session->mutex);
return 0;
}
int omnisocket_udp_session_connect(
omnisocket_udp_session_t *session,
const char *server_addr,
const char *peer_id,
const char *bind_ip,
const char *bind_device,
int enable_timestamping
) {
udp_client_t *client;
if (session == NULL || server_addr == NULL || peer_id == NULL) {
errno = EINVAL;
return -1;
}
pthread_mutex_lock(&session->mutex);
while (session->closing) {
pthread_cond_wait(&session->idle_cond, &session->mutex);
}
if (session->client != NULL) {
pthread_mutex_unlock(&session->mutex);
errno = EISCONN;
return -1;
}
client = udp_client_dial_with_options(
server_addr,
peer_id,
bind_ip,
bind_device,
NULL,
NULL,
enable_timestamping
);
if (client == NULL) {
pthread_mutex_unlock(&session->mutex);
return -1;
}
session->client = client;
session->stats.connected = 1;
pthread_mutex_unlock(&session->mutex);
return 0;
}
int omnisocket_udp_session_close(omnisocket_udp_session_t *session) {
udp_client_t *client;
if (session == NULL) {
errno = EINVAL;
return -1;
}
pthread_mutex_lock(&session->mutex);
while (session->closing) {
pthread_cond_wait(&session->idle_cond, &session->mutex);
}
client = session->client;
if (client != NULL) {
session->closing = 1;
session->client = NULL;
}
session->stats.connected = 0;
pthread_mutex_unlock(&session->mutex);
if (client != NULL) {
udp_client_close(client);
pthread_mutex_lock(&session->mutex);
while (session->active_ops > 0) {
pthread_cond_wait(&session->idle_cond, &session->mutex);
}
pthread_mutex_unlock(&session->mutex);
udp_client_free(client);
pthread_mutex_lock(&session->mutex);
session->closing = 0;
pthread_cond_broadcast(&session->idle_cond);
pthread_mutex_unlock(&session->mutex);
}
return 0;
}
int omnisocket_udp_session_send(omnisocket_udp_session_t *session, const char *to, const void *data, size_t data_len) {
udp_client_t *client;
int rc;
if (session == NULL || to == NULL || (data == NULL && data_len > 0)) {
errno = EINVAL;
return -1;
}
if (omnisocket_udp_session_begin_client_op(session, &client) != 0) {
return -1;
}
rc = udp_client_send_binary(client, to, data, data_len);
pthread_mutex_lock(&session->mutex);
if (rc == 0) {
session->stats.send_calls += 1;
session->stats.send_bytes += (uint64_t) data_len;
} else {
session->stats.send_errors += 1;
}
if (session->active_ops > 0) {
session->active_ops -= 1;
}
if (session->closing && session->active_ops == 0) {
pthread_cond_broadcast(&session->idle_cond);
}
pthread_mutex_unlock(&session->mutex);
return rc;
}
int omnisocket_udp_session_recv(omnisocket_udp_session_t *session, message_t *out_msg, int timeout_ms) {
udp_client_t *client;
int rc;
if (session == NULL || out_msg == NULL) {
errno = EINVAL;
return -1;
}
if (omnisocket_udp_session_begin_client_op(session, &client) != 0) {
return -1;
}
rc = udp_client_receive_timed(client, out_msg, timeout_ms);
pthread_mutex_lock(&session->mutex);
if (rc == 0) {
session->stats.recv_calls += 1;
session->stats.recv_bytes += (uint64_t) out_msg->body_len;
} else if (rc == 1) {
session->stats.recv_timeouts += 1;
} else {
session->stats.recv_errors += 1;
}
if (session->active_ops > 0) {
session->active_ops -= 1;
}
if (session->closing && session->active_ops == 0) {
pthread_cond_broadcast(&session->idle_cond);
}
pthread_mutex_unlock(&session->mutex);
return rc;
}
int omnisocket_udp_session_recv_into(
omnisocket_udp_session_t *session,
void *buffer,
size_t buffer_len,
udp_client_recv_meta_t *out_meta,
int timeout_ms
) {
udp_client_t *client;
int rc;
if (session == NULL || out_meta == NULL || (buffer == NULL && buffer_len > 0)) {
errno = EINVAL;
return -1;
}
if (omnisocket_udp_session_begin_client_op(session, &client) != 0) {
return -1;
}
rc = udp_client_receive_into(client, buffer, buffer_len, out_meta, timeout_ms);
pthread_mutex_lock(&session->mutex);
if (rc == 0) {
session->stats.recv_calls += 1;
session->stats.recv_bytes += (uint64_t) out_meta->body_len;
} else if (rc == 1) {
session->stats.recv_timeouts += 1;
} else {
session->stats.recv_errors += 1;
}
if (session->active_ops > 0) {
session->active_ops -= 1;
}
if (session->closing && session->active_ops == 0) {
pthread_cond_broadcast(&session->idle_cond);
}
pthread_mutex_unlock(&session->mutex);
return rc;
}
void omnisocket_udp_session_stats_snapshot(omnisocket_udp_session_t *session, omnisocket_session_stats_t *out_stats) {
if (session == NULL || out_stats == NULL) {
return;
}
pthread_mutex_lock(&session->mutex);
*out_stats = session->stats;
pthread_mutex_unlock(&session->mutex);
}

View File

@@ -2,6 +2,7 @@
#define OMNISOCKET_PY_CLIENT_H
#include "peer_kcp_client.h"
#include "peer_udp_client.h"
typedef struct omnisocket_session_stats {
uint64_t send_calls;
@@ -23,6 +24,15 @@ typedef struct omnisocket_session {
omnisocket_session_stats_t stats;
} omnisocket_session_t;
typedef struct omnisocket_udp_session {
pthread_mutex_t mutex;
pthread_cond_t idle_cond;
udp_client_t *client;
size_t active_ops;
int closing;
omnisocket_session_stats_t stats;
} omnisocket_udp_session_t;
int omnisocket_session_init(omnisocket_session_t *session);
void omnisocket_session_destroy(omnisocket_session_t *session);
@@ -48,4 +58,27 @@ int omnisocket_session_recv_into(
);
void omnisocket_session_stats_snapshot(omnisocket_session_t *session, omnisocket_session_stats_t *out_stats);
int omnisocket_udp_session_init(omnisocket_udp_session_t *session);
void omnisocket_udp_session_destroy(omnisocket_udp_session_t *session);
int omnisocket_udp_session_connect(
omnisocket_udp_session_t *session,
const char *server_addr,
const char *peer_id,
const char *bind_ip,
const char *bind_device,
int enable_timestamping
);
int omnisocket_udp_session_close(omnisocket_udp_session_t *session);
int omnisocket_udp_session_send(omnisocket_udp_session_t *session, const char *to, const void *data, size_t data_len);
int omnisocket_udp_session_recv(omnisocket_udp_session_t *session, message_t *out_msg, int timeout_ms);
int omnisocket_udp_session_recv_into(
omnisocket_udp_session_t *session,
void *buffer,
size_t buffer_len,
udp_client_recv_meta_t *out_meta,
int timeout_ms
);
void omnisocket_udp_session_stats_snapshot(omnisocket_udp_session_t *session, omnisocket_session_stats_t *out_stats);
#endif

View File

@@ -0,0 +1,180 @@
from __future__ import annotations
from contextlib import contextmanager
from pathlib import Path
import socket
import subprocess
import sys
import threading
import time
import pytest
pytestmark = pytest.mark.skipif(sys.platform != 'linux', reason='Linux-only OmniSocket extension')
ROOT = Path(__file__).resolve().parents[2]
PYTHON_ROOT = ROOT / 'python'
if str(PYTHON_ROOT) not in sys.path:
sys.path.insert(0, str(PYTHON_ROOT))
omnisocket = pytest.importorskip('omnisocket')
CONTROL_DEFAULTS = omnisocket.CONTROL_DEFAULTS
MSG_TYPE_BINARY = omnisocket.MSG_TYPE_BINARY
Session = omnisocket.Session
UdpSession = omnisocket.UdpSession
def _reserve_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(('127.0.0.1', 0))
return int(sock.getsockname()[1])
@contextmanager
def _run_server(binary_name: str, listen_addr: str):
binary = ROOT / 'bin' / binary_name
if not binary.exists():
pytest.skip(f'{binary} is not built')
process = subprocess.Popen(
[str(binary), '-listen', listen_addr],
cwd=str(ROOT),
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
try:
time.sleep(0.2)
yield process
finally:
process.terminate()
try:
process.wait(timeout=2.0)
except subprocess.TimeoutExpired:
process.kill()
process.wait(timeout=2.0)
def _connect_with_retry(session_cls, *, transport: str, server_addr: str, peer_id: str):
deadline = time.monotonic() + 3.0
last_error: Exception | None = None
while time.monotonic() < deadline:
session = session_cls()
try:
kwargs: dict[str, object] = {
'server_addr': server_addr,
'peer_id': peer_id,
}
if transport == 'kcp':
kwargs.update(CONTROL_DEFAULTS)
else:
kwargs['enable_timestamping'] = False
session.connect(**kwargs)
return session
except OSError as exc:
last_error = exc
time.sleep(0.1)
raise AssertionError(f'failed to connect {peer_id} to {server_addr}: {last_error}')
@pytest.mark.parametrize(
('transport', 'binary_name', 'session_cls'),
[
('udp', 'udpserver', UdpSession),
('kcp', 'kcpserver', Session),
],
)
def test_control_sessions_smoke(transport: str, binary_name: str, session_cls) -> None:
port = _reserve_port()
listen_addr = f'127.0.0.1:{port}'
sender_id = f'pytest-{transport}-sender'
receiver_id = f'pytest-{transport}-receiver'
with _run_server(binary_name, listen_addr):
sender = _connect_with_retry(session_cls, transport=transport, server_addr=listen_addr, peer_id=sender_id)
receiver = _connect_with_retry(session_cls, transport=transport, server_addr=listen_addr, peer_id=receiver_id)
try:
assert receiver.recv(timeout_ms=20) is None
payload = b'control-packet-1'
sender.send(to=receiver_id, data=payload)
from_peer, msg_type, recv_payload = receiver.recv(timeout_ms=1000)
assert from_peer == sender_id
assert msg_type == MSG_TYPE_BINARY
assert recv_payload == payload
payload2 = b'control-packet-2'
sender.send(to=receiver_id, data=payload2)
recv_buffer = bytearray(128)
meta = receiver.recv_into(buffer=recv_buffer, timeout_ms=1000)
assert meta is not None
assert meta['from'] == sender_id
assert meta['msg_type'] == MSG_TYPE_BINARY
assert meta['body_len'] == len(payload2)
assert bytes(recv_buffer[: meta['body_len']]) == payload2
sender_stats = sender.stats()
receiver_stats = receiver.stats()
assert sender_stats['connected'] == 1
assert receiver_stats['connected'] == 1
assert sender_stats['send_calls'] >= 2
assert receiver_stats['recv_calls'] >= 2
finally:
sender.close()
receiver.close()
def test_udp_session_close_interrupts_blocking_recv() -> None:
port = _reserve_port()
listen_addr = f'127.0.0.1:{port}'
receiver_id = 'pytest-udp-blocking-recv'
with _run_server('udpserver', listen_addr):
receiver = _connect_with_retry(
UdpSession,
transport='udp',
server_addr=listen_addr,
peer_id=receiver_id,
)
recv_error: list[BaseException] = []
close_error: list[BaseException] = []
recv_started = threading.Event()
recv_done = threading.Event()
close_done = threading.Event()
def recv_worker() -> None:
recv_started.set()
try:
receiver.recv()
except BaseException as exc: # pragma: no cover - assertion is on thread completion
recv_error.append(exc)
finally:
recv_done.set()
def close_worker() -> None:
try:
receiver.close()
except BaseException as exc: # pragma: no cover - assertion is on thread completion
close_error.append(exc)
finally:
close_done.set()
recv_thread = threading.Thread(target=recv_worker, daemon=True)
recv_thread.start()
assert recv_started.wait(timeout=1.0)
time.sleep(0.05)
close_thread = threading.Thread(target=close_worker, daemon=True)
close_thread.start()
assert close_done.wait(timeout=1.0), 'UdpSession.close() blocked while recv() was waiting'
assert recv_done.wait(timeout=1.0), 'UdpSession.recv() stayed blocked after close()'
assert not close_thread.is_alive()
assert not recv_thread.is_alive()
assert not close_error
assert not recv_error or isinstance(recv_error[0], OSError)

View File

@@ -5,6 +5,11 @@
- `udp` (default): unchanged from the original implementation
- `kcp`: sent through OmniSocket using `MSG_TYPE_BINARY`
Note:
- This README documents the `ros-control-c` path only.
- `ros-control-py` now uses OmniSocket for both `transport:=udp` and `transport:=kcp`; its `udp` mode is no longer raw socket UDP.
## Build
On Linux:

246
ros-control-py/README.md Normal file
View File

@@ -0,0 +1,246 @@
# ROS2 Teleop over OmniSocket UDP/KCP
`ros-control-py/udp_teleop_bridge` 现在把 teleop 控制流统一接到 OmniSocket peer 传输上。
- `transport:=udp` 表示 OmniSocket UDP`udpserver/udppeer` 的消息协议传输
- `transport:=kcp` 表示 OmniSocket KCP`kcpserver/kcppeer` 的消息协议传输
- 不再使用原来的裸 `socket.sendto()/recvfrom()` UDP 路径
机器人最终接收的话题保持不变:
- topic: `/hric/robot/cmd_vel`
- type: `geometry_msgs/msg/TwistStamped`
- frame_id: `pelvis`
控制负载也保持不变:
- fixed payload: 24-byte little-endian `<6f>`
- order: `lx, ly, lz, ax, ay, az`
## 目录
- `udp_teleop_bridge/udp_teleop_bridge/cmd_vel_udp_sender.py`: 订阅 `TwistStamped`,经 OmniSocket 发送 24 字节控制包
- `udp_teleop_bridge/udp_teleop_bridge/udp_cmd_vel_receiver.py`: 从 OmniSocket 接收控制包,补时间戳并发布到机器人 ROS2 topic
- `udp_teleop_bridge/udp_teleop_bridge/omni_transport.py`: 统一封装 OmniSocket UDP/KCP session
- `udp_teleop_bridge/config/xbox_twist_joy.yaml`: Xbox 手柄映射
- `udp_teleop_bridge/launch/*.launch.py`: Linux 启动入口
## Linux 构建
先安装 ROS 2 官方 teleop 依赖:
```bash
sudo apt install ros-${ROS_DISTRO}-joy ros-${ROS_DISTRO}-teleop-twist-joy ros-${ROS_DISTRO}-teleop-twist-keyboard
```
再构建并安装 OmniSocket Python 扩展:
```bash
make python-ext
make python-install
```
最后构建 ROS 包:
```bash
colcon build --packages-select udp_teleop_bridge
source install/setup.bash
```
如果 `omnisocket` 没有安装到当前 ROS Python 环境sender/receiver 会直接报错退出。
## 先验证机器人控制语义
在机器人本机先直接低速发布 `/hric/robot/cmd_vel`,确认 `linear.x``linear.y``angular.z` 的物理方向符合预期:
```bash
ros2 topic pub /hric/robot/cmd_vel geometry_msgs/msg/TwistStamped \
"{header: {frame_id: pelvis}, twist: {linear: {x: 0.10, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}}" \
-r 20
```
```bash
ros2 topic pub /hric/robot/cmd_vel geometry_msgs/msg/TwistStamped \
"{header: {frame_id: pelvis}, twist: {linear: {x: 0.0, y: 0.10, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}}" \
-r 20
```
```bash
ros2 topic pub /hric/robot/cmd_vel geometry_msgs/msg/TwistStamped \
"{header: {frame_id: pelvis}, twist: {linear: {x: 0.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.30}}}" \
-r 20
```
停止:
```bash
ros2 topic pub --once /hric/robot/cmd_vel geometry_msgs/msg/TwistStamped \
"{header: {frame_id: pelvis}, twist: {linear: {x: 0.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}}"
```
## 启动 OmniSocket Hub
OmniSocket UDP:
```bash
./bin/udpserver -listen :9001
```
OmniSocket KCP:
```bash
./bin/kcpserver -listen :9002
```
`server_addr` 不传时,节点会按 `transport` 自动选择默认值:
- `udp` -> `127.0.0.1:9001`
- `kcp` -> `127.0.0.1:9002`
`relay_via` 只在 `transport:=kcp` 时生效。
## 机器人端运行
UDP:
```bash
ros2 launch udp_teleop_bridge robot_udp_receiver.launch.py \
transport:=udp \
server_addr:=127.0.0.1:9001 \
peer_id:=ros-bridge-ctrl \
output_topic:=/hric/robot/cmd_vel \
frame_id:=pelvis \
watchdog_timeout:=0.5
```
KCP:
```bash
ros2 launch udp_teleop_bridge robot_udp_receiver.launch.py \
transport:=kcp \
server_addr:=127.0.0.1:9002 \
peer_id:=ros-bridge-ctrl \
output_topic:=/hric/robot/cmd_vel \
frame_id:=pelvis \
watchdog_timeout:=0.5
```
如果只允许某个 sender 控制,可以加:
```bash
expected_sender:=ros-keyboard-ctrl
```
## 控制端键盘运行
终端 A启动 sender:
```bash
ros2 launch udp_teleop_bridge keyboard_sender.launch.py \
transport:=udp \
server_addr:=127.0.0.1:9001 \
peer_id:=ros-keyboard-ctrl \
target_peer:=ros-bridge-ctrl
```
如果走 KCP:
```bash
ros2 launch udp_teleop_bridge keyboard_sender.launch.py \
transport:=kcp \
server_addr:=127.0.0.1:9002 \
peer_id:=ros-keyboard-ctrl \
target_peer:=ros-bridge-ctrl
```
终端 B启动官方键盘 teleop:
```bash
ros2 run teleop_twist_keyboard teleop_twist_keyboard --ros-args \
--remap cmd_vel:=/teleop/cmd_vel \
-p stamped:=true \
-p frame_id:=pelvis \
-p speed:=0.20 \
-p turn:=0.60
```
键盘默认键位(`teleop_twist_keyboard`,建议使用 US 键盘布局):
- `i`: 前进(`linear.x > 0`
- `,`: 后退(`linear.x < 0`
- `j`: 左转(`angular.z > 0`
- `l`: 右转(`angular.z < 0`
- `Shift + J`: 左平移(`linear.y > 0`
- `Shift + L`: 右平移(`linear.y < 0`
- `u` / `o` / `m` / `.`: 组合前进或后退加转向
- `k` 或其他未映射按键: 停止
- `q` / `z`: 整体速度增加 / 降低 10%
- `w` / `x`: 仅线速度增加 / 降低 10%
- `e` / `c`: 仅角速度增加 / 降低 10%
- `Ctrl-C`: 退出键盘 teleop
## 控制端 Xbox 手柄运行
UDP:
```bash
ros2 launch udp_teleop_bridge xbox_to_udp.launch.py \
transport:=udp \
server_addr:=127.0.0.1:9001 \
peer_id:=ros-gamepad-ctrl \
target_peer:=ros-bridge-ctrl \
joy_dev:=/dev/input/js0 \
frame_id:=pelvis
```
KCP:
```bash
ros2 launch udp_teleop_bridge xbox_to_udp.launch.py \
transport:=kcp \
server_addr:=127.0.0.1:9002 \
peer_id:=ros-gamepad-ctrl \
target_peer:=ros-bridge-ctrl \
joy_dev:=/dev/input/js0 \
frame_id:=pelvis
```
当前默认手柄映射:
- 左摇杆上下 -> `linear.x`
- 左摇杆左右 -> `linear.y`
- 右摇杆左右 -> `angular.z`
- `RB` 按住才允许运动
- `LB` 为 turbo
手柄实际操控含义(基于 `config/xbox_twist_joy.yaml` 的 Xbox 默认映射):
- 左摇杆向前 / 向后: 前进 / 后退
- 左摇杆向左 / 向右: 左平移 / 右平移
- 右摇杆向左 / 向右: 左转 / 右转
- 按住 `RB`: 以常速启用运动输出
- 同时按住 `LB` + `RB`: 启用 turbo更高的线速度和角速度
- 松开 `RB` 或将摇杆回中: 输出回到零速
## 数据流
键盘链路:
```text
teleop_twist_keyboard -> /teleop/cmd_vel (TwistStamped) -> cmd_vel_udp_sender -> OmniSocket UDP/KCP -> udp_cmd_vel_receiver -> /hric/robot/cmd_vel
```
手柄链路:
```text
joy_node -> teleop_twist_joy -> /teleop/cmd_vel (TwistStamped) -> cmd_vel_udp_sender -> OmniSocket UDP/KCP -> udp_cmd_vel_receiver -> /hric/robot/cmd_vel
```
## 安全行为
- sender 默认按 20 Hz 重发最新命令
- sender 输入超时后会改发零速
- sender 退出时会主动发送数个零速控制包
- receiver 超时后会在 ROS 主线程发布零速 stop
- receiver 只接受 `MSG_TYPE_BINARY` 且长度为 24 字节的负载
- 非预期 sender、非 binary 消息、错误长度消息都会被丢弃并记录日志

View File

@@ -0,0 +1,147 @@
## ROS2 Teleop over OmniSocket UDP/KCP
这个文档对应 `ros-control-py/udp_teleop_bridge` 的当前实现。
核心变化:
- `transport:=udp` 现在表示 OmniSocket UDP
- `transport:=kcp` 表示 OmniSocket KCP
- 不再使用原来的裸 `socket` UDP 实现
控制接口保持不变:
- topic: `/hric/robot/cmd_vel`
- type: `geometry_msgs/msg/TwistStamped`
- frame_id: `pelvis`
- payload: fixed 24-byte little-endian `<6f>`
负载顺序:
`lx, ly, lz, ax, ay, az`
### 构建顺序
```bash
make python-ext
make python-install
```
```bash
colcon build --packages-select udp_teleop_bridge
source install/setup.bash
```
### 启动 Hub
OmniSocket UDP:
```bash
./bin/udpserver -listen :9001
```
OmniSocket KCP:
```bash
./bin/kcpserver -listen :9002
```
### 机器人端 Receiver
```bash
ros2 launch udp_teleop_bridge robot_udp_receiver.launch.py \
transport:=udp \
server_addr:=127.0.0.1:9001 \
peer_id:=ros-bridge-ctrl \
output_topic:=/hric/robot/cmd_vel \
frame_id:=pelvis \
watchdog_timeout:=0.5
```
KCP 只需把 `transport``server_addr` 改成:
```bash
transport:=kcp server_addr:=127.0.0.1:9002
```
只接受指定 sender:
```bash
expected_sender:=ros-keyboard-ctrl
```
### 键盘 Sender
```bash
ros2 launch udp_teleop_bridge keyboard_sender.launch.py \
transport:=udp \
server_addr:=127.0.0.1:9001 \
peer_id:=ros-keyboard-ctrl \
target_peer:=ros-bridge-ctrl
```
```bash
ros2 run teleop_twist_keyboard teleop_twist_keyboard --ros-args \
--remap cmd_vel:=/teleop/cmd_vel \
-p stamped:=true \
-p frame_id:=pelvis \
-p speed:=0.20 \
-p turn:=0.60
```
### Xbox Sender
```bash
ros2 launch udp_teleop_bridge xbox_to_udp.launch.py \
transport:=udp \
server_addr:=127.0.0.1:9001 \
peer_id:=ros-gamepad-ctrl \
target_peer:=ros-bridge-ctrl \
joy_dev:=/dev/input/js0 \
frame_id:=pelvis
```
### 参数语义
- sender:
- `transport`
- `server_addr`
- `relay_via`
- `peer_id`
- `target_peer`
- `input_topic`
- `send_rate_hz`
- `input_timeout`
- receiver:
- `transport`
- `server_addr`
- `relay_via`
- `peer_id`
- `expected_sender`
- `output_topic`
- `frame_id`
- `watchdog_timeout`
- `publish_rate_hz`
`server_addr` 省略时,会按 transport 自动选择:
- `udp` -> `127.0.0.1:9001`
- `kcp` -> `127.0.0.1:9002`
### 数据流
```text
teleop_twist_keyboard / teleop_twist_joy
-> /teleop/cmd_vel (TwistStamped)
-> cmd_vel_udp_sender
-> OmniSocket UDP/KCP binary message
-> udp_cmd_vel_receiver
-> /hric/robot/cmd_vel
```
### 安全与约束
- sender 默认 20 Hz 重发
- sender 输入超时后改发零速
- receiver watchdog 超时后发零速 stop
- receiver 只接受 24 字节 binary 负载
- `relay_via` 只在 KCP 模式有效

View File

@@ -0,0 +1,32 @@
/**:
ros__parameters:
require_enable_button: true
enable_button: 5
enable_turbo_button: 4
axis_linear:
x: 1
y: 0
z: -1
scale_linear:
x: -0.30
y: -0.25
z: 0.0
scale_linear_turbo:
x: -0.60
y: -0.45
z: 0.0
axis_angular:
yaw: 3
pitch: -1
roll: -1
scale_angular:
yaw: -0.80
pitch: 0.0
roll: 0.0
scale_angular_turbo:
yaw: -1.20
pitch: 0.0
roll: 0.0
inverted_reverse: false
publish_stamped_twist: true
frame: pelvis

View File

@@ -0,0 +1,34 @@
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
from launch_ros.parameter_descriptions import ParameterValue
def generate_launch_description() -> LaunchDescription:
return LaunchDescription([
DeclareLaunchArgument('transport', default_value='udp'),
DeclareLaunchArgument('server_addr', default_value=''),
DeclareLaunchArgument('relay_via', default_value=''),
DeclareLaunchArgument('peer_id', default_value='ros-keyboard-ctrl'),
DeclareLaunchArgument('target_peer', default_value='ros-bridge-ctrl'),
DeclareLaunchArgument('input_topic', default_value='/teleop/cmd_vel'),
DeclareLaunchArgument('send_rate_hz', default_value='20.0'),
DeclareLaunchArgument('input_timeout', default_value='0.75'),
Node(
package='udp_teleop_bridge',
executable='cmd_vel_udp_sender',
name='cmd_vel_udp_sender',
output='screen',
parameters=[{
'transport': LaunchConfiguration('transport'),
'server_addr': LaunchConfiguration('server_addr'),
'relay_via': LaunchConfiguration('relay_via'),
'peer_id': LaunchConfiguration('peer_id'),
'target_peer': LaunchConfiguration('target_peer'),
'input_topic': LaunchConfiguration('input_topic'),
'send_rate_hz': ParameterValue(LaunchConfiguration('send_rate_hz'), value_type=float),
'input_timeout': ParameterValue(LaunchConfiguration('input_timeout'), value_type=float),
}],
),
])

View File

@@ -0,0 +1,36 @@
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
from launch_ros.parameter_descriptions import ParameterValue
def generate_launch_description() -> LaunchDescription:
return LaunchDescription([
DeclareLaunchArgument('transport', default_value='udp'),
DeclareLaunchArgument('server_addr', default_value=''),
DeclareLaunchArgument('relay_via', default_value=''),
DeclareLaunchArgument('peer_id', default_value='ros-bridge-ctrl'),
DeclareLaunchArgument('expected_sender', default_value=''),
DeclareLaunchArgument('output_topic', default_value='/hric/robot/cmd_vel'),
DeclareLaunchArgument('frame_id', default_value='pelvis'),
DeclareLaunchArgument('watchdog_timeout', default_value='0.5'),
DeclareLaunchArgument('publish_rate_hz', default_value='100.0'),
Node(
package='udp_teleop_bridge',
executable='udp_cmd_vel_receiver',
name='udp_cmd_vel_receiver',
output='screen',
parameters=[{
'transport': LaunchConfiguration('transport'),
'server_addr': LaunchConfiguration('server_addr'),
'relay_via': LaunchConfiguration('relay_via'),
'peer_id': LaunchConfiguration('peer_id'),
'expected_sender': LaunchConfiguration('expected_sender'),
'output_topic': LaunchConfiguration('output_topic'),
'frame_id': LaunchConfiguration('frame_id'),
'watchdog_timeout': ParameterValue(LaunchConfiguration('watchdog_timeout'), value_type=float),
'publish_rate_hz': ParameterValue(LaunchConfiguration('publish_rate_hz'), value_type=float),
}],
),
])

View File

@@ -0,0 +1,74 @@
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
from launch_ros.actions import Node
from launch_ros.parameter_descriptions import ParameterValue
from launch_ros.substitutions import FindPackageShare
def generate_launch_description() -> LaunchDescription:
teleop_config = PathJoinSubstitution([
FindPackageShare('udp_teleop_bridge'),
'config',
'xbox_twist_joy.yaml',
])
teleop_topic = LaunchConfiguration('teleop_topic')
return LaunchDescription([
DeclareLaunchArgument('transport', default_value='udp'),
DeclareLaunchArgument('server_addr', default_value=''),
DeclareLaunchArgument('relay_via', default_value=''),
DeclareLaunchArgument('peer_id', default_value='ros-gamepad-ctrl'),
DeclareLaunchArgument('target_peer', default_value='ros-bridge-ctrl'),
DeclareLaunchArgument('joy_dev', default_value='/dev/input/js0'),
DeclareLaunchArgument('deadzone', default_value='0.10'),
DeclareLaunchArgument('autorepeat_rate', default_value='20.0'),
DeclareLaunchArgument('frame_id', default_value='pelvis'),
DeclareLaunchArgument('teleop_topic', default_value='/teleop/cmd_vel'),
DeclareLaunchArgument('send_rate_hz', default_value='20.0'),
DeclareLaunchArgument('input_timeout', default_value='0.30'),
Node(
package='joy',
executable='joy_node',
name='joy_node',
output='screen',
parameters=[{
'dev': LaunchConfiguration('joy_dev'),
'deadzone': ParameterValue(LaunchConfiguration('deadzone'), value_type=float),
'autorepeat_rate': ParameterValue(LaunchConfiguration('autorepeat_rate'), value_type=float),
}],
),
Node(
package='teleop_twist_joy',
executable='teleop_node',
name='teleop_twist_joy',
output='screen',
parameters=[
teleop_config,
{
'publish_stamped_twist': True,
'frame': LaunchConfiguration('frame_id'),
},
],
remappings=[
('cmd_vel', teleop_topic),
],
),
Node(
package='udp_teleop_bridge',
executable='cmd_vel_udp_sender',
name='cmd_vel_udp_sender',
output='screen',
parameters=[{
'transport': LaunchConfiguration('transport'),
'server_addr': LaunchConfiguration('server_addr'),
'relay_via': LaunchConfiguration('relay_via'),
'peer_id': LaunchConfiguration('peer_id'),
'target_peer': LaunchConfiguration('target_peer'),
'input_topic': teleop_topic,
'send_rate_hz': ParameterValue(LaunchConfiguration('send_rate_hz'), value_type=float),
'input_timeout': ParameterValue(LaunchConfiguration('input_timeout'), value_type=float),
}],
),
])

View File

@@ -0,0 +1,24 @@
<?xml version="1.0"?>
<package format="3">
<name>udp_teleop_bridge</name>
<version>0.1.0</version>
<description>ROS 2 OmniSocket UDP/KCP bridge for teleop TwistStamped commands.</description>
<maintainer email="codex@example.com">Codex</maintainer>
<license>MIT</license>
<buildtool_depend>ament_python</buildtool_depend>
<exec_depend>ament_index_python</exec_depend>
<exec_depend>geometry_msgs</exec_depend>
<exec_depend>joy</exec_depend>
<exec_depend>launch</exec_depend>
<exec_depend>launch_ros</exec_depend>
<exec_depend>rclpy</exec_depend>
<exec_depend>teleop_twist_joy</exec_depend>
<exec_depend>teleop_twist_keyboard</exec_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@@ -0,0 +1 @@
udp_teleop_bridge

View File

@@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/udp_teleop_bridge
[install]
install_scripts=$base/lib/udp_teleop_bridge

View File

@@ -0,0 +1,34 @@
from setuptools import find_packages, setup
package_name = 'udp_teleop_bridge'
setup(
name=package_name,
version='0.1.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages', [f'resource/{package_name}']),
(f'share/{package_name}', ['package.xml']),
(f'share/{package_name}/launch', [
'launch/keyboard_sender.launch.py',
'launch/robot_udp_receiver.launch.py',
'launch/xbox_to_udp.launch.py',
]),
(f'share/{package_name}/config', ['config/xbox_twist_joy.yaml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='Codex',
maintainer_email='codex@example.com',
description='ROS 2 OmniSocket UDP/KCP bridge for teleop TwistStamped commands.',
license='MIT',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'cmd_vel_udp_sender = udp_teleop_bridge.cmd_vel_udp_sender:main',
'udp_cmd_vel_receiver = udp_teleop_bridge.udp_cmd_vel_receiver:main',
],
},
)

View File

@@ -0,0 +1,54 @@
from pathlib import Path
import sys
import pytest
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from udp_teleop_bridge.protocol import ( # noqa: E402
PACKET_SIZE,
default_server_addr_for_transport,
normalize_command,
normalize_transport,
pack_command,
unpack_command,
)
def test_pack_unpack_round_trip() -> None:
command = (0.1, -0.2, 0.3, -0.4, 0.5, -0.6)
payload = pack_command(command)
assert len(payload) == PACKET_SIZE
assert unpack_command(payload) == pytest.approx(command)
@pytest.mark.parametrize('value', [float('nan'), float('inf'), float('-inf')])
def test_normalize_command_rejects_non_finite_values(value: float) -> None:
with pytest.raises(ValueError, match='non-finite'):
normalize_command((0.0, 0.0, value, 0.0, 0.0, 0.0))
def test_unpack_command_rejects_wrong_length() -> None:
with pytest.raises(ValueError, match='Expected'):
unpack_command(b'\x00' * (PACKET_SIZE - 1))
@pytest.mark.parametrize(
('transport', 'expected'),
[
('udp', '127.0.0.1:9001'),
('kcp', '127.0.0.1:9002'),
],
)
def test_default_server_addr_for_transport(transport: str, expected: str) -> None:
assert default_server_addr_for_transport(transport) == expected
def test_normalize_transport_rejects_unknown_value() -> None:
with pytest.raises(ValueError, match='Unsupported transport'):
normalize_transport('sctp')

View File

@@ -0,0 +1 @@
"""OmniSocket teleop bridge package."""

View File

@@ -0,0 +1,207 @@
"""ROS 2 node that forwards TwistStamped teleop commands over OmniSocket."""
from __future__ import annotations
import threading
import time
from typing import Dict, Optional, Tuple
import rclpy
from geometry_msgs.msg import TwistStamped
from rclpy.node import Node
from .omni_transport import MSG_TYPE_ERROR, OmniTransport
from .protocol import (
DEFAULT_EXIT_ZERO_PACKETS,
DEFAULT_INPUT_TIMEOUT,
DEFAULT_INPUT_TOPIC,
DEFAULT_KEYBOARD_PEER_ID,
DEFAULT_QUEUE_DEPTH,
DEFAULT_SEND_RATE_HZ,
DEFAULT_TARGET_PEER,
DEFAULT_TRANSPORT,
ZERO_COMMAND,
pack_command,
)
CommandTuple = Tuple[float, float, float, float, float, float]
class CmdVelUdpSender(Node):
"""Forward TwistStamped messages to a remote OmniSocket peer."""
def __init__(self) -> None:
super().__init__('cmd_vel_udp_sender')
self.declare_parameter('transport', DEFAULT_TRANSPORT)
self.declare_parameter('server_addr', '')
self.declare_parameter('relay_via', '')
self.declare_parameter('peer_id', DEFAULT_KEYBOARD_PEER_ID)
self.declare_parameter('target_peer', DEFAULT_TARGET_PEER)
self.declare_parameter('input_topic', DEFAULT_INPUT_TOPIC)
self.declare_parameter('send_rate_hz', DEFAULT_SEND_RATE_HZ)
self.declare_parameter('input_timeout', DEFAULT_INPUT_TIMEOUT)
self.declare_parameter('queue_depth', DEFAULT_QUEUE_DEPTH)
self.declare_parameter('exit_zero_packets', DEFAULT_EXIT_ZERO_PACKETS)
self._transport_name = str(self.get_parameter('transport').value)
self._server_addr = str(self.get_parameter('server_addr').value)
self._relay_via = str(self.get_parameter('relay_via').value)
self._peer_id = str(self.get_parameter('peer_id').value)
self._target_peer = str(self.get_parameter('target_peer').value).strip()
self._input_topic = str(self.get_parameter('input_topic').value)
self._send_rate_hz = float(self.get_parameter('send_rate_hz').value)
self._input_timeout = float(self.get_parameter('input_timeout').value)
self._queue_depth = int(self.get_parameter('queue_depth').value)
self._exit_zero_packets = int(self.get_parameter('exit_zero_packets').value)
if self._send_rate_hz <= 0.0:
raise ValueError('send_rate_hz must be > 0')
if self._input_timeout < 0.0:
raise ValueError('input_timeout must be >= 0')
if self._queue_depth <= 0:
raise ValueError('queue_depth must be > 0')
if not self._target_peer:
raise ValueError('target_peer must not be empty')
self._transport = OmniTransport(
transport=self._transport_name,
server_addr=self._server_addr,
relay_via=self._relay_via,
peer_id=self._peer_id,
)
self._last_log_times: Dict[str, float] = {}
self._latest_command: CommandTuple = ZERO_COMMAND
self._last_input_monotonic: Optional[float] = None
self._last_sent_command: Optional[CommandTuple] = None
self._closing = threading.Event()
self.create_subscription(
TwistStamped,
self._input_topic,
self._handle_twist,
self._queue_depth,
)
self.create_timer(1.0 / self._send_rate_hz, self._send_latest_command)
self._drain_thread = threading.Thread(target=self._drain_incoming, daemon=True)
self._drain_thread.start()
self.get_logger().info(
'Forwarding TwistStamped from %s via %s://%s as %s -> %s at %.1f Hz '
'(input timeout %.2f s)'
% (
self._input_topic,
self._transport.transport,
self._transport.server_addr,
self._peer_id,
self._target_peer,
self._send_rate_hz,
self._input_timeout,
)
)
def _should_log(self, key: str, throttle_sec: float) -> bool:
now = time.monotonic()
previous = self._last_log_times.get(key)
if previous is None or (now - previous) >= throttle_sec:
self._last_log_times[key] = now
return True
return False
def _handle_twist(self, msg: TwistStamped) -> None:
self._latest_command = (
float(msg.twist.linear.x),
float(msg.twist.linear.y),
float(msg.twist.linear.z),
float(msg.twist.angular.x),
float(msg.twist.angular.y),
float(msg.twist.angular.z),
)
self._last_input_monotonic = time.monotonic()
def _command_for_current_tick(self) -> CommandTuple:
if self._last_input_monotonic is None:
return ZERO_COMMAND
if self._input_timeout == 0.0:
return self._latest_command
age = time.monotonic() - self._last_input_monotonic
if age > self._input_timeout:
return ZERO_COMMAND
return self._latest_command
def _send_command(self, command: CommandTuple) -> None:
payload = pack_command(command)
try:
self._transport.send(to=self._target_peer, data=payload)
self._last_sent_command = command
except OSError as exc:
if self._should_log('send_error', 2.0):
self.get_logger().error(f'OmniSocket send failed: {exc}')
def _send_latest_command(self) -> None:
self._send_command(self._command_for_current_tick())
def _log_inbound_message(self, from_peer: str, msg_type: int, payload: bytes) -> None:
if msg_type == MSG_TYPE_ERROR:
if self._should_log('server_error', 1.0):
text = payload.decode('utf-8', errors='replace')
self.get_logger().error(f'OmniSocket server error from {from_peer}: {text}')
return
if self._should_log('unexpected_inbound', 2.0):
self.get_logger().warning(
'Ignoring unexpected inbound message type %d from %s (%d bytes)'
% (msg_type, from_peer, len(payload))
)
def _drain_incoming(self) -> None:
while not self._closing.is_set() and rclpy.ok():
try:
result = self._transport.recv(timeout_ms=100)
except OSError as exc:
if not self._closing.is_set() and self._should_log('drain_error', 2.0):
self.get_logger().error(f'OmniSocket receive loop stopped: {exc}')
return
if result is None:
continue
from_peer, msg_type, payload = result
self._log_inbound_message(from_peer, msg_type, payload)
def send_zero_burst(self) -> None:
"""Best-effort stop command sent during shutdown."""
for _ in range(max(1, self._exit_zero_packets)):
self._send_command(ZERO_COMMAND)
time.sleep(0.02)
def close(self) -> None:
self._closing.set()
if hasattr(self, '_transport') and self._transport is not None:
try:
self._transport.close()
except OSError as exc:
if self._should_log('close_error', 2.0):
self.get_logger().warning(f'Closing OmniSocket transport failed: {exc}')
self._transport = None
if hasattr(self, '_drain_thread') and self._drain_thread.is_alive():
self._drain_thread.join(timeout=0.5)
def destroy_node(self) -> bool:
self.close()
return super().destroy_node()
def main(args: Optional[list[str]] = None) -> None:
rclpy.init(args=args)
node = CmdVelUdpSender()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.send_zero_burst()
node.destroy_node()
rclpy.shutdown()

View File

@@ -0,0 +1,95 @@
"""Helpers for working with OmniSocket transport sessions."""
from __future__ import annotations
from .protocol import default_server_addr_for_transport, normalize_transport
try:
from omnisocket import (
CONTROL_DEFAULTS,
MSG_TYPE_BINARY,
MSG_TYPE_ERROR,
Session,
UdpSession,
)
except ImportError as exc: # pragma: no cover - depends on external build/install
raise RuntimeError(
'omnisocket is not installed for this Python environment; run '
'`make python-ext && make python-install` on a Linux host first'
) from exc
def _normalize_optional(value: object) -> str:
return str(value).strip()
class OmniTransport:
"""Small wrapper that normalizes OmniSocket UDP/KCP session setup."""
def __init__(
self,
*,
transport: object,
server_addr: object,
peer_id: object,
relay_via: object = '',
bind_ip: object = '',
bind_device: object = '',
enable_timestamping: bool = False,
) -> None:
self.transport = normalize_transport(transport)
self.server_addr = _normalize_optional(server_addr) or default_server_addr_for_transport(self.transport)
self.peer_id = _normalize_optional(peer_id)
self.relay_via = _normalize_optional(relay_via)
self.bind_ip = _normalize_optional(bind_ip)
self.bind_device = _normalize_optional(bind_device)
if not self.peer_id:
raise ValueError('peer_id must not be empty')
session_cls = Session if self.transport == 'kcp' else UdpSession
self._session = session_cls()
connect_kwargs: dict[str, object] = {
'server_addr': self.server_addr,
'peer_id': self.peer_id,
}
if self.bind_ip:
connect_kwargs['bind_ip'] = self.bind_ip
if self.bind_device:
connect_kwargs['bind_device'] = self.bind_device
if self.transport == 'kcp':
if self.relay_via:
connect_kwargs['relay_via'] = self.relay_via
connect_kwargs.update(CONTROL_DEFAULTS)
else:
connect_kwargs['enable_timestamping'] = bool(enable_timestamping)
self._session.connect(**connect_kwargs)
def send(self, *, to: str, data: bytes) -> None:
self._session.send(to=to, data=data)
def recv(self, *, timeout_ms: int = -1):
return self._session.recv(timeout_ms=timeout_ms)
def recv_into(self, *, buffer, timeout_ms: int = -1):
return self._session.recv_into(buffer=buffer, timeout_ms=timeout_ms)
def close(self) -> None:
self._session.close()
def stats(self) -> dict[str, int]:
return self._session.stats()
__all__ = [
'CONTROL_DEFAULTS',
'MSG_TYPE_BINARY',
'MSG_TYPE_ERROR',
'OmniTransport',
'Session',
'UdpSession',
]

View File

@@ -0,0 +1,74 @@
"""Shared teleop protocol helpers and transport defaults."""
from __future__ import annotations
import math
import struct
from typing import Iterable, Tuple
COMMAND_STRUCT = struct.Struct('<6f')
PACKET_SIZE = COMMAND_STRUCT.size
SUPPORTED_TRANSPORTS = ('udp', 'kcp')
DEFAULT_TRANSPORT = 'udp'
DEFAULT_OMNI_UDP_SERVER_ADDR = '127.0.0.1:9001'
DEFAULT_OMNI_KCP_SERVER_ADDR = '127.0.0.1:9002'
DEFAULT_KEYBOARD_PEER_ID = 'ros-keyboard-ctrl'
DEFAULT_GAMEPAD_PEER_ID = 'ros-gamepad-ctrl'
DEFAULT_BRIDGE_PEER_ID = 'ros-bridge-ctrl'
DEFAULT_TARGET_PEER = DEFAULT_BRIDGE_PEER_ID
DEFAULT_FRAME_ID = 'pelvis'
DEFAULT_INPUT_TOPIC = '/teleop/cmd_vel'
DEFAULT_OUTPUT_TOPIC = '/hric/robot/cmd_vel'
DEFAULT_SEND_RATE_HZ = 20.0
DEFAULT_INPUT_TIMEOUT = 0.75
DEFAULT_WATCHDOG_TIMEOUT = 0.5
DEFAULT_PUBLISH_RATE_HZ = 100.0
DEFAULT_QUEUE_DEPTH = 10
DEFAULT_EXIT_ZERO_PACKETS = 3
DEFAULT_RECV_BUFFER_BYTES = 2048
ZERO_COMMAND = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
def normalize_transport(value: object) -> str:
"""Return a supported transport name."""
transport = str(value).strip().lower()
if transport not in SUPPORTED_TRANSPORTS:
supported = ', '.join(SUPPORTED_TRANSPORTS)
raise ValueError(f"Unsupported transport '{transport}', expected one of: {supported}")
return transport
def default_server_addr_for_transport(transport: str) -> str:
"""Return the default OmniSocket server for the chosen transport."""
transport = normalize_transport(transport)
if transport == 'udp':
return DEFAULT_OMNI_UDP_SERVER_ADDR
return DEFAULT_OMNI_KCP_SERVER_ADDR
def normalize_command(values: Iterable[float]) -> Tuple[float, float, float, float, float, float]:
"""Return a finite six-float command tuple."""
command = tuple(float(value) for value in values)
if len(command) != 6:
raise ValueError(f'Expected 6 command values, got {len(command)}')
if any(not math.isfinite(value) for value in command):
raise ValueError('Command contains a non-finite value')
return command
def pack_command(values: Iterable[float]) -> bytes:
"""Pack six floats into the wire format."""
return COMMAND_STRUCT.pack(*normalize_command(values))
def unpack_command(payload: bytes) -> Tuple[float, float, float, float, float, float]:
"""Decode a control packet into a six-float command tuple."""
if len(payload) != PACKET_SIZE:
raise ValueError(f'Expected {PACKET_SIZE} bytes, got {len(payload)}')
return normalize_command(COMMAND_STRUCT.unpack(payload))

View File

@@ -0,0 +1,238 @@
"""ROS 2 node that receives OmniSocket teleop packets and republishes TwistStamped."""
from __future__ import annotations
import threading
import time
from typing import Dict, Optional, Tuple
import rclpy
from geometry_msgs.msg import TwistStamped
from rclpy.node import Node
from .omni_transport import MSG_TYPE_BINARY, MSG_TYPE_ERROR, OmniTransport
from .protocol import (
DEFAULT_BRIDGE_PEER_ID,
DEFAULT_FRAME_ID,
DEFAULT_OUTPUT_TOPIC,
DEFAULT_PUBLISH_RATE_HZ,
DEFAULT_QUEUE_DEPTH,
DEFAULT_RECV_BUFFER_BYTES,
DEFAULT_TRANSPORT,
DEFAULT_WATCHDOG_TIMEOUT,
PACKET_SIZE,
ZERO_COMMAND,
unpack_command,
)
CommandTuple = Tuple[float, float, float, float, float, float]
class UdpCmdVelReceiver(Node):
"""Publish TwistStamped commands from the OmniSocket control wire format."""
def __init__(self) -> None:
super().__init__('udp_cmd_vel_receiver')
self.declare_parameter('transport', DEFAULT_TRANSPORT)
self.declare_parameter('server_addr', '')
self.declare_parameter('relay_via', '')
self.declare_parameter('peer_id', DEFAULT_BRIDGE_PEER_ID)
self.declare_parameter('expected_sender', '')
self.declare_parameter('output_topic', DEFAULT_OUTPUT_TOPIC)
self.declare_parameter('frame_id', DEFAULT_FRAME_ID)
self.declare_parameter('watchdog_timeout', DEFAULT_WATCHDOG_TIMEOUT)
self.declare_parameter('publish_rate_hz', DEFAULT_PUBLISH_RATE_HZ)
self.declare_parameter('queue_depth', DEFAULT_QUEUE_DEPTH)
self._transport_name = str(self.get_parameter('transport').value)
self._server_addr = str(self.get_parameter('server_addr').value)
self._relay_via = str(self.get_parameter('relay_via').value)
self._peer_id = str(self.get_parameter('peer_id').value)
self._expected_sender = str(self.get_parameter('expected_sender').value).strip()
self._output_topic = str(self.get_parameter('output_topic').value)
self._frame_id = str(self.get_parameter('frame_id').value)
self._watchdog_timeout = float(self.get_parameter('watchdog_timeout').value)
self._publish_rate_hz = float(self.get_parameter('publish_rate_hz').value)
self._queue_depth = int(self.get_parameter('queue_depth').value)
if self._watchdog_timeout <= 0.0:
raise ValueError('watchdog_timeout must be > 0')
if self._publish_rate_hz <= 0.0:
raise ValueError('publish_rate_hz must be > 0')
if self._queue_depth <= 0:
raise ValueError('queue_depth must be > 0')
self._publisher = self.create_publisher(TwistStamped, self._output_topic, self._queue_depth)
self._transport = OmniTransport(
transport=self._transport_name,
server_addr=self._server_addr,
relay_via=self._relay_via,
peer_id=self._peer_id,
)
self._lock = threading.Lock()
self._last_log_times: Dict[str, float] = {}
self._latest_command: CommandTuple = ZERO_COMMAND
self._last_packet_monotonic: Optional[float] = None
self._last_published_command: CommandTuple = ZERO_COMMAND
self._closing = threading.Event()
self._recv_buffer = bytearray(DEFAULT_RECV_BUFFER_BYTES)
self.create_timer(1.0 / self._publish_rate_hz, self._publish_tick)
self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
self._recv_thread.start()
self.get_logger().info(
'Receiving teleop commands via %s://%s as %s and publishing TwistStamped to %s '
'at %.1f Hz (frame_id=%s, watchdog %.2f s)'
% (
self._transport.transport,
self._transport.server_addr,
self._peer_id,
self._output_topic,
self._publish_rate_hz,
self._frame_id,
self._watchdog_timeout,
)
)
def _should_log(self, key: str, throttle_sec: float) -> bool:
now = time.monotonic()
previous = self._last_log_times.get(key)
if previous is None or (now - previous) >= throttle_sec:
self._last_log_times[key] = now
return True
return False
def _publish_command(self, command: CommandTuple) -> None:
msg = TwistStamped()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = self._frame_id
msg.twist.linear.x = command[0]
msg.twist.linear.y = command[1]
msg.twist.linear.z = command[2]
msg.twist.angular.x = command[3]
msg.twist.angular.y = command[4]
msg.twist.angular.z = command[5]
self._publisher.publish(msg)
self._last_published_command = command
def _handle_error_message(self, from_peer: str, body_len: int) -> None:
if self._should_log('server_error', 1.0):
text = bytes(self._recv_buffer[:body_len]).decode('utf-8', errors='replace')
self.get_logger().error(f'OmniSocket server error from {from_peer}: {text}')
def _recv_loop(self) -> None:
while not self._closing.is_set() and rclpy.ok():
try:
meta = self._transport.recv_into(buffer=self._recv_buffer, timeout_ms=100)
except BufferError as exc:
if self._should_log('buffer_error', 2.0):
self.get_logger().warning(f'Dropped oversized OmniSocket frame: {exc}')
continue
except OSError as exc:
if not self._closing.is_set() and self._should_log('recv_error', 2.0):
self.get_logger().error(f'OmniSocket receive loop stopped: {exc}')
return
if meta is None:
continue
from_peer = str(meta['from'])
msg_type = int(meta['msg_type'])
body_len = int(meta['body_len'])
if msg_type == MSG_TYPE_ERROR:
self._handle_error_message(from_peer, body_len)
continue
if self._expected_sender and from_peer != self._expected_sender:
if self._should_log('unexpected_sender', 2.0):
self.get_logger().warning(
'Ignoring message from unexpected sender %s (expected %s)'
% (from_peer, self._expected_sender)
)
continue
if msg_type != MSG_TYPE_BINARY:
if self._should_log('unexpected_type', 2.0):
self.get_logger().warning(
'Ignoring unexpected message type %d from %s (%d bytes)'
% (msg_type, from_peer, body_len)
)
continue
if body_len != PACKET_SIZE:
if self._should_log('packet_size', 2.0):
self.get_logger().warning(
'Dropped binary payload from %s with invalid size %d (expected %d)'
% (from_peer, body_len, PACKET_SIZE)
)
continue
try:
command = unpack_command(self._recv_buffer[:PACKET_SIZE])
except ValueError as exc:
if self._should_log('decode_error', 2.0):
self.get_logger().warning(f'Dropped malformed command payload: {exc}')
continue
with self._lock:
self._latest_command = command
self._last_packet_monotonic = time.monotonic()
def _command_for_publish_tick(self) -> tuple[CommandTuple, Optional[float], bool]:
with self._lock:
latest_command = self._latest_command
last_packet_monotonic = self._last_packet_monotonic
if last_packet_monotonic is None:
return ZERO_COMMAND, None, False
age = time.monotonic() - last_packet_monotonic
if age > self._watchdog_timeout:
return ZERO_COMMAND, age, True
return latest_command, age, False
def _publish_tick(self) -> None:
publish_command, age, timed_out = self._command_for_publish_tick()
if timed_out and self._last_published_command != ZERO_COMMAND:
if self._should_log('watchdog_stop', 2.0):
self.get_logger().warning(
'Command stream timed out after %.2f s, publishing zero velocity stop'
% age
)
self._publish_command(publish_command)
def close(self) -> None:
self._closing.set()
if hasattr(self, '_transport') and self._transport is not None:
try:
self._transport.close()
except OSError as exc:
if self._should_log('close_error', 2.0):
self.get_logger().warning(f'Closing OmniSocket transport failed: {exc}')
self._transport = None
if hasattr(self, '_recv_thread') and self._recv_thread.is_alive():
self._recv_thread.join(timeout=0.5)
def destroy_node(self) -> bool:
self.close()
return super().destroy_node()
def main(args: Optional[list[str]] = None) -> None:
rclpy.init(args=args)
node = UdpCmdVelReceiver()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

View File

@@ -1,6 +1,8 @@
#include "peer_udp_client.h"
#include <poll.h>
#include <pthread.h>
#include <string.h>
struct udp_client {
char id[OMNI_MAX_PEER_ID];
@@ -17,6 +19,19 @@ static int client_next_message_id(udp_client_t *client, uint64_t *out_id) {
return 0;
}
static void udp_client_fill_recv_meta(udp_client_recv_meta_t *meta, const message_t *msg) {
if (meta == NULL || msg == NULL) {
return;
}
memset(meta, 0, sizeof(*meta));
meta->type = msg->type;
meta->id = msg->id;
meta->body_len = msg->body_len;
snprintf(meta->from, sizeof(meta->from), "%s", msg->from);
snprintf(meta->to, sizeof(meta->to), "%s", msg->to);
snprintf(meta->file_name, sizeof(meta->file_name), "%s", msg->file_name);
}
static int client_persist_message_to_disk(const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len) {
char path[512];
if (omni_ensure_dir(inbox_dir) != 0) {
@@ -70,7 +85,7 @@ static int client_persist_message_to_disk(const message_t *msg, const char *inbo
return 0;
}
udp_client_t *udp_client_dial(const char *server_addr, const char *peer_id, const char *bind_ip, latency_logger_t *logger, tx_timestamp_debug_logger_t *debug_logger, int enable_timestamping) {
udp_client_t *udp_client_dial_with_options(const char *server_addr, const char *peer_id, const char *bind_ip, const char *bind_device, latency_logger_t *logger, tx_timestamp_debug_logger_t *debug_logger, int enable_timestamping) {
udp_client_t *client;
message_t register_msg;
client = (udp_client_t *) calloc(1, sizeof(*client));
@@ -80,7 +95,7 @@ udp_client_t *udp_client_dial(const char *server_addr, const char *peer_id, cons
snprintf(client->id, sizeof(client->id), "%s", peer_id);
pthread_mutex_init(&client->id_mu, NULL);
client->logger = logger;
client->conn = udp_conn_dial(server_addr, bind_ip, NULL, enable_timestamping, logger, OMNI_NODE_ROLE_PEER, peer_id, debug_logger);
client->conn = udp_conn_dial(server_addr, bind_ip, bind_device, enable_timestamping, logger, OMNI_NODE_ROLE_PEER, peer_id, debug_logger);
if (client->conn == NULL) {
udp_client_free(client);
return NULL;
@@ -97,6 +112,10 @@ udp_client_t *udp_client_dial(const char *server_addr, const char *peer_id, cons
return client;
}
udp_client_t *udp_client_dial(const char *server_addr, const char *peer_id, const char *bind_ip, latency_logger_t *logger, tx_timestamp_debug_logger_t *debug_logger, int enable_timestamping) {
return udp_client_dial_with_options(server_addr, peer_id, bind_ip, NULL, logger, debug_logger, enable_timestamping);
}
const char *udp_client_id(const udp_client_t *client) {
return client == NULL ? "" : client->id;
}
@@ -124,6 +143,38 @@ int udp_client_send_text(udp_client_t *client, const char *to, const char *text)
return 0;
}
int udp_client_send_binary(udp_client_t *client, const char *to, const void *data, size_t data_len) {
message_t msg;
uint64_t id;
if (client == NULL || to == NULL || (data == NULL && data_len > 0)) {
errno = EINVAL;
return -1;
}
protocol_message_init(&msg);
client_next_message_id(client, &id);
msg.type = MSG_TYPE_BINARY;
msg.id = id;
snprintf(msg.from, sizeof(msg.from), "%s", client->id);
snprintf(msg.to, sizeof(msg.to), "%s", to);
if (data_len > 0) {
msg.body = (uint8_t *) malloc(data_len);
if (msg.body == NULL) {
return -1;
}
memcpy(msg.body, data, data_len);
}
msg.body_len = data_len;
latencylog_log_message_event(client->logger, OMNI_NODE_ROLE_PEER, client->id, EVENT_A_APP_PREP_BEGIN, &msg);
if (udp_conn_send(client->conn, &msg) != 0) {
protocol_message_clear(&msg);
return -1;
}
protocol_message_clear(&msg);
return 0;
}
int udp_client_send_file_path(udp_client_t *client, const char *to, const char *path) {
message_t msg;
uint64_t id;
@@ -151,7 +202,34 @@ int udp_client_send_file_path(udp_client_t *client, const char *to, const char *
return 0;
}
int udp_client_receive(udp_client_t *client, message_t *out_msg) {
int udp_client_receive_timed(udp_client_t *client, message_t *out_msg, int timeout_ms) {
if (client == NULL || out_msg == NULL) {
errno = EINVAL;
return -1;
}
if (timeout_ms >= 0) {
struct pollfd pfd;
int rc;
memset(&pfd, 0, sizeof(pfd));
pfd.fd = udp_conn_fd(client->conn);
pfd.events = POLLIN | POLLERR | POLLHUP;
do {
rc = poll(&pfd, 1, timeout_ms);
} while (rc < 0 && errno == EINTR);
if (rc == 0) {
return 1;
}
if (rc < 0) {
return -1;
}
if ((pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) != 0 && (pfd.revents & POLLIN) == 0) {
errno = ECONNRESET;
return -1;
}
}
if (udp_conn_receive(client->conn, out_msg, NULL, NULL) != 0) {
return -1;
}
@@ -159,6 +237,39 @@ int udp_client_receive(udp_client_t *client, message_t *out_msg) {
return 0;
}
int udp_client_receive(udp_client_t *client, message_t *out_msg) {
return udp_client_receive_timed(client, out_msg, -1);
}
int udp_client_receive_into(udp_client_t *client, void *buffer, size_t buffer_len, udp_client_recv_meta_t *out_meta, int timeout_ms) {
message_t msg;
int rc;
if (client == NULL || (buffer == NULL && buffer_len > 0) || out_meta == NULL) {
errno = EINVAL;
return -1;
}
protocol_message_init(&msg);
rc = udp_client_receive_timed(client, &msg, timeout_ms);
if (rc != 0) {
return rc;
}
udp_client_fill_recv_meta(out_meta, &msg);
if (msg.body_len > buffer_len) {
protocol_message_clear(&msg);
errno = EMSGSIZE;
return 2;
}
if (msg.body_len > 0) {
memcpy(buffer, msg.body, msg.body_len);
}
protocol_message_clear(&msg);
return 0;
}
int udp_client_persist_message(udp_client_t *client, const message_t *msg, const char *inbox_dir, char *out_path, size_t out_path_len) {
if (!latencylog_is_business_message(msg)) {
errno = EINVAL;

View File

@@ -1799,6 +1799,8 @@ int kcp_conn_close(kcp_conn_t *conn) {
pthread_mutex_lock(&conn->kcp_mu);
atomic_store(&conn->closed, 1);
if (conn->owns_socket && !conn->socket_closed) {
/* Wake the blocking recv thread before closing the shared UDP socket. */
(void) shutdown(conn->fd, SHUT_RDWR);
close(conn->fd);
conn->socket_closed = 1;
}

View File

@@ -3,6 +3,7 @@
#include <arpa/inet.h>
#include <netdb.h>
#include <pthread.h>
#include <sys/socket.h>
#include <unistd.h>
typedef struct udp_pending_tx {
@@ -413,6 +414,13 @@ int udp_conn_receive(udp_conn_t *conn, message_t *out_msg, struct sockaddr_stora
}
n = recvmsg(conn->fd, &msg, 0);
if (n < 0) {
if (conn->closed) {
errno = ECANCELED;
}
return -1;
}
if (n == 0 && conn->closed) {
errno = ECANCELED;
return -1;
}
if (conn->timestamping_enabled) {
@@ -454,6 +462,8 @@ int udp_conn_close(udp_conn_t *conn) {
}
if (!conn->closed) {
conn->closed = 1;
/* Wake blocking recvmsg()/poll users before tearing down the socket. */
(void) shutdown(conn->fd, SHUT_RDWR);
close(conn->fd);
if (conn->errqueue_thread_started) {
pthread_join(conn->errqueue_thread, NULL);