check: 查看机器人速度topic信息

This commit is contained in:
Mock
2026-04-09 22:28:30 +08:00
parent d76230b9b0
commit bc18f9a27b
3 changed files with 124 additions and 0 deletions

View File

@@ -15,6 +15,7 @@
<exec_depend>launch</exec_depend>
<exec_depend>launch_ros</exec_depend>
<exec_depend>rclpy</exec_depend>
<exec_depend>rosidl_runtime_py</exec_depend>
<exec_depend>teleop_twist_joy</exec_depend>
<exec_depend>teleop_twist_keyboard</exec_depend>

View File

@@ -29,6 +29,7 @@ setup(
'console_scripts': [
'cmd_vel_udp_sender = udp_teleop_bridge.cmd_vel_udp_sender:main',
'udp_cmd_vel_receiver = udp_teleop_bridge.udp_cmd_vel_receiver:main',
'topic_status_reader = udp_teleop_bridge.topic_status_reader:main',
],
},
)

View File

@@ -0,0 +1,122 @@
"""Subscribe to a ROS 2 topic with runtime type discovery and print messages."""
from __future__ import annotations
import json
import time
from typing import Any
import rclpy
from rclpy.node import Node
from rosidl_runtime_py.convert import message_to_ordereddict
from rosidl_runtime_py.utilities import get_message
WAIT_LOG_INTERVAL_SEC = 5.0
class TopicStatusReader(Node):
"""Wait for a topic to appear, subscribe to it, and print each message."""
def __init__(self) -> None:
super().__init__('topic_status_reader')
self.declare_parameter('topic', '/hrc/robot/cmd_vel_status')
self.declare_parameter('qos_depth', 10)
self.declare_parameter('poll_interval_sec', 0.5)
self._topic = str(self.get_parameter('topic').value).strip()
self._qos_depth = int(self.get_parameter('qos_depth').value)
self._poll_interval_sec = float(self.get_parameter('poll_interval_sec').value)
if not self._topic:
raise ValueError('topic must not be empty')
if self._qos_depth <= 0:
raise ValueError('qos_depth must be > 0')
if self._poll_interval_sec <= 0.0:
raise ValueError('poll_interval_sec must be > 0')
self._topic_type: str | None = None
self._subscription = None
self._message_count = 0
self._last_wait_log_monotonic = 0.0
self._poll_timer = self.create_timer(self._poll_interval_sec, self._ensure_subscription)
self._ensure_subscription()
def _discover_topic_types(self) -> list[str]:
for topic_name, topic_types in self.get_topic_names_and_types():
if topic_name == self._topic:
return list(topic_types)
return []
def _log_waiting(self) -> None:
now = time.monotonic()
if (now - self._last_wait_log_monotonic) < WAIT_LOG_INTERVAL_SEC:
return
self._last_wait_log_monotonic = now
self.get_logger().info(f'Waiting for topic {self._topic} to appear...')
def _ensure_subscription(self) -> None:
if self._subscription is not None:
return
topic_types = self._discover_topic_types()
if not topic_types:
self._log_waiting()
return
if len(topic_types) > 1:
joined = ', '.join(topic_types)
self.get_logger().warning(
f'Topic {self._topic} reports multiple types ({joined}); using {topic_types[0]}'
)
self._topic_type = topic_types[0]
try:
message_type = get_message(self._topic_type)
except Exception as exc:
self.get_logger().error(
f'Failed to import message type {self._topic_type} for {self._topic}: {exc}'
)
return
self._subscription = self.create_subscription(
message_type,
self._topic,
self._handle_message,
self._qos_depth,
)
self._poll_timer.cancel()
self.get_logger().info(
f'Subscribed to {self._topic} with type {self._topic_type} (qos_depth={self._qos_depth})'
)
def _format_message(self, msg: Any) -> str:
try:
payload = message_to_ordereddict(msg)
except Exception:
return str(msg)
return json.dumps(payload, ensure_ascii=False, indent=2)
def _handle_message(self, msg: Any) -> None:
self._message_count += 1
received_at = time.strftime('%Y-%m-%d %H:%M:%S')
topic_type = self._topic_type or type(msg).__name__
rendered = self._format_message(msg)
print(
f'[{received_at}] #{self._message_count} {self._topic} ({topic_type})\n{rendered}\n',
flush=True,
)
def main(args: list[str] | None = None) -> None:
rclpy.init(args=args)
node = TopicStatusReader()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()