From 32702e25f37f93a09ae2072f9f7f937a271723f4 Mon Sep 17 00:00:00 2001 From: meiqi <976161896@qq.com> Date: Mon, 30 Mar 2026 14:22:53 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=A4=9A=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E6=8E=A7=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Deploy_Tienkung/common/joystick.py | 20 +++ .../common/stdin_keyboard_control.py | 21 ++- Deploy_Tienkung/common/xbox_control.py | 54 ++++--- Deploy_Tienkung/config/dex_config.yaml | 2 +- Deploy_Tienkung/rl_control_node.py | 141 +++++++++++++++--- .../udp_loopback/udp_fsm_controller.py | 14 ++ .../udp_loopback/udp_keyboard_sender.py | 41 +++-- 7 files changed, 234 insertions(+), 59 deletions(-) diff --git a/Deploy_Tienkung/common/joystick.py b/Deploy_Tienkung/common/joystick.py index 9eadbea..3d3c2af 100644 --- a/Deploy_Tienkung/common/joystick.py +++ b/Deploy_Tienkung/common/joystick.py @@ -3,6 +3,7 @@ Joystick Control Module Python equivalent of the C++ Joystick functionality for ROS Joy messages """ import os +import time import yaml import threading from dataclasses import dataclass @@ -76,6 +77,8 @@ class JoystickHumanoid: self.max_yaw_speed = 0.0 # 高度平滑控制 self.target_height = 0.0 + self.last_input_time = 0.0 + self.last_fsm_command_time = 0.0 # 加载配置文件 self._load_config() @@ -139,11 +142,14 @@ class JoystickHumanoid: x2=msg.axes[0] if len(msg.axes) > 1 else 0.0, y1=msg.axes[2] if len(msg.axes) > 2 else 0.0, y2=msg.axes[1] if len(msg.axes) > 0 else 0.0) + if yunzhuo_map != self.joy_map: + self.last_input_time = time.time() self.joy_map = yunzhuo_map def joy_flag_update(self): """根据手柄输入更新控制标志""" with self.data_mutex: + fsm_command_updated = False # 更新手柄启动标志 if self.joy_map.f == -1.0: self.joy_flag.enable = False @@ -152,14 +158,17 @@ class JoystickHumanoid: # FSM状态切换命令 if self.joy_map.c == 1.0: self.joy_flag.fsm_state_command = "gotoSTOP" + fsm_command_updated = True else: button_pressed_nums = self.check_button_pressed_nums( self.joy_map) if button_pressed_nums == 0: if self.joy_map.d == 1.0: self.joy_flag.fsm_state_command = "gotoZERO" + fsm_command_updated = True elif self.joy_map.a == 1.0: self.joy_flag.fsm_state_command = "gotoWALKAMP" + fsm_command_updated = True # 获取walk速度命令 self.get_x_y_yaw_speed_command() # 获取高度命令 @@ -169,8 +178,13 @@ class JoystickHumanoid: #e上拨 if self.joy_map.a == 1.0: self.joy_flag.fsm_state_command = "gotoBEYONDMIMIC" + fsm_command_updated = True elif self.joy_map.d == 1.0: self.joy_flag.fsm_state_command = "gotoBEYONDZERO" + fsm_command_updated = True + + if fsm_command_updated: + self.last_fsm_command_time = time.time() @@ -179,6 +193,12 @@ class JoystickHumanoid: with self.data_mutex: return self.joy_flag + def get_last_input_time(self) -> float: + return self.last_input_time + + def get_last_fsm_command_time(self) -> float: + return self.last_fsm_command_time + def init(self) -> int: """初始化手柄控制器""" print("Joystick controller initialized") diff --git a/Deploy_Tienkung/common/stdin_keyboard_control.py b/Deploy_Tienkung/common/stdin_keyboard_control.py index a52528a..5869223 100644 --- a/Deploy_Tienkung/common/stdin_keyboard_control.py +++ b/Deploy_Tienkung/common/stdin_keyboard_control.py @@ -8,6 +8,7 @@ import select import termios import tty import os +import time import yaml from typing import Optional from .joystick import ControlFlag @@ -50,6 +51,8 @@ class KeyboardController: self.running = False self.input_thread = None self.original_terminal_settings = None + self.last_input_time = 0.0 + self.last_fsm_command_time = 0.0 # 加载配置文件 self._load_config() @@ -166,6 +169,7 @@ class KeyboardController: def _process_key(self, key): """处理按键输入""" + handled = True if key == 'w': self._on_w_key() elif key == 's': @@ -207,7 +211,10 @@ class KeyboardController: self._handle_arrow_key() else: # 忽略其他按键 - pass + handled = False + + if handled: + self.last_input_time = time.time() def _handle_arrow_key(self): """处理方向键序列""" @@ -296,17 +303,20 @@ class KeyboardController: """处理z键 - 切换到ZERO状态""" with self.data_mutex: self.keyboard_flag.fsm_state_command = "gotoZERO" + self.last_fsm_command_time = time.time() print("Command: gotoZERO") def _on_v_key(self): """处理v键 - 切换到BEYONGDMIMIC状态""" with self.data_mutex: self.keyboard_flag.fsm_state_command = "gotoBEYONDMIMIC" + self.last_fsm_command_time = time.time() print("Command: gotoBEYONDMIMIC") def _on_c_key(self): """处理c键 - 切换到STOP状态""" with self.data_mutex: self.keyboard_flag.fsm_state_command = "gotoSTOP" + self.last_fsm_command_time = time.time() print("Command: gotoSTOP") @@ -346,6 +356,7 @@ class KeyboardController: """处理m键 - 切换到WALKAMP状态""" with self.data_mutex: self.keyboard_flag.fsm_state_command = "gotoWALKAMP" + self.last_fsm_command_time = time.time() print("Command: gotoWALKAMP") def _on_p_key(self): @@ -355,6 +366,7 @@ class KeyboardController: self.keyboard_flag.y_speed_command = 0.0 self.keyboard_flag.yaw_speed_command = 0.0 self.keyboard_flag.fsm_state_command = "gotoMYPOLICY" + self.last_fsm_command_time = time.time() print("Command: gotoMYPOLICY (movement commands reset to zero)") def _on_n_key(self): @@ -364,6 +376,7 @@ class KeyboardController: self.keyboard_flag.y_speed_command = 0.0 self.keyboard_flag.yaw_speed_command = 0.0 self.keyboard_flag.fsm_state_command = "gotoXSIMRUN" + self.last_fsm_command_time = time.time() print("Command: gotoXSIMRUN (movement commands reset to zero)") def _handle_ctrl_c(self): @@ -418,6 +431,12 @@ class KeyboardController: flag_copy = KeyboardFlag() flag_copy.__dict__.update(self.keyboard_flag.__dict__) return flag_copy + + def get_last_input_time(self) -> float: + return self.last_input_time + + def get_last_fsm_command_time(self) -> float: + return self.last_fsm_command_time def init(self) -> int: """初始化键盘控制器""" diff --git a/Deploy_Tienkung/common/xbox_control.py b/Deploy_Tienkung/common/xbox_control.py index a50b543..0fa51b9 100644 --- a/Deploy_Tienkung/common/xbox_control.py +++ b/Deploy_Tienkung/common/xbox_control.py @@ -3,6 +3,7 @@ XBOX Controller compatibility layer. Implements the same FSM modes and control flags as `stdin_keyboard_control.py` / `joystick.py`. """ import os +import time import yaml import threading from typing import Optional @@ -53,6 +54,8 @@ class XBOXController: self.map = XBOXMap() self.flag = XBOXFlag() self.data_mutex = threading.Lock() + self.last_input_time = 0.0 + self.last_fsm_command_time = 0.0 # state tracking self.last_select = 0 @@ -130,44 +133,51 @@ class XBOXController: # axes layout may differ; try safe indexing axes = list(msg.axes) + [0.0] * 16 buttons = list(msg.buttons) + [0] * 32 - - # common mapping assumptions (best-effort) - self.map.lx = axes[self.axis_map['lx']] - self.map.ly = axes[self.axis_map['ly']] - self.map.rx = axes[self.axis_map['rx']] - self.map.ry = axes[self.axis_map['ry']] - # triggers sometimes on axes - self.map.l_trigger = axes[self.axis_map['l_trigger']] - self.map.r_trigger = axes[self.axis_map['r_trigger']] - # dpad may be on axes - self.map.dpad_h = axes[self.axis_map['dpad_h']] - self.map.dpad_v = axes[self.axis_map['dpad_v']] - - # buttons using button_map indices + + new_map = XBOXMap( + lx=axes[self.axis_map['lx']], + ly=axes[self.axis_map['ly']], + rx=axes[self.axis_map['rx']], + ry=axes[self.axis_map['ry']], + l_trigger=axes[self.axis_map['l_trigger']], + r_trigger=axes[self.axis_map['r_trigger']], + dpad_h=axes[self.axis_map['dpad_h']], + dpad_v=axes[self.axis_map['dpad_v']], + ) + for name, idx in self.button_map.items(): try: val = buttons[idx] except Exception: val = 0 - setattr(self.map, name, val) + setattr(new_map, name, val) + + if new_map != self.map: + self.last_input_time = time.time() + self.map = new_map def xbox_flag_update(self): """Update ControlFlag from the xbox map, mirroring joystick logic.""" with self.data_mutex: + fsm_command = None # FSM state mapping - cover keyboard commands z/c/m/h/g/p/o # c -> gotoSTOP if self.map.y == 1: - self.flag.fsm_state_command = 'gotoSTOP' + fsm_command = 'gotoSTOP' # a -> gotoWALKAMP elif self.map.a == 1: - self.flag.fsm_state_command = 'gotoWALKAMP' + fsm_command = 'gotoWALKAMP' # h -> gotoDH (Left trigger + A) # v -> gotoBEYONDMIMIC (Left trigger + home) elif self.map.l_trigger < -0.5 and self.map.home == 1: - self.flag.fsm_state_command = 'gotoBEYONDMIMIC' + fsm_command = 'gotoBEYONDMIMIC' # z -> gotoZERO elif self.map.x == 1: - self.flag.fsm_state_command = 'gotoZERO' + fsm_command = 'gotoZERO' + + if fsm_command is not None: + self.flag.fsm_state_command = fsm_command + self.last_fsm_command_time = time.time() # detect state change if not hasattr(self, '_last_state'): @@ -251,6 +261,12 @@ class XBOXController: with self.data_mutex: return self.flag + def get_last_input_time(self) -> float: + return self.last_input_time + + def get_last_fsm_command_time(self) -> float: + return self.last_fsm_command_time + def init(self) -> int: print("XBOX controller initialized") return 0 diff --git a/Deploy_Tienkung/config/dex_config.yaml b/Deploy_Tienkung/config/dex_config.yaml index 4eed57e..e5b581d 100644 --- a/Deploy_Tienkung/config/dex_config.yaml +++ b/Deploy_Tienkung/config/dex_config.yaml @@ -6,7 +6,7 @@ sim: false debug: false -control_tool: keyboard # joystick, xbox, keyboard, udp_loopback +control_tool: [keyboard] # joystick, xbox, keyboard, udp_loopback joystick: initial_height: 0.957 diff --git a/Deploy_Tienkung/rl_control_node.py b/Deploy_Tienkung/rl_control_node.py index b759aba..637a8f3 100644 --- a/Deploy_Tienkung/rl_control_node.py +++ b/Deploy_Tienkung/rl_control_node.py @@ -47,6 +47,8 @@ def timing_decorator(func): class XMIGCSControlNode(Node): """xMIGCS控制节点Python版本""" + VALID_CONTROL_TOOLS = ("joystick", "xbox", "keyboard", "udp_loopback") + def __init__(self, debug=False): super().__init__('xmigcs_control_node') @@ -95,7 +97,8 @@ class XMIGCSControlNode(Node): print(self.config) # 获取控制器类型 - self.control_tool = self.config.get('control_tool', 'keyboard') + raw_control_tool = self.config.get('control_tool', 'keyboard') + self.control_tools = self._normalize_control_tools(raw_control_tool) # 提取关键配置参数 self.motor_num = self.config.get('motor_num') self.dt = self.config.get('dt') @@ -107,6 +110,34 @@ class XMIGCSControlNode(Node): if self.sim and user_name == 'ubuntu': raise RuntimeError("On ubuntu user, sim must be set to false") + def _normalize_control_tools(self, raw_control_tool): + """Normalize control_tool config to an ordered tool list.""" + if isinstance(raw_control_tool, str): + normalized = raw_control_tool.replace("+", ",") + requested_tools = [ + item.strip() for item in normalized.split(",") if item.strip() + ] + elif isinstance(raw_control_tool, list): + requested_tools = [str(item).strip() for item in raw_control_tool if str(item).strip()] + else: + raise ValueError("control_tool must be a string or list") + + if not requested_tools: + requested_tools = ["keyboard"] + + deduped_tools = [] + for tool_name in requested_tools: + if tool_name not in self.VALID_CONTROL_TOOLS: + raise ValueError( + f"Unsupported control tool '{tool_name}'. " + f"Expected one of {self.VALID_CONTROL_TOOLS}" + ) + if tool_name not in deduped_tools: + deduped_tools.append(tool_name) + + print(f"[control] active tools: {deduped_tools}") + return deduped_tools + def _init_data_structures(self): """初始化数据结构""" # 机器人数据 @@ -117,6 +148,7 @@ class XMIGCSControlNode(Node): self.queue_joy_cmd = queue.Queue(maxsize=1) self.queue_xbox_cmd = queue.Queue(maxsize=1) self.control_flag = ControlFlag() + self.last_applied_fsm_command_time = 0.0 def _init_ros_interfaces(self): """初始化ROS接口(仅非电机相关)""" @@ -125,10 +157,10 @@ class XMIGCSControlNode(Node): depth=5) # 订阅者(非电机相关) - if self.control_tool == "joystick": + if "joystick" in self.control_tools: self.sub_joy_cmd = self.create_subscription( Joy, '/sbus_data', self._joy_callback, qos_profile) - if self.control_tool == "xbox": + if "xbox" in self.control_tools: self.sub_xbox_cmd = self.create_subscription( Joy, '/xbox_data', self._xbox_callback, qos_profile) @@ -136,25 +168,25 @@ class XMIGCSControlNode(Node): """初始化控制系统""" # 手柄控制器 - if self.control_tool == "joystick": + if "joystick" in self.control_tools: self.joystick_humanoid = JoystickHumanoid() self.joystick_humanoid.init() # 键盘控制器 - if self.control_tool == "keyboard": + if "keyboard" in self.control_tools: self.keyboard_controller = KeyboardController() self.keyboard_controller.init() # 如果使用键盘控制,启动键盘监听 self.keyboard_controller.start() # UDP控制器 - if self.control_tool == "udp_loopback": + if "udp_loopback" in self.control_tools: self.udp_fsm_controller = UDPFSMController() self.udp_fsm_controller.init() self.udp_fsm_controller.start() # Xbox控制器 - if self.control_tool == "xbox": + if "xbox" in self.control_tools: self.xbox_controller = XBOXController() self.xbox_controller.init() @@ -247,8 +279,10 @@ class XMIGCSControlNode(Node): # @timing_decorator def _process_controller_data(self): + source_flags = [] + # 处理控制器输入 - if self.control_tool == "joystick": + if "joystick" in self.control_tools: # 处理手柄输入 while not self.queue_joy_cmd.empty(): try: @@ -258,7 +292,14 @@ class XMIGCSControlNode(Node): break except queue.Empty: break - if self.control_tool == "xbox": + source_flags.append(( + "joystick", + self.joystick_humanoid.get_joy_flag(), + self.joystick_humanoid.get_last_input_time(), + self.joystick_humanoid.get_last_fsm_command_time(), + )) + + if "xbox" in self.control_tools: while not self.queue_xbox_cmd.empty(): try: msg = self.queue_xbox_cmd.get_nowait() @@ -267,22 +308,78 @@ class XMIGCSControlNode(Node): break except queue.Empty: break + source_flags.append(( + "xbox", + self.xbox_controller.get_xbox_flag(), + self.xbox_controller.get_last_input_time(), + self.xbox_controller.get_last_fsm_command_time(), + )) - if self.control_tool == "keyboard": + if "keyboard" in self.control_tools: self.keyboard_controller.update_flag() - flag = self.keyboard_controller.get_keyboard_flag() - elif self.control_tool == "udp_loopback": + source_flags.append(( + "keyboard", + self.keyboard_controller.get_keyboard_flag(), + self.keyboard_controller.get_last_input_time(), + self.keyboard_controller.get_last_fsm_command_time(), + )) + + if "udp_loopback" in self.control_tools: self.udp_fsm_controller.update_flag() - flag = self.udp_fsm_controller.get_udp_flag() - elif self.control_tool == "joystick": - flag = self.joystick_humanoid.get_joy_flag() - elif self.control_tool == "xbox": - flag = self.xbox_controller.get_xbox_flag() - else: - print("[ERROR] No control tool specified") - print('*' * 30 + f"current flag: {flag}" + '*' * 30) + source_flags.append(( + "udp_loopback", + self.udp_fsm_controller.get_udp_flag(), + self.udp_fsm_controller.get_last_input_time(), + self.udp_fsm_controller.get_last_fsm_command_time(), + )) + + source_name, active_flag = self._select_control_flag(source_flags) + flag = self._copy_control_flag(active_flag) + flag.fsm_state_command = self._select_fsm_command(source_flags) + print('*' * 30 + f"current flag source={source_name}: {flag}" + '*' * 30) self.control_flag = flag + def _select_control_flag(self, source_flags): + """Select the most recently active controller.""" + if not source_flags: + print("[ERROR] No control tool specified") + return "none", ControlFlag() + + selected_source = source_flags[0] + for candidate in source_flags[1:]: + if candidate[2] > selected_source[2]: + selected_source = candidate + + return selected_source[0], selected_source[1] + + def _copy_control_flag(self, flag: ControlFlag) -> ControlFlag: + flag_copy = type(flag)() + flag_copy.__dict__.update(flag.__dict__) + return flag_copy + + def _select_fsm_command(self, source_flags) -> str: + latest_source = None + for candidate in source_flags: + if latest_source is None or candidate[3] > latest_source[3]: + latest_source = candidate + + if latest_source is not None and latest_source[3] > self.last_applied_fsm_command_time: + self.last_applied_fsm_command_time = latest_source[3] + return latest_source[1].fsm_state_command + + return self._get_hold_fsm_command() + + def _get_hold_fsm_command(self) -> str: + state_to_command = { + FSMStateName.STOP: "gotoSTOP", + FSMStateName.ZERO: "gotoZERO", + FSMStateName.WALKAMP: "gotoWALKAMP", + FSMStateName.MYPOLICY: "gotoMYPOLICY", + FSMStateName.XSIMRUN: "gotoXSIMRUN", + } + current_state = self.robot_fsm.get_current_state() + return state_to_command.get(current_state, self.control_flag.fsm_state_command) + # @timing_decorator def _update_robot_data(self, flag: ControlFlag, time_passed: float): """更新机器人数据""" @@ -321,10 +418,10 @@ class XMIGCSControlNode(Node): self.control_running = False # # 先停止键盘控制器(重要!) if hasattr(self, - 'keyboard_controller') and self.control_tool == "keyboard": + 'keyboard_controller') and "keyboard" in self.control_tools: self.keyboard_controller.stop() if hasattr(self, - 'udp_fsm_controller') and self.control_tool == "udp_loopback": + 'udp_fsm_controller') and "udp_loopback" in self.control_tools: self.udp_fsm_controller.stop() if self.control_thread and self.control_thread.is_alive(): diff --git a/Deploy_Tienkung/udp_loopback/udp_fsm_controller.py b/Deploy_Tienkung/udp_loopback/udp_fsm_controller.py index cbd0079..072ae8e 100644 --- a/Deploy_Tienkung/udp_loopback/udp_fsm_controller.py +++ b/Deploy_Tienkung/udp_loopback/udp_fsm_controller.py @@ -72,6 +72,7 @@ class UDPFSMController: self.udp_flag = UDPFSMFlag() self.udp_flag.height_cmd = self.initial_lift self.last_seq_id = -1 + self.last_fsm_command_time = 0.0 def start(self) -> None: self.rx_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -132,6 +133,14 @@ class UDPFSMController: flag_copy.__dict__.update(self.udp_flag.__dict__) return flag_copy + def get_last_input_time(self) -> float: + with self.data_mutex: + return self.motion_frame.last_rx_time + + def get_last_fsm_command_time(self) -> float: + with self.data_mutex: + return self.last_fsm_command_time + def init(self) -> int: print("UDP FSM controller initialized") return 0 @@ -145,14 +154,19 @@ class UDPFSMController: event_code = packet.event_code if event_code == "pose_home": self.motion_frame.mode_tag = "pose_home" + self.last_fsm_command_time = packet.sent_at elif event_code == "pose_hold": self.motion_frame.mode_tag = "pose_hold" + self.last_fsm_command_time = packet.sent_at elif event_code == "mode_stride": self.motion_frame.mode_tag = "mode_stride" + self.last_fsm_command_time = packet.sent_at elif event_code == "mode_dash": self.motion_frame.mode_tag = "mode_dash" + self.last_fsm_command_time = packet.sent_at elif event_code == "mode_xrun": self.motion_frame.mode_tag = "mode_xrun" + self.last_fsm_command_time = packet.sent_at elif event_code == "surge_up": self.motion_frame.surge_goal = min( self.max_surge, self.motion_frame.surge_goal + self.surge_step diff --git a/Deploy_Tienkung/udp_loopback/udp_keyboard_sender.py b/Deploy_Tienkung/udp_loopback/udp_keyboard_sender.py index 9224e46..00a5173 100644 --- a/Deploy_Tienkung/udp_loopback/udp_keyboard_sender.py +++ b/Deploy_Tienkung/udp_loopback/udp_keyboard_sender.py @@ -64,6 +64,9 @@ class UDPKeyboardSender: print(" Left/Right -> lift +/-") print(" Up/Down -> surge +/-") print(" r -> trim_reset") + print(" 4 -> clear x speed") + print(" 5 -> clear y speed") + print(" 6 -> clear yaw speed") print(" x -> session_quit") def start(self) -> None: @@ -106,19 +109,22 @@ class UDPKeyboardSender: def _process_key(self, key: str) -> None: event_map = { - "w": ("surge_up", "w"), - "s": ("surge_down", "s"), - "a": ("sway_left", "a"), - "d": ("sway_right", "d"), - "q": ("spin_left", "q"), - "e": ("spin_right", "e"), - "z": ("pose_home", "z"), - "c": ("pose_hold", "c"), - "m": ("mode_stride", "m"), - "p": ("mode_dash", "p"), - "n": ("mode_xrun", "n"), - "r": ("trim_reset", "r"), - "x": ("session_quit", "x"), + "w": ("surge_up", "w", 1.0), + "s": ("surge_down", "s", 1.0), + "a": ("sway_left", "a", 1.0), + "d": ("sway_right", "d", 1.0), + "q": ("spin_left", "q", 1.0), + "e": ("spin_right", "e", 1.0), + "z": ("pose_home", "z", 1.0), + "c": ("pose_hold", "c", 1.0), + "m": ("mode_stride", "m", 1.0), + "p": ("mode_dash", "p", 1.0), + "n": ("mode_xrun", "n", 1.0), + "r": ("trim_reset", "r", 1.0), + "4": ("set_surge", "4", 0.0), + "5": ("set_sway", "5", 0.0), + "6": ("set_spin", "6", 0.0), + "x": ("session_quit", "x", 1.0), } if key == "\x03": @@ -130,8 +136,8 @@ class UDPKeyboardSender: return if key in event_map: - event_code, key_name = event_map[key] - self._send_event(event_code, key_name) + event_code, key_name, drive_value = event_map[key] + self._send_event(event_code, key_name, drive_value=drive_value) def _handle_arrow_key(self) -> None: if not select.select([sys.stdin], [], [], 0.1)[0]: @@ -152,13 +158,16 @@ class UDPKeyboardSender: event_code, key_name = arrow_map[key3] self._send_event(event_code, key_name) - def _send_event(self, event_code: str, key_name: str) -> None: + def _send_event( + self, event_code: str, key_name: str, drive_value: float = 1.0 + ) -> None: if self.socket is None: return envelope = InputEnvelope( seq_id=self.seq_id, event_code=event_code, key_name=key_name, + drive_value=drive_value, source_tag=self.source_tag, ) self.seq_id += 1