232 lines
7.7 KiB
Python
232 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
monitor_sender.py
|
||
运行在机器人控制器上,订阅 /robot_state ROS2 话题,
|
||
并尽量兼容 /power/battery/status 电池话题,
|
||
通过 WebSocket 将状态信息发送到监控服务器。
|
||
每 1 秒采样并发送一次,降低通信开销。
|
||
|
||
用法:
|
||
ros2 run <your_package> monitor_sender
|
||
或直接: python3 monitor_sender.py --ip 192.168.1.100 --port 8000
|
||
"""
|
||
|
||
import argparse
|
||
import asyncio
|
||
import json
|
||
import threading
|
||
from datetime import datetime, timezone
|
||
|
||
import rclpy
|
||
from rclpy.node import Node
|
||
import websockets
|
||
import websockets.exceptions
|
||
|
||
try:
|
||
from ros2_bridge_msgs.msg import RobotState
|
||
except ImportError as exc:
|
||
RobotState = None
|
||
ROBOT_STATE_IMPORT_ERROR = exc
|
||
else:
|
||
ROBOT_STATE_IMPORT_ERROR = None
|
||
|
||
try:
|
||
from bodyctrl_msgs.msg import PowerBatteryStatus
|
||
except ImportError:
|
||
PowerBatteryStatus = None
|
||
|
||
|
||
class RobotStatusBridge(Node):
|
||
def __init__(self, msg_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop):
|
||
super().__init__("monitor_sender")
|
||
self._queue = msg_queue
|
||
self._loop = loop
|
||
self._lock = threading.Lock()
|
||
self._latest_snapshot = None
|
||
self._latest_power = None # 电源状态单独保存
|
||
|
||
if RobotState is None:
|
||
raise RuntimeError(
|
||
"无法导入 ros2_bridge_msgs/msg/RobotState,请确认已 source 对应 ROS 2 工作空间。"
|
||
) from ROBOT_STATE_IMPORT_ERROR
|
||
|
||
self.create_subscription(RobotState, "/robot_state", self._robot_state_cb, 10)
|
||
subscribed_topics = ["/robot_state"]
|
||
|
||
if PowerBatteryStatus is not None:
|
||
self.create_subscription(
|
||
PowerBatteryStatus,
|
||
"/power/battery/status",
|
||
self._power_cb,
|
||
10,
|
||
)
|
||
subscribed_topics.append("/power/battery/status")
|
||
else:
|
||
self.get_logger().warn(
|
||
"未找到 bodyctrl_msgs/PowerBatteryStatus,将跳过 /power/battery/status 订阅"
|
||
)
|
||
|
||
self.get_logger().info(f"已订阅 {', '.join(subscribed_topics)}")
|
||
|
||
# 创建 1 秒定时器,每秒将最新状态放入队列
|
||
self._timer = self.create_timer(1.0, self._timer_cb)
|
||
|
||
@staticmethod
|
||
def _stamp_to_iso(stamp) -> str:
|
||
return datetime.fromtimestamp(
|
||
stamp.sec + stamp.nanosec * 1e-9, tz=timezone.utc
|
||
).isoformat()
|
||
|
||
@staticmethod
|
||
def _extract_temp_field(status, snake_name: str, legacy_name: str):
|
||
value = getattr(status, snake_name, None)
|
||
if value is None:
|
||
value = getattr(status, legacy_name, None)
|
||
return value
|
||
|
||
def _collect_statuses(self, status_group) -> list[dict]:
|
||
statuses = []
|
||
for s in getattr(status_group, "status", []):
|
||
entry = {
|
||
"name": int(s.name),
|
||
"pos": float(s.pos),
|
||
"speed": float(s.speed),
|
||
"current": float(s.current),
|
||
"temperature": float(s.temperature),
|
||
"error": int(s.error),
|
||
}
|
||
motor_temp = self._extract_temp_field(s, "motor_temperature", "motortemperature")
|
||
mos_temp = self._extract_temp_field(s, "mos_temperature", "mostemperature")
|
||
if motor_temp is not None:
|
||
entry["motor_temp"] = round(float(motor_temp), 1)
|
||
if mos_temp is not None:
|
||
entry["mos_temp"] = round(float(mos_temp), 1)
|
||
statuses.append(entry)
|
||
return statuses
|
||
|
||
def _robot_state_cb(self, msg):
|
||
try:
|
||
statuses = []
|
||
for key in ("head", "arm", "waist", "leg"):
|
||
section = getattr(msg, key, None)
|
||
if section is not None:
|
||
statuses.extend(self._collect_statuses(section))
|
||
|
||
timestamp = None
|
||
header = getattr(msg, "header", None)
|
||
if header is not None and hasattr(header, "stamp"):
|
||
timestamp = self._stamp_to_iso(header.stamp)
|
||
|
||
with self._lock:
|
||
self._latest_snapshot = {"statuses": statuses, "timestamp": timestamp}
|
||
except Exception as e:
|
||
self.get_logger().warn(f"解析 /robot_state 消息失败: {e}")
|
||
|
||
def _power_cb(self, msg):
|
||
"""电源状态回调:保存最新电源数据"""
|
||
try:
|
||
with self._lock:
|
||
self._latest_power = {
|
||
"voltage": round(float(msg.master_battery_voltage), 3),
|
||
"current": round(float(msg.master_battery_current), 3),
|
||
"power": round(float(msg.master_battery_power), 1),
|
||
"little_voltage": round(float(msg.little_battery_voltage), 3),
|
||
"little_current": round(float(msg.little_battery_current), 3),
|
||
"little_power": round(float(msg.little_battery_power), 1),
|
||
}
|
||
except Exception as e:
|
||
self.get_logger().warn(f"解析 power 消息失败: {e}")
|
||
|
||
def _timer_cb(self):
|
||
"""1 秒定时器回调:合并所有话题的最新状态,放入异步队列"""
|
||
with self._lock:
|
||
snapshot = self._latest_snapshot
|
||
power = self._latest_power
|
||
# 清空,避免重复发送
|
||
self._latest_snapshot = None
|
||
self._latest_power = None
|
||
|
||
if snapshot is None and power is None:
|
||
return
|
||
|
||
payload = {
|
||
"statuses": snapshot["statuses"] if snapshot else [],
|
||
"timestamp": snapshot["timestamp"] if snapshot else None,
|
||
"power": power,
|
||
}
|
||
try:
|
||
self._loop.call_soon_threadsafe(self._queue.put_nowait, payload)
|
||
except asyncio.QueueFull:
|
||
self.get_logger().warn("发送队列已满,丢弃本次数据")
|
||
|
||
|
||
async def ws_sender(server_url: str, msg_queue: asyncio.Queue):
|
||
"""异步 WebSocket 发送循环,带自动重连"""
|
||
while True:
|
||
try:
|
||
async with websockets.connect(server_url) as ws:
|
||
print(f"[WS] 已连接到 {server_url}")
|
||
while True:
|
||
payload = await msg_queue.get()
|
||
text = json.dumps(payload)
|
||
await ws.send(text)
|
||
except (
|
||
OSError,
|
||
websockets.exceptions.ConnectionClosed,
|
||
websockets.exceptions.InvalidURI,
|
||
ConnectionRefusedError,
|
||
) as e:
|
||
print(f"[WS] 连接断开/失败: {e},3 秒后重连...")
|
||
while not msg_queue.empty():
|
||
try:
|
||
msg_queue.get_nowait()
|
||
except asyncio.QueueEmpty:
|
||
break
|
||
await asyncio.sleep(3)
|
||
|
||
|
||
def spin_ros(node: Node):
|
||
"""在独立线程中运行 ROS2 spin"""
|
||
rclpy.spin(node)
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description="ROS2 Robot Status → WebSocket 发送端")
|
||
parser.add_argument(
|
||
"--ip",
|
||
type=str,
|
||
default="10.11.24.86",
|
||
help="监控服务器 IP 地址,例如 192.168.1.100",
|
||
)
|
||
parser.add_argument(
|
||
"--port",
|
||
type=int,
|
||
default=8000,
|
||
help="监控服务器端口(默认 8000)",
|
||
)
|
||
args = parser.parse_args()
|
||
server_url = f"ws://{args.ip}:{args.port}/ws/robot"
|
||
|
||
rclpy.init()
|
||
|
||
loop = asyncio.new_event_loop()
|
||
msg_queue: asyncio.Queue = asyncio.Queue(maxsize=100)
|
||
|
||
node = RobotStatusBridge(msg_queue, loop)
|
||
|
||
ros_thread = threading.Thread(target=spin_ros, args=(node,), daemon=True)
|
||
ros_thread.start()
|
||
|
||
try:
|
||
loop.run_until_complete(ws_sender(server_url, msg_queue))
|
||
except KeyboardInterrupt:
|
||
print("\n[INFO] 正在关闭...")
|
||
finally:
|
||
node.destroy_node()
|
||
rclpy.shutdown()
|
||
loop.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|