From faebc5d44cd55f01759df2113f039fcba95be365 Mon Sep 17 00:00:00 2001 From: meiqi <976161896@qq.com> Date: Mon, 30 Mar 2026 23:04:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AF=B9=E6=8E=A5KCP-C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Deploy_Tienkung/config/dex_config.yaml | 2 +- Deploy_Tienkung/rl_control_node.py | 27 +- .../udp_loopback/README_omnisocket.md | 43 +++ .../udp_loopback/config/omnisocket_demo.yaml | 29 ++ .../udp_loopback/omnisocket_control.py | 106 +++++ .../udp_loopback/omnisocket_fsm_controller.py | 275 +++++++++++++ .../omnisocket_keyboard_sender.py | 229 +++++++++++ .../udp_loopback/omnisocket_xbox_sender.py | 283 ++++++++++++++ .../udp_loopback/rl_control_node.py | 365 ++++++++++++++++++ 9 files changed, 1357 insertions(+), 2 deletions(-) create mode 100644 Deploy_Tienkung/udp_loopback/README_omnisocket.md create mode 100644 Deploy_Tienkung/udp_loopback/config/omnisocket_demo.yaml create mode 100644 Deploy_Tienkung/udp_loopback/omnisocket_control.py create mode 100644 Deploy_Tienkung/udp_loopback/omnisocket_fsm_controller.py create mode 100644 Deploy_Tienkung/udp_loopback/omnisocket_keyboard_sender.py create mode 100644 Deploy_Tienkung/udp_loopback/omnisocket_xbox_sender.py create mode 100644 Deploy_Tienkung/udp_loopback/rl_control_node.py diff --git a/Deploy_Tienkung/config/dex_config.yaml b/Deploy_Tienkung/config/dex_config.yaml index d2f9b42..8925b06 100644 --- a/Deploy_Tienkung/config/dex_config.yaml +++ b/Deploy_Tienkung/config/dex_config.yaml @@ -6,7 +6,7 @@ sim: false debug: false -control_tool: [keyboard] # joystick, xbox, keyboard, udp_loopback +control_tool: [omnisocket_loopback] # joystick, xbox, keyboard, udp_loopback, omnisocket_loopback joystick: initial_height: 0.957 diff --git a/Deploy_Tienkung/rl_control_node.py b/Deploy_Tienkung/rl_control_node.py index 5d3dd2e..e4a976a 100644 --- a/Deploy_Tienkung/rl_control_node.py +++ b/Deploy_Tienkung/rl_control_node.py @@ -47,7 +47,13 @@ def timing_decorator(func): class XMIGCSControlNode(Node): """xMIGCS控制节点Python版本""" - VALID_CONTROL_TOOLS = ("joystick", "xbox", "keyboard", "udp_loopback") + VALID_CONTROL_TOOLS = ( + "joystick", + "xbox", + "keyboard", + "udp_loopback", + "omnisocket_loopback", + ) def __init__(self, debug=False): super().__init__('xmigcs_control_node') @@ -185,6 +191,13 @@ class XMIGCSControlNode(Node): self.udp_fsm_controller.init() self.udp_fsm_controller.start() + if "omnisocket_loopback" in self.control_tools: + from udp_loopback.omnisocket_fsm_controller import OmniSocketFSMController + + self.omnisocket_fsm_controller = OmniSocketFSMController() + self.omnisocket_fsm_controller.init() + self.omnisocket_fsm_controller.start() + # Xbox控制器 if "xbox" in self.control_tools: self.xbox_controller = XBOXController() @@ -333,6 +346,15 @@ class XMIGCSControlNode(Node): self.udp_fsm_controller.get_last_fsm_command_time(), )) + if "omnisocket_loopback" in self.control_tools: + self.omnisocket_fsm_controller.update_flag() + source_flags.append(( + "omnisocket_loopback", + self.omnisocket_fsm_controller.get_udp_flag(), + self.omnisocket_fsm_controller.get_last_input_time(), + self.omnisocket_fsm_controller.get_last_fsm_command_time(), + )) + source_name, active_flag = self._select_control_flag(source_flags) flag = self._copy_control_flag(active_flag) flag.fsm_state_command = self._select_fsm_command(source_flags) @@ -421,6 +443,9 @@ class XMIGCSControlNode(Node): if hasattr(self, 'udp_fsm_controller') and "udp_loopback" in self.control_tools: self.udp_fsm_controller.stop() + if hasattr(self, + 'omnisocket_fsm_controller') and "omnisocket_loopback" in self.control_tools: + self.omnisocket_fsm_controller.stop() if self.control_thread and self.control_thread.is_alive(): self.control_thread.join(timeout=1.0) diff --git a/Deploy_Tienkung/udp_loopback/README_omnisocket.md b/Deploy_Tienkung/udp_loopback/README_omnisocket.md new file mode 100644 index 0000000..ccdbc07 --- /dev/null +++ b/Deploy_Tienkung/udp_loopback/README_omnisocket.md @@ -0,0 +1,43 @@ +# OmniSocket Integration + +This repo now supports an OmniSocket-based control path alongside the original UDP loopback. + +## Files + +- `omnisocket_control.py`: binary control packet codec shared by sender/receiver. +- `omnisocket_keyboard_sender.py`: keyboard -> OmniSocket sender. +- `omnisocket_xbox_sender.py`: ROS2 `/xbox_data` -> OmniSocket sender. +- `omnisocket_fsm_controller.py`: OmniSocket receiver that converts packets into `ControlFlag`. +- `config/omnisocket_demo.yaml`: OmniSocket transport config template. + +## Main-node integration + +Set `control_tool: omnisocket_loopback` in `config/dex_config.yaml`, then run `rl_control_node.py` or `rl_control_node_sim.py`. + +The main node will instantiate `OmniSocketFSMController`, which receives binary control packets and maps them into the same FSM commands used by the UDP loopback path. + +## Sender usage + +Keyboard sender: + +```bash +cd Deploy_Tienkung +python3 udp_loopback/omnisocket_keyboard_sender.py +``` + +Xbox sender: + +```bash +cd Deploy_Tienkung +source /opt/ros/jazzy/setup.bash +ros2 run joy joy_node --ros-args -r joy:=/xbox_data +python3 udp_loopback/omnisocket_xbox_sender.py +``` + +## Notes + +- Install the `omnisocket` Python package before using this path. +- `omnisocket_xbox_sender.py` still reuses `config/dex_config.yaml` for Xbox axis/button overrides so it stays aligned with the existing repo configuration. +- The original UDP files remain unchanged, so you can switch back by restoring `control_tool: udp_loopback`. +- OmniSocket keyboard/Xbox mappings are aligned with the cleaned walk-only FSM flow: `ZERO`, `STOP`, and `WALKAMP`. +- Keyboard sender supports `4/5/6` for clearing `x/y/yaw` speed independently, and `r` still clears all three axes. diff --git a/Deploy_Tienkung/udp_loopback/config/omnisocket_demo.yaml b/Deploy_Tienkung/udp_loopback/config/omnisocket_demo.yaml new file mode 100644 index 0000000..e79c967 --- /dev/null +++ b/Deploy_Tienkung/udp_loopback/config/omnisocket_demo.yaml @@ -0,0 +1,29 @@ +transport: + server_addr: 81.70.156.140:10909 + relay_via: 106.55.173.235 + bind_ip: "" + bind_device: "" + +control_sender: + peer_id: peer-a-ctrl + target_peer: peer-b-ctrl + joy_topic: /xbox_data + deadzone: 0.10 + analog_epsilon: 0.01 + dpad_threshold: 0.50 + trigger_pressed_threshold: -0.50 + +control_receiver: + peer_id: peer-b-ctrl + +motion: + initial_lift: 0.89 + min_lift: 0.65 + max_lift: 0.90 + lift_step: 0.05 + surge_step: 0.10 + sway_step: 0.10 + spin_step: 0.10 + max_surge: 1.00 + max_sway: 0.50 + max_spin: 0.50 diff --git a/Deploy_Tienkung/udp_loopback/omnisocket_control.py b/Deploy_Tienkung/udp_loopback/omnisocket_control.py new file mode 100644 index 0000000..7054ed0 --- /dev/null +++ b/Deploy_Tienkung/udp_loopback/omnisocket_control.py @@ -0,0 +1,106 @@ +"""Binary control packet codec and motion helpers for OmniSocket demos.""" + +from __future__ import annotations + +from dataclasses import dataclass +import struct +import time + + +CONTROL_PACKET_VERSION = 1 +CONTROL_PACKET_STRUCT = struct.Struct("!BBHIfQ") + +EVENT_NAME_TO_ID = { + "pose_home": 1, + "pose_hold": 2, + "mode_stride": 3, + "surge_up": 6, + "surge_down": 7, + "sway_left": 8, + "sway_right": 9, + "spin_left": 10, + "spin_right": 11, + "set_surge": 12, + "set_sway": 13, + "set_spin": 14, + "set_lift": 15, + "lift_up": 16, + "lift_down": 17, + "trim_reset": 18, + "session_quit": 19, +} +EVENT_ID_TO_NAME = {value: key for key, value in EVENT_NAME_TO_ID.items()} + + +@dataclass(slots=True) +class ControlPacket: + seq_id: int + event_id: int + drive_value: float = 1.0 + sent_at_ns: int = 0 + + @property + def event_name(self) -> str: + return EVENT_ID_TO_NAME.get(self.event_id, f"unknown_{self.event_id}") + + def encode(self) -> bytes: + sent_at_ns = self.sent_at_ns or time.time_ns() + return CONTROL_PACKET_STRUCT.pack( + CONTROL_PACKET_VERSION, + self.event_id, + 0, + int(self.seq_id), + float(self.drive_value), + int(sent_at_ns), + ) + + @classmethod + def decode(cls, payload: bytes) -> "ControlPacket": + if len(payload) != CONTROL_PACKET_STRUCT.size: + raise ValueError( + f"invalid control packet length {len(payload)}; " + f"want {CONTROL_PACKET_STRUCT.size}" + ) + version, event_id, _reserved, seq_id, drive_value, sent_at_ns = ( + CONTROL_PACKET_STRUCT.unpack(payload) + ) + if version != CONTROL_PACKET_VERSION: + raise ValueError(f"unsupported control packet version {version}") + return cls( + seq_id=int(seq_id), + event_id=int(event_id), + drive_value=float(drive_value), + sent_at_ns=int(sent_at_ns), + ) + + +def make_control_packet( + seq_id: int, event_name: str, drive_value: float = 1.0, sent_at_ns: int | None = None +) -> ControlPacket: + event_id = EVENT_NAME_TO_ID[event_name] + return ControlPacket( + seq_id=seq_id, + event_id=event_id, + drive_value=drive_value, + sent_at_ns=time.time_ns() if sent_at_ns is None else sent_at_ns, + ) + + +@dataclass(slots=True) +class MotionFrame: + mode_tag: str = "pose_hold" + surge_goal: float = 0.0 + sway_goal: float = 0.0 + spin_goal: float = 0.0 + lift_goal: float = 0.89 + relay_on: bool = True + last_event_code: str = "" + last_rx_time: float = 0.0 + + +def format_motion_frame(frame: MotionFrame) -> str: + return ( + f"mode={frame.mode_tag} surge={frame.surge_goal:.3f} " + f"sway={frame.sway_goal:.3f} spin={frame.spin_goal:.3f} " + f"lift={frame.lift_goal:.3f}" + ) diff --git a/Deploy_Tienkung/udp_loopback/omnisocket_fsm_controller.py b/Deploy_Tienkung/udp_loopback/omnisocket_fsm_controller.py new file mode 100644 index 0000000..7d2eec2 --- /dev/null +++ b/Deploy_Tienkung/udp_loopback/omnisocket_fsm_controller.py @@ -0,0 +1,275 @@ +"""Adapter that maps OmniSocket control packets onto a ControlFlag interface.""" + +from __future__ import annotations + +from pathlib import Path +import queue +import struct +import threading +import time +from typing import Dict, Optional + +import yaml + +from common.joystick import ControlFlag + +try: + from .omnisocket_control import ControlPacket, MotionFrame, format_motion_frame +except ImportError: # pragma: no cover - direct script execution fallback + from omnisocket_control import ControlPacket, MotionFrame, format_motion_frame + + +def _load_omnisocket_api(): + try: + from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session + except ImportError as exc: # pragma: no cover - environment dependent + raise RuntimeError( + "omnisocket is not installed. Install it before using " + "control_tool=omnisocket_loopback." + ) from exc + return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session + + +class OmniSocketFSMFlag(ControlFlag): + """FSM-facing flag produced from decoded OmniSocket control packets.""" + + def __init__(self) -> None: + super().__init__() + self.x_speed_command: float = 0.0 + self.y_speed_command: float = 0.0 + self.yaw_speed_command: float = 0.0 + self.height_cmd: float = 0.89 + + +class OmniSocketFSMController: + """Receive OmniSocket control packets and expose them as ControlFlag.""" + + def __init__(self) -> None: + self.config: Dict[str, object] = {} + self.data_mutex = threading.Lock() + + self._load_config() + self._init_data_structures() + + self.recv_running = False + self.recv_thread: Optional[threading.Thread] = None + self.session = None + self._msg_type_binary = None + + def _load_config(self) -> None: + config_path = Path(__file__).resolve().parent / "config" / "omnisocket_demo.yaml" + if config_path.exists(): + with config_path.open("r", encoding="utf-8") as file: + self.config = yaml.safe_load(file) or {} + else: + self.config = {} + + transport_cfg = self.config.get("transport", {}) + receiver_cfg = self.config.get("control_receiver", {}) + motion_cfg = self.config.get("motion", {}) + + self.server_addr = str(transport_cfg.get("server_addr", "127.0.0.1:10909")) + self.relay_via = str(transport_cfg.get("relay_via", "")) + self.bind_ip = str(transport_cfg.get("bind_ip", "")) + self.bind_device = str(transport_cfg.get("bind_device", "")) + self.peer_id = str(receiver_cfg.get("peer_id", "peer-b-ctrl")) + + self.initial_lift = float(motion_cfg.get("initial_lift", 0.89)) + self.lift_step = float(motion_cfg.get("lift_step", 0.05)) + self.max_surge = float(motion_cfg.get("max_surge", 1.0)) + self.max_sway = float(motion_cfg.get("max_sway", 0.5)) + self.max_spin = float(motion_cfg.get("max_spin", 0.5)) + self.max_lift = float(motion_cfg.get("max_lift", 0.90)) + self.min_lift = float(motion_cfg.get("min_lift", 0.65)) + self.surge_step = float(motion_cfg.get("surge_step", 0.1)) + self.sway_step = float(motion_cfg.get("sway_step", 0.1)) + self.spin_step = float(motion_cfg.get("spin_step", 0.1)) + + def _init_data_structures(self) -> None: + self.packet_queue: queue.Queue[ControlPacket] = queue.Queue(maxsize=128) + self.motion_frame = MotionFrame(lift_goal=self.initial_lift) + self.udp_flag = OmniSocketFSMFlag() + self.udp_flag.height_cmd = self.initial_lift + self.last_seq_id = -1 + self.last_fsm_command_time = 0.0 + + def start(self) -> None: + control_defaults, msg_type_binary, session_cls = _load_omnisocket_api() + self._msg_type_binary = msg_type_binary + self.session = session_cls() + self.session.connect( + server_addr=self.server_addr, + peer_id=self.peer_id, + relay_via=self.relay_via, + bind_ip=self.bind_ip, + bind_device=self.bind_device, + **control_defaults, + ) + + self.recv_running = True + self.recv_thread = threading.Thread(target=self._recv_loop, daemon=True) + self.recv_thread.start() + print( + f"OmniSocket FSM controller listening as {self.peer_id} " + f"via {self.server_addr}" + ) + + def stop(self) -> None: + self.recv_running = False + if self.recv_thread and self.recv_thread.is_alive(): + self.recv_thread.join(timeout=1.0) + if self.session is not None: + self.session.close() + self.session = None + print("OmniSocket FSM controller stopped") + + def _recv_loop(self) -> None: + while self.recv_running and self.session is not None: + item = self.session.recv(timeout_ms=200) + if item is None: + continue + + from_peer, msg_type, payload = item + if msg_type != self._msg_type_binary: + print( + f"[omnisocket_fsm] ignore non-binary message " + f"from {from_peer}: {msg_type}" + ) + continue + + try: + packet = ControlPacket.decode(payload) + except (ValueError, struct.error) as exc: + print(f"[omnisocket_fsm] drop invalid payload from {from_peer}: {exc}") + continue + + try: + self.packet_queue.put_nowait(packet) + except queue.Full: + try: + self.packet_queue.get_nowait() + self.packet_queue.put_nowait(packet) + except queue.Empty: + pass + + def update_flag(self) -> None: + while not self.packet_queue.empty(): + try: + packet = self.packet_queue.get_nowait() + except queue.Empty: + break + self._apply_packet(packet) + + def get_udp_flag(self) -> OmniSocketFSMFlag: + with self.data_mutex: + flag_copy = OmniSocketFSMFlag() + flag_copy.__dict__.update(self.udp_flag.__dict__) + return flag_copy + + def get_last_input_time(self) -> float: + with self.data_mutex: + return self.motion_frame.last_rx_time + + def get_last_fsm_command_time(self) -> float: + with self.data_mutex: + return self.last_fsm_command_time + + def init(self) -> int: + print("OmniSocket FSM controller initialized") + return 0 + + def _apply_packet(self, packet: ControlPacket) -> None: + event_code = packet.event_name + now = time.time() + with self.data_mutex: + self.last_seq_id = packet.seq_id + self.motion_frame.last_event_code = event_code + self.motion_frame.last_rx_time = now + + if event_code == "pose_home": + self.motion_frame.mode_tag = "pose_home" + self.last_fsm_command_time = now + elif event_code == "pose_hold": + self.motion_frame.mode_tag = "pose_hold" + self.last_fsm_command_time = now + elif event_code == "mode_stride": + self.motion_frame.mode_tag = "mode_stride" + self.last_fsm_command_time = now + elif event_code == "surge_up": + self.motion_frame.surge_goal = min( + self.max_surge, self.motion_frame.surge_goal + self.surge_step + ) + elif event_code == "surge_down": + self.motion_frame.surge_goal = max( + -self.max_surge, self.motion_frame.surge_goal - self.surge_step + ) + elif event_code == "sway_left": + self.motion_frame.sway_goal = max( + -self.max_sway, self.motion_frame.sway_goal - self.sway_step + ) + elif event_code == "sway_right": + self.motion_frame.sway_goal = min( + self.max_sway, self.motion_frame.sway_goal + self.sway_step + ) + elif event_code == "spin_left": + self.motion_frame.spin_goal = max( + -self.max_spin, self.motion_frame.spin_goal - self.spin_step + ) + elif event_code == "spin_right": + self.motion_frame.spin_goal = min( + self.max_spin, self.motion_frame.spin_goal + self.spin_step + ) + elif event_code == "set_surge": + self.motion_frame.surge_goal = max( + -self.max_surge, min(self.max_surge, packet.drive_value) + ) + elif event_code == "set_sway": + self.motion_frame.sway_goal = max( + -self.max_sway, min(self.max_sway, packet.drive_value) + ) + elif event_code == "set_spin": + self.motion_frame.spin_goal = max( + -self.max_spin, min(self.max_spin, packet.drive_value) + ) + elif event_code == "set_lift": + self.motion_frame.lift_goal = max( + self.min_lift, min(self.max_lift, packet.drive_value) + ) + elif event_code == "lift_up": + self.motion_frame.lift_goal = min( + self.max_lift, self.motion_frame.lift_goal + self.lift_step + ) + elif event_code == "lift_down": + self.motion_frame.lift_goal = max( + self.min_lift, self.motion_frame.lift_goal - self.lift_step + ) + elif event_code == "trim_reset": + self.motion_frame.surge_goal = 0.0 + self.motion_frame.sway_goal = 0.0 + self.motion_frame.spin_goal = 0.0 + elif event_code == "session_quit": + self.motion_frame.relay_on = False + self.motion_frame.mode_tag = "pose_hold" + self.recv_running = False + + self._sync_motion_frame_to_flag() + print( + f"[omnisocket_fsm] seq={packet.seq_id} event={event_code} " + f"{format_motion_frame(self.motion_frame)} " + f"fsm={self.udp_flag.fsm_state_command}" + ) + + def _sync_motion_frame_to_flag(self) -> None: + mode_to_fsm_command = { + "pose_home": "gotoZERO", + "pose_hold": "gotoSTOP", + "mode_stride": "gotoWALKAMP", + } + self.udp_flag.enable = self.motion_frame.relay_on + self.udp_flag.fsm_state_command = mode_to_fsm_command.get( + self.motion_frame.mode_tag, self.udp_flag.fsm_state_command + ) + self.udp_flag.x_speed_command = self.motion_frame.surge_goal + self.udp_flag.y_speed_command = self.motion_frame.sway_goal + self.udp_flag.yaw_speed_command = self.motion_frame.spin_goal + self.udp_flag.height_cmd = self.motion_frame.lift_goal diff --git a/Deploy_Tienkung/udp_loopback/omnisocket_keyboard_sender.py b/Deploy_Tienkung/udp_loopback/omnisocket_keyboard_sender.py new file mode 100644 index 0000000..89b3e5d --- /dev/null +++ b/Deploy_Tienkung/udp_loopback/omnisocket_keyboard_sender.py @@ -0,0 +1,229 @@ +"""Keyboard sender that emits binary control packets over OmniSocket.""" + +from __future__ import annotations + +import os +from pathlib import Path +import select +import signal +import sys +import termios +import threading +import tty +from typing import Dict, Optional + +import yaml + +try: + from .omnisocket_control import make_control_packet +except ImportError: # pragma: no cover - direct script execution fallback + from omnisocket_control import make_control_packet + + +def _load_omnisocket_api(): + try: + from omnisocket import CONTROL_DEFAULTS, Session + except ImportError as exc: # pragma: no cover - environment dependent + raise RuntimeError( + "omnisocket is not installed. Install it before using " + "omnisocket_keyboard_sender.py." + ) from exc + return CONTROL_DEFAULTS, Session + + +class OmniSocketKeyboardSender: + """Standalone keyboard sender for OmniSocket control-plane testing.""" + + def __init__(self) -> None: + self.config: Dict[str, object] = {} + self.seq_id = 0 + self.running = False + self.session = None + self.input_thread: Optional[threading.Thread] = None + self.original_terminal_settings = None + + self._load_config() + self._init_session() + self._print_help() + + def _load_config(self) -> None: + config_path = Path(__file__).resolve().parent / "config" / "omnisocket_demo.yaml" + if config_path.exists(): + with config_path.open("r", encoding="utf-8") as file: + self.config = yaml.safe_load(file) or {} + else: + self.config = {} + + transport_cfg = self.config.get("transport", {}) + sender_cfg = self.config.get("control_sender", {}) + + self.server_addr = str(transport_cfg.get("server_addr", "127.0.0.1:10909")) + self.relay_via = str(transport_cfg.get("relay_via", "")) + self.bind_ip = str(transport_cfg.get("bind_ip", "")) + self.bind_device = str(transport_cfg.get("bind_device", "")) + self.peer_id = str(sender_cfg.get("peer_id", "peer-a-ctrl")) + self.target_peer = str(sender_cfg.get("target_peer", "peer-b-ctrl")) + + def _init_session(self) -> None: + control_defaults, session_cls = _load_omnisocket_api() + self.session = session_cls() + self.session.connect( + server_addr=self.server_addr, + peer_id=self.peer_id, + relay_via=self.relay_via, + bind_ip=self.bind_ip, + bind_device=self.bind_device, + **control_defaults, + ) + + def _print_help(self) -> None: + print("OmniSocket keyboard sender ready") + print(f"Peer: {self.peer_id} -> {self.target_peer} via {self.server_addr}") + print("Keys:") + print(" z -> pose_home") + print(" c -> pose_hold") + print(" m -> mode_stride") + print(" w/s -> surge +/-") + print(" a/d -> sway +/-") + print(" q/e -> spin +/-") + print(" Left/Right -> lift +/-") + print(" Up/Down -> surge +/-") + print(" r -> trim_reset") + print(" 4 -> clear x speed") + print(" 5 -> clear y speed") + print(" 6 -> clear yaw speed") + print(" x -> session_quit") + + def start(self) -> None: + self.running = True + self.input_thread = threading.Thread(target=self._input_loop, daemon=True) + self.input_thread.start() + print("OmniSocket keyboard sender thread started") + + def stop(self) -> None: + self.running = False + if self.input_thread and self.input_thread.is_alive(): + self.input_thread.join(timeout=1.0) + if self.original_terminal_settings is not None: + try: + termios.tcsetattr( + sys.stdin, termios.TCSADRAIN, self.original_terminal_settings + ) + except termios.error: + pass + self.original_terminal_settings = None + if self.session is not None: + self.session.close() + self.session = None + print("OmniSocket keyboard sender stopped") + + def _input_loop(self) -> None: + self.original_terminal_settings = termios.tcgetattr(sys.stdin) + try: + tty.setraw(sys.stdin.fileno()) + while self.running: + if select.select([sys.stdin], [], [], 0.1)[0]: + key = sys.stdin.read(1) + self._process_key(key) + except KeyboardInterrupt: + self._handle_ctrl_c() + finally: + if self.original_terminal_settings is not None: + termios.tcsetattr( + sys.stdin, termios.TCSADRAIN, self.original_terminal_settings + ) + self.original_terminal_settings = None + + def _process_key(self, key: str) -> None: + event_map = { + "w": ("surge_up", "w", 1.0), + "s": ("surge_down", "s", 1.0), + "a": ("sway_left", "a", 1.0), + "d": ("sway_right", "d", 1.0), + "q": ("spin_left", "q", 1.0), + "e": ("spin_right", "e", 1.0), + "z": ("pose_home", "z", 1.0), + "c": ("pose_hold", "c", 1.0), + "m": ("mode_stride", "m", 1.0), + "r": ("trim_reset", "r", 1.0), + "4": ("set_surge", "4", 0.0), + "5": ("set_sway", "5", 0.0), + "6": ("set_spin", "6", 0.0), + "x": ("session_quit", "x", 1.0), + } + + if key == "\x03": + self._handle_ctrl_c() + return + + if key == "\x1b": + self._handle_arrow_key() + return + + if key in event_map: + event_code, key_name, drive_value = event_map[key] + self._send_event(event_code, key_name, drive_value) + + def _handle_arrow_key(self) -> None: + if not select.select([sys.stdin], [], [], 0.1)[0]: + return + key2 = sys.stdin.read(1) + if key2 != "[": + return + if not select.select([sys.stdin], [], [], 0.1)[0]: + return + key3 = sys.stdin.read(1) + arrow_map = { + "A": ("surge_up", "arrow_up"), + "B": ("surge_down", "arrow_down"), + "C": ("lift_down", "arrow_right"), + "D": ("lift_up", "arrow_left"), + } + if key3 in arrow_map: + event_code, key_name = arrow_map[key3] + self._send_event(event_code, key_name) + + def _send_event( + self, event_code: str, key_name: str, drive_value: float = 1.0 + ) -> None: + if self.session is None: + return + packet = make_control_packet(self.seq_id, event_code, drive_value) + payload = packet.encode() + self.seq_id += 1 + self.session.send(to=self.target_peer, data=payload) + print( + f"sent seq={packet.seq_id} event={event_code} key={key_name} " + f"bytes={len(payload)}" + ) + if event_code == "session_quit": + self.running = False + + def _handle_ctrl_c(self) -> None: + self.running = False + if self.original_terminal_settings is not None: + try: + termios.tcsetattr( + sys.stdin, termios.TCSADRAIN, self.original_terminal_settings + ) + except termios.error: + pass + self.original_terminal_settings = None + os.kill(os.getpid(), signal.SIGINT) + + +def main() -> None: + sender = OmniSocketKeyboardSender() + sender.start() + try: + while sender.running: + if sender.input_thread is not None: + sender.input_thread.join(timeout=0.2) + except KeyboardInterrupt: + pass + finally: + sender.stop() + + +if __name__ == "__main__": + main() diff --git a/Deploy_Tienkung/udp_loopback/omnisocket_xbox_sender.py b/Deploy_Tienkung/udp_loopback/omnisocket_xbox_sender.py new file mode 100644 index 0000000..328ffac --- /dev/null +++ b/Deploy_Tienkung/udp_loopback/omnisocket_xbox_sender.py @@ -0,0 +1,283 @@ +"""ROS2 Joy -> OmniSocket bridge for Xbox control.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Dict + +import rclpy +from rclpy.node import Node +from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy +from sensor_msgs.msg import Joy +import yaml + +try: + from .omnisocket_control import make_control_packet +except ImportError: # pragma: no cover - direct script execution fallback + from omnisocket_control import make_control_packet + + +def _load_omnisocket_api(): + try: + from omnisocket import CONTROL_DEFAULTS, Session + except ImportError as exc: # pragma: no cover - environment dependent + raise RuntimeError( + "omnisocket is not installed. Install it before using " + "omnisocket_xbox_sender.py." + ) from exc + return CONTROL_DEFAULTS, Session + + +class OmniSocketXboxSender(Node): + """Subscribe to Joy messages and forward them through OmniSocket.""" + + def __init__(self) -> None: + super().__init__("omnisocket_xbox_sender") + self.config: Dict[str, object] = {} + self.seq_id = 0 + self.last_buttons: Dict[str, int] = {} + self.last_dpad_h = 0.0 + self.session = None + + self._load_config() + self._init_session() + + qos_profile = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=10, + ) + self.subscription = self.create_subscription( + Joy, self.joy_topic, self._joy_callback, qos_profile + ) + + self.get_logger().info( + f"Forwarding {self.joy_topic} -> OmniSocket " + f"{self.peer_id} -> {self.target_peer}" + ) + self.get_logger().info( + "Buttons: A=WALKAMP X=ZERO Y=STOP START=reset" + ) + + def destroy_node(self) -> bool: + if self.session is not None: + self.session.close() + self.session = None + return super().destroy_node() + + def _load_config(self) -> None: + omni_config_path = ( + Path(__file__).resolve().parent / "config" / "omnisocket_demo.yaml" + ) + main_config_path = Path(__file__).resolve().parents[1] / "config" / "dex_config.yaml" + + if omni_config_path.exists(): + with omni_config_path.open("r", encoding="utf-8") as file: + self.config = yaml.safe_load(file) or {} + else: + self.config = {} + + with main_config_path.open("r", encoding="utf-8") as file: + main_config = yaml.safe_load(file) or {} + + transport_cfg = self.config.get("transport", {}) + sender_cfg = self.config.get("control_sender", {}) + xbox_cfg = main_config.get("xbox", {}) + + self.server_addr = str(transport_cfg.get("server_addr", "127.0.0.1:10909")) + self.relay_via = str(transport_cfg.get("relay_via", "")) + self.bind_ip = str(transport_cfg.get("bind_ip", "")) + self.bind_device = str(transport_cfg.get("bind_device", "")) + self.peer_id = str(sender_cfg.get("peer_id", "peer-a-ctrl")) + self.target_peer = str(sender_cfg.get("target_peer", "peer-b-ctrl")) + self.joy_topic = str(sender_cfg.get("joy_topic", "/xbox_data")) + + self.deadzone = float(sender_cfg.get("deadzone", 0.10)) + self.analog_epsilon = float(sender_cfg.get("analog_epsilon", 0.01)) + self.dpad_threshold = float(sender_cfg.get("dpad_threshold", 0.50)) + self.trigger_pressed_threshold = float( + sender_cfg.get("trigger_pressed_threshold", -0.50) + ) + + self.forward_command_offset = float( + sender_cfg.get( + "forward_command_offset", + xbox_cfg.get("forward_command_offset", 0.0), + ) + ) + self.lateral_command_offset = float( + sender_cfg.get( + "lateral_command_offset", + xbox_cfg.get("lateral_command_offset", 0.0), + ) + ) + self.rotation_command_offset = float( + sender_cfg.get( + "rotation_command_offset", + xbox_cfg.get("rotation_command_offset", 0.0), + ) + ) + + self.button_map = { + "a": 0, + "b": 1, + "x": 2, + "y": 3, + "lb": 4, + "rb": 5, + "select": 6, + "start": 7, + "home": 8, + } + self.axis_map = { + "lx": 0, + "ly": 1, + "l_trigger": 2, + "rx": 3, + "ry": 4, + "r_trigger": 5, + "dpad_h": 6, + "dpad_v": 7, + } + + self._merge_mapping(self.button_map, xbox_cfg.get("button_map")) + self._merge_mapping(self.axis_map, xbox_cfg.get("axis_map")) + self._merge_mapping(self.button_map, sender_cfg.get("button_map")) + self._merge_mapping(self.axis_map, sender_cfg.get("axis_map")) + + def _merge_mapping(self, target: Dict[str, int], override: object) -> None: + if not isinstance(override, dict): + return + for name, index in override.items(): + if name in target: + try: + target[name] = int(index) + except (TypeError, ValueError): + pass + + def _init_session(self) -> None: + control_defaults, session_cls = _load_omnisocket_api() + self.session = session_cls() + self.session.connect( + server_addr=self.server_addr, + peer_id=self.peer_id, + relay_via=self.relay_via, + bind_ip=self.bind_ip, + bind_device=self.bind_device, + **control_defaults, + ) + + def _joy_callback(self, msg: Joy) -> None: + axes = list(msg.axes) + [0.0] * 16 + buttons = list(msg.buttons) + [0] * 32 + + state = { + "a": self._button_value(buttons, "a"), + "x": self._button_value(buttons, "x"), + "y": self._button_value(buttons, "y"), + "start": self._button_value(buttons, "start"), + "lx": self._axis_value(axes, "lx"), + "ly": self._axis_value(axes, "ly"), + "rx": self._axis_value(axes, "rx"), + "dpad_h": self._axis_value(axes, "dpad_h"), + } + + self._send_mode_events(state) + self._send_trim_event(state) + self._send_lift_events(state) + self._send_analog_events(state) + + self.last_buttons = { + name: int(state[name]) for name in ("a", "x", "y", "start") + } + self.last_dpad_h = float(state["dpad_h"]) + + def _button_value(self, buttons: list[int], name: str) -> int: + index = self.button_map[name] + return int(buttons[index]) if index < len(buttons) else 0 + + def _axis_value(self, axes: list[float], name: str) -> float: + index = self.axis_map[name] + return float(axes[index]) if index < len(axes) else 0.0 + + def _send_mode_events(self, state: Dict[str, float]) -> None: + if self._rising_edge(state, "y"): + self._send_event("pose_hold", "y") + elif self._rising_edge(state, "x"): + self._send_event("pose_home", "x") + elif self._rising_edge(state, "a"): + self._send_event("mode_stride", "a") + + def _send_trim_event(self, state: Dict[str, float]) -> None: + if self._rising_edge(state, "start"): + self._send_event("trim_reset", "start") + + def _send_lift_events(self, state: Dict[str, float]) -> None: + dpad_h = float(state["dpad_h"]) + if dpad_h <= -self.dpad_threshold and self.last_dpad_h > -self.dpad_threshold: + self._send_event("lift_up", "dpad_left") + elif dpad_h >= self.dpad_threshold and self.last_dpad_h < self.dpad_threshold: + self._send_event("lift_down", "dpad_right") + + def _send_analog_events(self, state: Dict[str, float]) -> None: + surge = self._compute_surge(state["ly"]) + sway = self._cleanup_command( + self._apply_deadzone(state["lx"]) * -0.4 + self.lateral_command_offset + ) + spin = self._cleanup_command( + self._apply_deadzone(state["rx"]) * -0.4 + self.rotation_command_offset + ) + + self._send_event("set_surge", "left_stick_y", surge) + self._send_event("set_sway", "left_stick_x", sway) + self._send_event("set_spin", "right_stick_x", spin) + + def _compute_surge(self, ly: float) -> float: + ly = self._apply_deadzone(ly) + if ly >= 0.0: + value = ly * 0.8 + self.forward_command_offset + else: + value = ly * 0.5 + return self._cleanup_command(value) + + def _apply_deadzone(self, value: float) -> float: + if abs(value) < self.deadzone: + return 0.0 + return float(value) + + def _cleanup_command(self, value: float) -> float: + if abs(value) < self.analog_epsilon: + return 0.0 + return float(value) + + def _rising_edge(self, state: Dict[str, float], name: str) -> bool: + previous = int(self.last_buttons.get(name, 0)) + return int(state[name]) == 1 and previous == 0 + + def _send_event( + self, event_code: str, key_name: str, drive_value: float = 1.0 + ) -> None: + if self.session is None: + return + packet = make_control_packet(self.seq_id, event_code, drive_value) + self.seq_id += 1 + self.session.send(to=self.target_peer, data=packet.encode()) + self.get_logger().debug( + f"sent seq={packet.seq_id} event={event_code} key={key_name}" + ) + + +def main(args: list[str] | None = None) -> None: + rclpy.init(args=args) + node = OmniSocketXboxSender() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/Deploy_Tienkung/udp_loopback/rl_control_node.py b/Deploy_Tienkung/udp_loopback/rl_control_node.py new file mode 100644 index 0000000..9f84c16 --- /dev/null +++ b/Deploy_Tienkung/udp_loopback/rl_control_node.py @@ -0,0 +1,365 @@ +""" +RL Control Plugin (Python Version) +Main ROS2 node for humanoid robot RL control system +""" +import math +import os +import queue +import threading +import time +import sys + +import rclpy +import yaml +from rclpy.node import Node +from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy +# ROS messages +from sensor_msgs.msg import Joy + +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + +from common.joystick import JoystickHumanoid, ControlFlag +from common.xbox_control import XBOXController +# Local imports +from common.robot_data import RobotData +from FSM.robot_fsm import get_robot_fsm +from FSM.fsm_base import FSMStateName +from common.robot_interface import get_robot_interface +from common.stdin_keyboard_control import KeyboardController +from udp_loopback.udp_fsm_controller import UDPFSMController +import functools + +def timing_decorator(func): + """ + 装饰器:记录函数执行时间 + """ + @functools.wraps(func) + def wrapper(*args, **kwargs): + start_time = time.perf_counter() + result = func(*args, **kwargs) + end_time = time.perf_counter() + execution_time = end_time - start_time + print(f"[TIMING] {func.__name__} executed in {execution_time:.6f} seconds") + return result + return wrapper + + +class XMIGCSControlNode(Node): + """xMIGCS控制节点Python版本""" + + def __init__(self, debug=False): + super().__init__('xmigcs_control_node') + + # 配置和参数 + self.debug = debug + self.whole_joint_num = 35 + self.pi = math.pi + self.rpm2rps = math.pi / 30.0 + + self.config = {} + + # 加载配置 + self._load_config() + + # 初始化数据结构 + self._init_data_structures() + + # 机器人接口 + self.robot_interface = get_robot_interface(self.robot_data, + self.config_file) + self.robot_interface.init(self) # 传入node实例 + + # 机器人FSM + self.robot_fsm = get_robot_fsm( + self.robot_data, + self.config, + ) + + # 初始化ROS接口 + self._init_ros_interfaces() + + # 初始化控制系统 + self._init_control_system() + + # 启动控制线程 + self._start_control_thread() + + def _load_config(self): + """加载配置文件""" + # 获取包路径 + self.config_file = os.path.join('.', 'config', 'dex_config.yaml') + + with open(self.config_file, 'r') as f: + self.config = yaml.safe_load(f) + + print(self.config) + + # 获取控制器类型 + self.control_tool = self.config.get('control_tool', 'keyboard') + # 提取关键配置参数 + self.motor_num = self.config.get('motor_num') + self.dt = self.config.get('dt') + self.sim = self.config.get('sim') + + # 检查当前用户名,如果是ubuntu则抛出异常 + import getpass + user_name = getpass.getuser().lower() + if self.sim and user_name == 'ubuntu': + raise RuntimeError("On ubuntu user, sim must be set to false") + + def _init_data_structures(self): + """初始化数据结构""" + # 机器人数据 + self.robot_data = RobotData(self.motor_num, self.whole_joint_num) + self.robot_data.config_file_ = getattr(self, 'config_file', '') + + # joysticks 消息队列 + self.queue_joy_cmd = queue.Queue(maxsize=1) + self.queue_xbox_cmd = queue.Queue(maxsize=1) + self.control_flag = ControlFlag() + + def _init_ros_interfaces(self): + """初始化ROS接口(仅非电机相关)""" + qos_profile = QoSProfile(reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=5) + + # 订阅者(非电机相关) + if self.control_tool == "joystick": + self.sub_joy_cmd = self.create_subscription( + Joy, '/sbus_data', self._joy_callback, qos_profile) + if self.control_tool == "xbox": + self.sub_xbox_cmd = self.create_subscription( + Joy, '/xbox_data', self._xbox_callback, qos_profile) + + def _init_control_system(self): + """初始化控制系统""" + + # 手柄控制器 + if self.control_tool == "joystick": + self.joystick_humanoid = JoystickHumanoid() + self.joystick_humanoid.init() + + # 键盘控制器 + if self.control_tool == "keyboard": + self.keyboard_controller = KeyboardController() + self.keyboard_controller.init() + # 如果使用键盘控制,启动键盘监听 + self.keyboard_controller.start() + + # UDP控制器 + if self.control_tool == "udp_loopback": + self.udp_fsm_controller = UDPFSMController() + self.udp_fsm_controller.init() + self.udp_fsm_controller.start() + if self.control_tool == "omnisocket_loopback": + from udp_loopback.omnisocket_fsm_controller import OmniSocketFSMController + + self.omnisocket_fsm_controller = OmniSocketFSMController() + self.omnisocket_fsm_controller.init() + self.omnisocket_fsm_controller.start() + + # Xbox控制器 + if self.control_tool == "xbox": + self.xbox_controller = XBOXController() + self.xbox_controller.init() + + # 控制标志 + self.control_running = False + self.control_thread = None + + def _start_control_thread(self): + """启动控制线程""" + self.control_running = True + self.control_thread = threading.Thread(target=self._rl_control_loop, + daemon=True) + self.control_thread.start() + self.get_logger().info("Control thread started") + + def _rl_control_loop(self): + """主控制循环""" + self.get_logger().info("RL control loop starting...") + + # 初始化时间戳 + time_passed = 0.0 + + # 处理手柄数据 + self._process_controller_data() + # 更新机器人数据 + self._update_robot_data(self.control_flag, time_passed) + + while self.control_running and rclpy.ok(): + + # 运行FSM + loop_start = time.perf_counter() + self.robot_fsm.run_fsm(self.robot_data.control_flag) + + # 发布控制命令 + self.robot_interface.update_param(current_state=self.robot_fsm.get_current_state()) + self._send_control_commands(self.robot_data.control_flag) + + # 更新时间戳 + time_passed += self.dt + + # 处理手柄数据 + self._process_controller_data() + + # 更新机器人数据 + self._update_robot_data(self.control_flag, time_passed) + + # 控制频率 + self._precise_sleep_until(loop_start + self.dt) + # print( + # f"current control freq: {1/(time.perf_counter() - loop_start):.2f} Hz" + # ) + + + self.get_logger().info("RL control loop ended") + + def _precise_sleep_until(self, target_time): + """精确睡眠到目标时间""" + current_time = time.perf_counter() + sleep_time = target_time - current_time + + if sleep_time <= 0: + return # 已经超时,立即返回 + + # 分级睡眠策略 + if sleep_time > 0.003: # 3ms以上使用混合睡眠 + # 先睡眠大部分时间 + time.sleep(sleep_time * 0.9) + # 剩余时间忙等待 + while time.perf_counter() < target_time: + pass + else: # 3ms以内纯忙等待 + while time.perf_counter() < target_time: + pass + + # def _wait_for_start_signal(self): + # """等待启动信号""" + # start_file = "/tmp/rl_start_signal" + # self.get_logger().info("Waiting for start signal...") + # self.get_logger().info("Run: touch /tmp/rl_start_signal") + + # # 删除可能存在的旧文件 + # if os.path.exists(start_file): + # os.remove(start_file) + + # # 等待启动文件出现 + # while not os.path.exists(start_file) and rclpy.ok(): + # time.sleep(0.5) + + # self.get_logger().info("Start signal received, beginning RL control!") + + # @timing_decorator + def _process_controller_data(self): + # 处理控制器输入 + if self.control_tool == "joystick": + # 处理手柄输入 + while not self.queue_joy_cmd.empty(): + try: + msg = self.queue_joy_cmd.get_nowait() + self.joystick_humanoid.joy_map_read(msg) + self.joystick_humanoid.joy_flag_update() + break + except queue.Empty: + break + if self.control_tool == "xbox": + while not self.queue_xbox_cmd.empty(): + try: + msg = self.queue_xbox_cmd.get_nowait() + self.xbox_controller.xbox_map_read(msg) + self.xbox_controller.xbox_flag_update() + break + except queue.Empty: + break + + if self.control_tool == "keyboard": + self.keyboard_controller.update_flag() + flag = self.keyboard_controller.get_keyboard_flag() + elif self.control_tool == "udp_loopback": + self.udp_fsm_controller.update_flag() + flag = self.udp_fsm_controller.get_udp_flag() + elif self.control_tool == "omnisocket_loopback": + self.omnisocket_fsm_controller.update_flag() + flag = self.omnisocket_fsm_controller.get_udp_flag() + elif self.control_tool == "joystick": + flag = self.joystick_humanoid.get_joy_flag() + elif self.control_tool == "xbox": + flag = self.xbox_controller.get_xbox_flag() + else: + print("[ERROR] No control tool specified") + print('*' * 30 + f"current flag: {flag}" + '*' * 30) + self.control_flag = flag + + # @timing_decorator + def _update_robot_data(self, flag: ControlFlag, time_passed: float): + """更新机器人数据""" + self.robot_interface.update_robot_data(flag, time_passed) + + # @timing_decorator + def _send_control_commands(self, flag: ControlFlag): + """发布控制命令""" + # 通过robot_interface发布控制命令 + self.robot_interface.send_motor_commands(flag) + + def _joy_callback(self, msg): + """云卓手柄输入回调""" + try: + self.queue_joy_cmd.put_nowait(msg) + except queue.Full: + try: + self.queue_joy_cmd.get_nowait() # 移除旧数据 + self.queue_joy_cmd.put_nowait(msg) # 加入新数据 + except: + pass # 如果仍然无法加入,忽略 + + def _xbox_callback(self, msg): + """xbox手柄输入回调""" + try: + self.queue_xbox_cmd.put_nowait(msg) + except queue.Full: + try: + self.queue_xbox_cmd.get_nowait() # 移除旧数据 + self.queue_xbox_cmd.put_nowait(msg) # 加入新数据 + except: + pass # 如果仍然无法加入,忽略 + + def destroy_node(self): + """节点销毁""" + self.control_running = False + # # 先停止键盘控制器(重要!) + if hasattr(self, + 'keyboard_controller') and self.control_tool == "keyboard": + self.keyboard_controller.stop() + if hasattr(self, + 'udp_fsm_controller') and self.control_tool == "udp_loopback": + self.udp_fsm_controller.stop() + if hasattr(self, + 'omnisocket_fsm_controller') and self.control_tool == "omnisocket_loopback": + self.omnisocket_fsm_controller.stop() + + if self.control_thread and self.control_thread.is_alive(): + self.control_thread.join(timeout=1.0) + + super().destroy_node() + + +def main(args=None): + """主函数""" + rclpy.init(args=args) + node = None + try: + node = XMIGCSControlNode(debug=False) + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + if 'node' in locals() and node is not None: + node.destroy_node() + rclpy.shutdown() + + + +if __name__ == '__main__': + main()