# python import asyncio from datetime import datetime, timezone import json from pathlib import Path from typing import List from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import HTMLResponse app = FastAPI() # 简单内存管理:保存浏览器客户端 browser_clients: List[WebSocket] = [] # 可选:保存最近一条状态 last_status = None LOG_DIR = Path(__file__).resolve().parent / "logs" PACKET_LOG_PATH = LOG_DIR / "robot_packets.jsonl" INDEX_HTML = """ Robot Status Monitor

🤖 Robot Status Monitor

WebSocket: 连接中...

最近更新: -

⚡ 主电池电压
- V
🔋 主电池电流
- A
📊 主电池电量
- %
⚡ 副电池电压
- V
🔋 副电池电流
- A
📊 副电池电量
- %
IDPos (rad)SpeedCurrent (A)Temp (°C)Motor TempMOS TempError
左臂 (11-14)
11-------
12-------
13-------
14-------
右臂 (21-24)
21-------
22-------
23-------
24-------
腰部 (31-33)
31-------
32-------
33-------
左腿 (51-56)
51-------
52-------
53-------
54-------
55-------
56-------
右腿 (61-66)
61-------
62-------
63-------
64-------
65-------
66-------
""" @app.get("/", response_class=HTMLResponse) async def index(): return INDEX_HTML def append_robot_packet_log(packet: dict): """将服务器收到的机器人数据按 JSON Lines 追加写入日志文件。""" LOG_DIR.mkdir(parents=True, exist_ok=True) log_record = { "received_at": datetime.now(timezone.utc).isoformat(), "packet_timestamp": packet.get("timestamp"), "packet": packet, } with PACKET_LOG_PATH.open("a", encoding="utf-8") as log_file: log_file.write(json.dumps(log_record, ensure_ascii=False) + "\n") # 浏览器客户端 WebSocket(接收广播) @app.websocket("/ws") async def websocket_browser_endpoint(ws: WebSocket): await ws.accept() browser_clients.append(ws) global last_status try: # 可在连接时推送最近一条状态 if last_status is not None: await ws.send_text(json.dumps(last_status)) while True: # 浏览器无需发送消息,但保持连接活跃 msg = await ws.receive_text() # 可以处理浏览器发来的控制命令(未实现),这里忽略 except WebSocketDisconnect: pass finally: if ws in browser_clients: browser_clients.remove(ws) # 机器人/ROS2 发送端 WebSocket(推送 JSON) @app.websocket("/ws/robot") async def websocket_robot_endpoint(ws: WebSocket): await ws.accept() print("[WS] 机器人发送端已连接") try: while True: text = await ws.receive_text() try: data = json.loads(text) except Exception: print(f"[WS] JSON 解析失败: {text[:200]}") continue print(f"[WS] 收到机器人数据: {len(data.get('statuses', []))} 个电机") append_robot_packet_log(data) payload = { "statuses": data.get("statuses", []), "timestamp": data.get("timestamp"), "power": data.get("power"), } global last_status last_status = payload await broadcast_to_browsers(payload) except WebSocketDisconnect: print("[WS] 机器人发送端已断开") async def broadcast_to_browsers(payload): text = json.dumps(payload) to_remove = [] for client in list(browser_clients): try: await client.send_text(text) except Exception: to_remove.append(client) for c in to_remove: if c in browser_clients: browser_clients.remove(c) # 可直接运行: uvicorn main_monitor:app --reload --port 8000 if __name__ == "__main__": import uvicorn uvicorn.run("main_monitor:app", host="0.0.0.0", port=8000, reload=True)