|
|
|
|
@@ -2,6 +2,8 @@
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import os
|
|
|
|
|
import socket
|
|
|
|
|
import threading
|
|
|
|
|
import time
|
|
|
|
|
from typing import Dict, Optional, Tuple
|
|
|
|
|
@@ -10,7 +12,6 @@ import rclpy
|
|
|
|
|
from geometry_msgs.msg import TwistStamped
|
|
|
|
|
from rclpy.node import Node
|
|
|
|
|
|
|
|
|
|
from .omni_transport import MSG_TYPE_BINARY, MSG_TYPE_ERROR, OmniTransport
|
|
|
|
|
from .protocol import (
|
|
|
|
|
DEFAULT_BRIDGE_PEER_ID,
|
|
|
|
|
DEFAULT_FRAME_ID,
|
|
|
|
|
@@ -40,6 +41,7 @@ class UdpCmdVelReceiver(Node):
|
|
|
|
|
self.declare_parameter('relay_via', '')
|
|
|
|
|
self.declare_parameter('peer_id', DEFAULT_BRIDGE_PEER_ID)
|
|
|
|
|
self.declare_parameter('expected_sender', '')
|
|
|
|
|
self.declare_parameter('local_socket_path', '/tmp/omnisocket-b-side-cmd.sock')
|
|
|
|
|
self.declare_parameter('output_topic', DEFAULT_OUTPUT_TOPIC)
|
|
|
|
|
self.declare_parameter('frame_id', DEFAULT_FRAME_ID)
|
|
|
|
|
self.declare_parameter('watchdog_timeout', DEFAULT_WATCHDOG_TIMEOUT)
|
|
|
|
|
@@ -51,12 +53,15 @@ class UdpCmdVelReceiver(Node):
|
|
|
|
|
self._relay_via = str(self.get_parameter('relay_via').value)
|
|
|
|
|
self._peer_id = str(self.get_parameter('peer_id').value)
|
|
|
|
|
self._expected_sender = str(self.get_parameter('expected_sender').value).strip()
|
|
|
|
|
self._local_socket_path = str(self.get_parameter('local_socket_path').value).strip()
|
|
|
|
|
self._output_topic = str(self.get_parameter('output_topic').value)
|
|
|
|
|
self._frame_id = str(self.get_parameter('frame_id').value)
|
|
|
|
|
self._watchdog_timeout = float(self.get_parameter('watchdog_timeout').value)
|
|
|
|
|
self._publish_rate_hz = float(self.get_parameter('publish_rate_hz').value)
|
|
|
|
|
self._queue_depth = int(self.get_parameter('queue_depth').value)
|
|
|
|
|
|
|
|
|
|
if self._transport_name not in ('udp', 'kcp', 'unix_dgram'):
|
|
|
|
|
raise ValueError("transport must be one of: udp, kcp, unix_dgram")
|
|
|
|
|
if self._watchdog_timeout <= 0.0:
|
|
|
|
|
raise ValueError('watchdog_timeout must be > 0')
|
|
|
|
|
if self._publish_rate_hz <= 0.0:
|
|
|
|
|
@@ -65,12 +70,23 @@ class UdpCmdVelReceiver(Node):
|
|
|
|
|
raise ValueError('queue_depth must be > 0')
|
|
|
|
|
|
|
|
|
|
self._publisher = self.create_publisher(TwistStamped, self._output_topic, self._queue_depth)
|
|
|
|
|
self._transport = OmniTransport(
|
|
|
|
|
transport=self._transport_name,
|
|
|
|
|
server_addr=self._server_addr,
|
|
|
|
|
relay_via=self._relay_via,
|
|
|
|
|
peer_id=self._peer_id,
|
|
|
|
|
)
|
|
|
|
|
self._transport = None
|
|
|
|
|
self._unix_socket: socket.socket | None = None
|
|
|
|
|
self._msg_type_binary = 0
|
|
|
|
|
self._msg_type_error = 0
|
|
|
|
|
if self._transport_name == 'unix_dgram':
|
|
|
|
|
self._setup_unix_socket()
|
|
|
|
|
else:
|
|
|
|
|
from .omni_transport import MSG_TYPE_BINARY, MSG_TYPE_ERROR, OmniTransport
|
|
|
|
|
|
|
|
|
|
self._msg_type_binary = MSG_TYPE_BINARY
|
|
|
|
|
self._msg_type_error = MSG_TYPE_ERROR
|
|
|
|
|
self._transport = OmniTransport(
|
|
|
|
|
transport=self._transport_name,
|
|
|
|
|
server_addr=self._server_addr,
|
|
|
|
|
relay_via=self._relay_via,
|
|
|
|
|
peer_id=self._peer_id,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
self._lock = threading.Lock()
|
|
|
|
|
self._last_log_times: Dict[str, float] = {}
|
|
|
|
|
@@ -82,22 +98,58 @@ class UdpCmdVelReceiver(Node):
|
|
|
|
|
|
|
|
|
|
self.create_timer(1.0 / self._publish_rate_hz, self._publish_tick)
|
|
|
|
|
|
|
|
|
|
self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
|
|
|
|
|
recv_target = self._recv_loop_unix_dgram if self._transport_name == 'unix_dgram' else self._recv_loop
|
|
|
|
|
self._recv_thread = threading.Thread(target=recv_target, daemon=True)
|
|
|
|
|
self._recv_thread.start()
|
|
|
|
|
|
|
|
|
|
self.get_logger().info(
|
|
|
|
|
'Receiving teleop commands via %s://%s as %s and publishing TwistStamped to %s '
|
|
|
|
|
'at %.1f Hz (frame_id=%s, watchdog %.2f s)'
|
|
|
|
|
% (
|
|
|
|
|
self._transport.transport,
|
|
|
|
|
self._transport.server_addr,
|
|
|
|
|
self._peer_id,
|
|
|
|
|
self._output_topic,
|
|
|
|
|
self._publish_rate_hz,
|
|
|
|
|
self._frame_id,
|
|
|
|
|
self._watchdog_timeout,
|
|
|
|
|
if self._transport_name == 'unix_dgram':
|
|
|
|
|
self.get_logger().info(
|
|
|
|
|
'Receiving teleop commands via unix_dgram://%s and publishing TwistStamped to %s '
|
|
|
|
|
'at %.1f Hz (frame_id=%s, watchdog %.2f s)'
|
|
|
|
|
% (
|
|
|
|
|
self._local_socket_path,
|
|
|
|
|
self._output_topic,
|
|
|
|
|
self._publish_rate_hz,
|
|
|
|
|
self._frame_id,
|
|
|
|
|
self._watchdog_timeout,
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
else:
|
|
|
|
|
assert self._transport is not None
|
|
|
|
|
self.get_logger().info(
|
|
|
|
|
'Receiving teleop commands via %s://%s as %s and publishing TwistStamped to %s '
|
|
|
|
|
'at %.1f Hz (frame_id=%s, watchdog %.2f s)'
|
|
|
|
|
% (
|
|
|
|
|
self._transport.transport,
|
|
|
|
|
self._transport.server_addr,
|
|
|
|
|
self._peer_id,
|
|
|
|
|
self._output_topic,
|
|
|
|
|
self._publish_rate_hz,
|
|
|
|
|
self._frame_id,
|
|
|
|
|
self._watchdog_timeout,
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def _setup_unix_socket(self) -> None:
|
|
|
|
|
if not self._local_socket_path:
|
|
|
|
|
raise ValueError('local_socket_path must not be empty for unix_dgram transport')
|
|
|
|
|
|
|
|
|
|
socket_dir = os.path.dirname(self._local_socket_path)
|
|
|
|
|
if socket_dir:
|
|
|
|
|
os.makedirs(socket_dir, exist_ok=True)
|
|
|
|
|
if os.path.exists(self._local_socket_path):
|
|
|
|
|
self.get_logger().warning(
|
|
|
|
|
'Removing existing unix datagram socket path before bind: %s'
|
|
|
|
|
% self._local_socket_path
|
|
|
|
|
)
|
|
|
|
|
try:
|
|
|
|
|
os.unlink(self._local_socket_path)
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
self._unix_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
|
|
|
|
self._unix_socket.bind(self._local_socket_path)
|
|
|
|
|
self._unix_socket.settimeout(0.1)
|
|
|
|
|
|
|
|
|
|
def _should_log(self, key: str, throttle_sec: float) -> bool:
|
|
|
|
|
now = time.monotonic()
|
|
|
|
|
@@ -128,6 +180,7 @@ class UdpCmdVelReceiver(Node):
|
|
|
|
|
def _recv_loop(self) -> None:
|
|
|
|
|
while not self._closing.is_set() and rclpy.ok():
|
|
|
|
|
try:
|
|
|
|
|
assert self._transport is not None
|
|
|
|
|
meta = self._transport.recv_into(buffer=self._recv_buffer, timeout_ms=100)
|
|
|
|
|
except BufferError as exc:
|
|
|
|
|
if self._should_log('buffer_error', 2.0):
|
|
|
|
|
@@ -145,7 +198,7 @@ class UdpCmdVelReceiver(Node):
|
|
|
|
|
msg_type = int(meta['msg_type'])
|
|
|
|
|
body_len = int(meta['body_len'])
|
|
|
|
|
|
|
|
|
|
if msg_type == MSG_TYPE_ERROR:
|
|
|
|
|
if msg_type == self._msg_type_error:
|
|
|
|
|
self._handle_error_message(from_peer, body_len)
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
@@ -157,7 +210,7 @@ class UdpCmdVelReceiver(Node):
|
|
|
|
|
)
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
if msg_type != MSG_TYPE_BINARY:
|
|
|
|
|
if msg_type != self._msg_type_binary:
|
|
|
|
|
if self._should_log('unexpected_type', 2.0):
|
|
|
|
|
self.get_logger().warning(
|
|
|
|
|
'Ignoring unexpected message type %d from %s (%d bytes)'
|
|
|
|
|
@@ -184,6 +237,38 @@ class UdpCmdVelReceiver(Node):
|
|
|
|
|
self._latest_command = command
|
|
|
|
|
self._last_packet_monotonic = time.monotonic()
|
|
|
|
|
|
|
|
|
|
def _recv_loop_unix_dgram(self) -> None:
|
|
|
|
|
assert self._unix_socket is not None
|
|
|
|
|
|
|
|
|
|
while not self._closing.is_set() and rclpy.ok():
|
|
|
|
|
try:
|
|
|
|
|
payload = self._unix_socket.recv(DEFAULT_RECV_BUFFER_BYTES)
|
|
|
|
|
except socket.timeout:
|
|
|
|
|
continue
|
|
|
|
|
except OSError as exc:
|
|
|
|
|
if not self._closing.is_set() and self._should_log('unix_recv_error', 2.0):
|
|
|
|
|
self.get_logger().error(f'Unix datagram receive loop stopped: {exc}')
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if len(payload) != PACKET_SIZE:
|
|
|
|
|
if self._should_log('unix_packet_size', 2.0):
|
|
|
|
|
self.get_logger().warning(
|
|
|
|
|
'Dropped unix datagram payload with invalid size %d (expected %d)'
|
|
|
|
|
% (len(payload), PACKET_SIZE)
|
|
|
|
|
)
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
command = unpack_command(payload)
|
|
|
|
|
except ValueError as exc:
|
|
|
|
|
if self._should_log('unix_decode_error', 2.0):
|
|
|
|
|
self.get_logger().warning(f'Dropped malformed unix datagram payload: {exc}')
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
with self._lock:
|
|
|
|
|
self._latest_command = command
|
|
|
|
|
self._last_packet_monotonic = time.monotonic()
|
|
|
|
|
|
|
|
|
|
def _command_for_publish_tick(self) -> tuple[CommandTuple, Optional[float], bool]:
|
|
|
|
|
with self._lock:
|
|
|
|
|
latest_command = self._latest_command
|
|
|
|
|
@@ -218,6 +303,17 @@ class UdpCmdVelReceiver(Node):
|
|
|
|
|
if self._should_log('close_error', 2.0):
|
|
|
|
|
self.get_logger().warning(f'Closing OmniSocket transport failed: {exc}')
|
|
|
|
|
self._transport = None
|
|
|
|
|
if self._unix_socket is not None:
|
|
|
|
|
try:
|
|
|
|
|
self._unix_socket.close()
|
|
|
|
|
except OSError as exc:
|
|
|
|
|
if self._should_log('unix_close_error', 2.0):
|
|
|
|
|
self.get_logger().warning(f'Closing unix socket failed: {exc}')
|
|
|
|
|
self._unix_socket = None
|
|
|
|
|
try:
|
|
|
|
|
os.unlink(self._local_socket_path)
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
pass
|
|
|
|
|
if hasattr(self, '_recv_thread') and self._recv_thread.is_alive():
|
|
|
|
|
self._recv_thread.join(timeout=0.5)
|
|
|
|
|
|
|
|
|
|
|