feat: C控制程序对接KCP
This commit is contained in:
247
ros-control-c/robot/udp_ros_bridge.py
Normal file
247
ros-control-c/robot/udp_ros_bridge.py
Normal file
@@ -0,0 +1,247 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
udp_ros_bridge.py — UDP/KCP → ROS2 TwistStamped bridge
|
||||
|
||||
Receives 24-byte binary twist commands from keyboard/gamepad controllers
|
||||
via UDP or OmniSocket/KCP and publishes geometry_msgs/msg/TwistStamped to
|
||||
/hric/robot/cmd_vel.
|
||||
|
||||
Usage:
|
||||
ros2 run <your_pkg> udp_ros_bridge (if installed as a ROS2 package)
|
||||
python3 udp_ros_bridge.py (standalone)
|
||||
|
||||
ROS2 parameters:
|
||||
transport (string) — udp or kcp (default udp)
|
||||
udp_port (int) — UDP listen port (default 9870)
|
||||
kcp_server (string) — KCP hub addr (default 127.0.0.1:9002)
|
||||
kcp_relay_via (string) — optional relay addr (default "")
|
||||
peer_id (string) — local KCP peer id (default ros-bridge-ctrl)
|
||||
expected_sender (string) — optional sender filter (default "")
|
||||
topic (string) — publish topic (default /hric/robot/cmd_vel)
|
||||
frame_id (string) — TwistStamped frame_id (default pelvis)
|
||||
timeout (float) — watchdog timeout seconds (default 0.5)
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
import struct
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from geometry_msgs.msg import TwistStamped
|
||||
|
||||
TWIST_CMD_FMT = '<6f' # 6 little-endian floats, 24 bytes
|
||||
TWIST_CMD_SIZE = struct.calcsize(TWIST_CMD_FMT)
|
||||
|
||||
|
||||
def _load_omnisocket():
|
||||
try:
|
||||
from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session
|
||||
return CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session
|
||||
except ImportError:
|
||||
root = Path(__file__).resolve().parents[2]
|
||||
python_dir = root / 'python'
|
||||
if str(python_dir) not in sys.path:
|
||||
sys.path.insert(0, str(python_dir))
|
||||
from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session
|
||||
return CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session
|
||||
|
||||
|
||||
class UdpTeleopBridge(Node):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('udp_teleop_bridge')
|
||||
|
||||
# declare parameters
|
||||
self.declare_parameter('transport', 'udp')
|
||||
self.declare_parameter('udp_port', 9870)
|
||||
self.declare_parameter('kcp_server', '127.0.0.1:9002')
|
||||
self.declare_parameter('kcp_relay_via', '')
|
||||
self.declare_parameter('peer_id', 'ros-bridge-ctrl')
|
||||
self.declare_parameter('expected_sender', '')
|
||||
self.declare_parameter('topic', '/hric/robot/cmd_vel')
|
||||
self.declare_parameter('frame_id', 'pelvis')
|
||||
self.declare_parameter('timeout', 0.5)
|
||||
|
||||
self._transport = str(self.get_parameter('transport').value).strip().lower()
|
||||
self._port = self.get_parameter('udp_port').value
|
||||
self._kcp_server = str(self.get_parameter('kcp_server').value)
|
||||
self._kcp_relay_via = str(self.get_parameter('kcp_relay_via').value)
|
||||
self._peer_id = str(self.get_parameter('peer_id').value)
|
||||
self._expected_sender = str(self.get_parameter('expected_sender').value)
|
||||
self._topic = self.get_parameter('topic').value
|
||||
self._frame_id = self.get_parameter('frame_id').value
|
||||
self._timeout = self.get_parameter('timeout').value
|
||||
|
||||
if self._transport not in ('udp', 'kcp'):
|
||||
raise ValueError(f"Unsupported transport '{self._transport}', expected 'udp' or 'kcp'")
|
||||
|
||||
# publisher
|
||||
self._pub = self.create_publisher(TwistStamped, self._topic, 10)
|
||||
|
||||
# watchdog timer
|
||||
self._last_recv = time.monotonic()
|
||||
self._lock = threading.Lock()
|
||||
self._latest_cmd = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
|
||||
self._timer = self.create_timer(1.0 / 20.0, self._timer_cb)
|
||||
self._sock = None
|
||||
self._session = None
|
||||
self._msg_type_binary = None
|
||||
self._msg_type_error = None
|
||||
self._closing = False
|
||||
|
||||
if self._transport == 'kcp':
|
||||
control_defaults, self._msg_type_binary, self._msg_type_error, session_cls = _load_omnisocket()
|
||||
self._session = session_cls()
|
||||
self._session.connect(
|
||||
server_addr=self._kcp_server,
|
||||
peer_id=self._peer_id,
|
||||
relay_via=self._kcp_relay_via,
|
||||
**control_defaults,
|
||||
)
|
||||
else:
|
||||
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self._sock.bind(('0.0.0.0', self._port))
|
||||
self._sock.settimeout(0.1)
|
||||
|
||||
# receive thread
|
||||
recv_target = self._recv_loop_kcp if self._transport == 'kcp' else self._recv_loop_udp
|
||||
self._recv_thread = threading.Thread(target=recv_target, daemon=True)
|
||||
self._recv_thread.start()
|
||||
|
||||
if self._transport == 'kcp':
|
||||
self.get_logger().info(
|
||||
f'Bridge ready — KCP {self._kcp_server} as {self._peer_id} → {self._topic} '
|
||||
f'(frame_id={self._frame_id}, timeout={self._timeout}s)'
|
||||
)
|
||||
else:
|
||||
self.get_logger().info(
|
||||
f'Bridge ready — UDP 0.0.0.0:{self._port} → {self._topic} '
|
||||
f'(frame_id={self._frame_id}, timeout={self._timeout}s)'
|
||||
)
|
||||
|
||||
def _recv_loop_udp(self):
|
||||
"""Background thread: receive UDP packets and update latest command."""
|
||||
while rclpy.ok():
|
||||
try:
|
||||
data, addr = self._sock.recvfrom(TWIST_CMD_SIZE + 64)
|
||||
except socket.timeout:
|
||||
continue
|
||||
except OSError:
|
||||
break
|
||||
|
||||
if len(data) != TWIST_CMD_SIZE:
|
||||
self.get_logger().warn(
|
||||
f'Packet has invalid size {len(data)} bytes from {addr}, '
|
||||
f'expected {TWIST_CMD_SIZE}'
|
||||
)
|
||||
continue
|
||||
|
||||
values = struct.unpack(TWIST_CMD_FMT, data)
|
||||
with self._lock:
|
||||
self._latest_cmd = values
|
||||
self._last_recv = time.monotonic()
|
||||
|
||||
def _recv_loop_kcp(self):
|
||||
"""Background thread: receive KCP packets and update latest command."""
|
||||
while rclpy.ok():
|
||||
try:
|
||||
result = self._session.recv(timeout_ms=100)
|
||||
except OSError as exc:
|
||||
if not self._closing:
|
||||
self.get_logger().error(f'KCP receive failed: {exc}')
|
||||
break
|
||||
|
||||
if result is None:
|
||||
continue
|
||||
|
||||
from_peer, msg_type, payload = result
|
||||
|
||||
if msg_type == self._msg_type_error:
|
||||
self.get_logger().error(
|
||||
f'KCP server error from {from_peer}: {payload.decode("utf-8", errors="replace")}'
|
||||
)
|
||||
continue
|
||||
|
||||
if self._expected_sender and from_peer != self._expected_sender:
|
||||
self.get_logger().warn(
|
||||
f'Ignoring KCP packet from unexpected sender {from_peer}, '
|
||||
f'expected {self._expected_sender}'
|
||||
)
|
||||
continue
|
||||
|
||||
if msg_type != self._msg_type_binary:
|
||||
self.get_logger().warn(
|
||||
f'Ignoring non-binary KCP message type {msg_type} from {from_peer}'
|
||||
)
|
||||
continue
|
||||
|
||||
if len(payload) != TWIST_CMD_SIZE:
|
||||
self.get_logger().warn(
|
||||
f'KCP payload has invalid size {len(payload)} bytes from {from_peer}, '
|
||||
f'expected {TWIST_CMD_SIZE}'
|
||||
)
|
||||
continue
|
||||
|
||||
values = struct.unpack(TWIST_CMD_FMT, payload)
|
||||
with self._lock:
|
||||
self._latest_cmd = values
|
||||
self._last_recv = time.monotonic()
|
||||
|
||||
def _timer_cb(self):
|
||||
"""20 Hz: publish TwistStamped from latest received command."""
|
||||
with self._lock:
|
||||
elapsed = time.monotonic() - self._last_recv
|
||||
if elapsed > self._timeout:
|
||||
lx, ly, lz, ax, ay, az = 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
|
||||
else:
|
||||
lx, ly, lz, ax, ay, az = self._latest_cmd
|
||||
|
||||
msg = TwistStamped()
|
||||
msg.header.stamp = self.get_clock().now().to_msg()
|
||||
msg.header.frame_id = self._frame_id
|
||||
msg.twist.linear.x = float(lx)
|
||||
msg.twist.linear.y = float(ly)
|
||||
msg.twist.linear.z = float(lz)
|
||||
msg.twist.angular.x = float(ax)
|
||||
msg.twist.angular.y = float(ay)
|
||||
msg.twist.angular.z = float(az)
|
||||
|
||||
self._pub.publish(msg)
|
||||
|
||||
def destroy_node(self):
|
||||
self._closing = True
|
||||
if self._sock is not None:
|
||||
self._sock.close()
|
||||
self._sock = None
|
||||
if self._session is not None:
|
||||
try:
|
||||
self._session.close()
|
||||
except OSError as exc:
|
||||
self.get_logger().warn(f'Closing KCP session failed: {exc}')
|
||||
self._session = None
|
||||
if hasattr(self, '_recv_thread') and self._recv_thread.is_alive():
|
||||
self._recv_thread.join(timeout=0.2)
|
||||
super().destroy_node()
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = None
|
||||
try:
|
||||
node = UdpTeleopBridge()
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
if node is not None:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user