Files
xMonitor/monitor_sender.py
2026-04-14 17:45:47 +08:00

232 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
monitor_sender.py
运行在机器人控制器上,订阅 /robot_state ROS2 话题,
并尽量兼容 /power/battery/status 电池话题,
通过 WebSocket 将状态信息发送到监控服务器。
每 1 秒采样并发送一次,降低通信开销。
用法:
ros2 run <your_package> monitor_sender
或直接: python3 monitor_sender.py --ip 192.168.1.100 --port 8000
"""
import argparse
import asyncio
import json
import threading
from datetime import datetime, timezone
import rclpy
from rclpy.node import Node
import websockets
import websockets.exceptions
try:
from ros2_bridge_msgs.msg import RobotState
except ImportError as exc:
RobotState = None
ROBOT_STATE_IMPORT_ERROR = exc
else:
ROBOT_STATE_IMPORT_ERROR = None
try:
from bodyctrl_msgs.msg import PowerBatteryStatus
except ImportError:
PowerBatteryStatus = None
class RobotStatusBridge(Node):
def __init__(self, msg_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop):
super().__init__("monitor_sender")
self._queue = msg_queue
self._loop = loop
self._lock = threading.Lock()
self._latest_snapshot = None
self._latest_power = None # 电源状态单独保存
if RobotState is None:
raise RuntimeError(
"无法导入 ros2_bridge_msgs/msg/RobotState请确认已 source 对应 ROS 2 工作空间。"
) from ROBOT_STATE_IMPORT_ERROR
self.create_subscription(RobotState, "/robot_state", self._robot_state_cb, 10)
subscribed_topics = ["/robot_state"]
if PowerBatteryStatus is not None:
self.create_subscription(
PowerBatteryStatus,
"/power/battery/status",
self._power_cb,
10,
)
subscribed_topics.append("/power/battery/status")
else:
self.get_logger().warn(
"未找到 bodyctrl_msgs/PowerBatteryStatus将跳过 /power/battery/status 订阅"
)
self.get_logger().info(f"已订阅 {', '.join(subscribed_topics)}")
# 创建 1 秒定时器,每秒将最新状态放入队列
self._timer = self.create_timer(1.0, self._timer_cb)
@staticmethod
def _stamp_to_iso(stamp) -> str:
return datetime.fromtimestamp(
stamp.sec + stamp.nanosec * 1e-9, tz=timezone.utc
).isoformat()
@staticmethod
def _extract_temp_field(status, snake_name: str, legacy_name: str):
value = getattr(status, snake_name, None)
if value is None:
value = getattr(status, legacy_name, None)
return value
def _collect_statuses(self, status_group) -> list[dict]:
statuses = []
for s in getattr(status_group, "status", []):
entry = {
"name": int(s.name),
"pos": float(s.pos),
"speed": float(s.speed),
"current": float(s.current),
"temperature": float(s.temperature),
"error": int(s.error),
}
motor_temp = self._extract_temp_field(s, "motor_temperature", "motortemperature")
mos_temp = self._extract_temp_field(s, "mos_temperature", "mostemperature")
if motor_temp is not None:
entry["motor_temp"] = round(float(motor_temp), 1)
if mos_temp is not None:
entry["mos_temp"] = round(float(mos_temp), 1)
statuses.append(entry)
return statuses
def _robot_state_cb(self, msg):
try:
statuses = []
for key in ("head", "arm", "waist", "leg"):
section = getattr(msg, key, None)
if section is not None:
statuses.extend(self._collect_statuses(section))
timestamp = None
header = getattr(msg, "header", None)
if header is not None and hasattr(header, "stamp"):
timestamp = self._stamp_to_iso(header.stamp)
with self._lock:
self._latest_snapshot = {"statuses": statuses, "timestamp": timestamp}
except Exception as e:
self.get_logger().warn(f"解析 /robot_state 消息失败: {e}")
def _power_cb(self, msg):
"""电源状态回调:保存最新电源数据"""
try:
with self._lock:
self._latest_power = {
"voltage": round(float(msg.master_battery_voltage), 3),
"current": round(float(msg.master_battery_current), 3),
"power": round(float(msg.master_battery_power), 1),
"little_voltage": round(float(msg.little_battery_voltage), 3),
"little_current": round(float(msg.little_battery_current), 3),
"little_power": round(float(msg.little_battery_power), 1),
}
except Exception as e:
self.get_logger().warn(f"解析 power 消息失败: {e}")
def _timer_cb(self):
"""1 秒定时器回调:合并所有话题的最新状态,放入异步队列"""
with self._lock:
snapshot = self._latest_snapshot
power = self._latest_power
# 清空,避免重复发送
self._latest_snapshot = None
self._latest_power = None
if snapshot is None and power is None:
return
payload = {
"statuses": snapshot["statuses"] if snapshot else [],
"timestamp": snapshot["timestamp"] if snapshot else None,
"power": power,
}
try:
self._loop.call_soon_threadsafe(self._queue.put_nowait, payload)
except asyncio.QueueFull:
self.get_logger().warn("发送队列已满,丢弃本次数据")
async def ws_sender(server_url: str, msg_queue: asyncio.Queue):
"""异步 WebSocket 发送循环,带自动重连"""
while True:
try:
async with websockets.connect(server_url) as ws:
print(f"[WS] 已连接到 {server_url}")
while True:
payload = await msg_queue.get()
text = json.dumps(payload)
await ws.send(text)
except (
OSError,
websockets.exceptions.ConnectionClosed,
websockets.exceptions.InvalidURI,
ConnectionRefusedError,
) as e:
print(f"[WS] 连接断开/失败: {e}3 秒后重连...")
while not msg_queue.empty():
try:
msg_queue.get_nowait()
except asyncio.QueueEmpty:
break
await asyncio.sleep(3)
def spin_ros(node: Node):
"""在独立线程中运行 ROS2 spin"""
rclpy.spin(node)
def main():
parser = argparse.ArgumentParser(description="ROS2 Robot Status → WebSocket 发送端")
parser.add_argument(
"--ip",
type=str,
default="10.11.24.86",
help="监控服务器 IP 地址,例如 192.168.1.100",
)
parser.add_argument(
"--port",
type=int,
default=8000,
help="监控服务器端口(默认 8000",
)
args = parser.parse_args()
server_url = f"ws://{args.ip}:{args.port}/ws/robot"
rclpy.init()
loop = asyncio.new_event_loop()
msg_queue: asyncio.Queue = asyncio.Queue(maxsize=100)
node = RobotStatusBridge(msg_queue, loop)
ros_thread = threading.Thread(target=spin_ros, args=(node,), daemon=True)
ros_thread.start()
try:
loop.run_until_complete(ws_sender(server_url, msg_queue))
except KeyboardInterrupt:
print("\n[INFO] 正在关闭...")
finally:
node.destroy_node()
rclpy.shutdown()
loop.close()
if __name__ == "__main__":
main()