diff --git a/.gitignore b/.gitignore index e69de29..120b983 100644 --- a/.gitignore +++ b/.gitignore @@ -0,0 +1,3 @@ +/TienKung_URDF +/TienKung-Lab +/xGMR \ No newline at end of file diff --git a/Deploy_Tienkung/common/xbox_control.py b/Deploy_Tienkung/common/xbox_control.py index 88eb980..a50b543 100644 --- a/Deploy_Tienkung/common/xbox_control.py +++ b/Deploy_Tienkung/common/xbox_control.py @@ -69,7 +69,6 @@ class XBOXController: # smoothing self.height_step = 0.05 - self._load_config() # default button map indices (can be overridden in config) self.button_map = { 'a': 0, 'b': 1, 'x': 2, 'y': 3, @@ -84,6 +83,8 @@ class XBOXController: 'dpad_h': 6, 'dpad_v': 7 } + self._load_config() + def _load_config(self): try: config_path = os.path.join('.', 'config', 'dex_config.yaml') @@ -91,22 +92,24 @@ class XBOXController: cfg = yaml.safe_load(f) or {} xbox_cfg = cfg.get('xbox', {}) # override button_map if provided - # bm = xbox_cfg.get('button_map') - # if isinstance(bm, dict): - # for k, v in bm.items(): - # try: - # self.button_map[k] = int(v) - # except Exception: - # pass - - # # override axis_map if provided - # am = xbox_cfg.get('axis_map') - # if isinstance(am, dict): - # for k, v in am.items(): - # try: - # self.axis_map[k] = int(v) - # except Exception: - # pass + bm = xbox_cfg.get('button_map') + if isinstance(bm, dict): + for k, v in bm.items(): + if k in self.button_map: + try: + self.button_map[k] = int(v) + except Exception: + pass + + # override axis_map if provided + am = xbox_cfg.get('axis_map') + if isinstance(am, dict): + for k, v in am.items(): + if k in self.axis_map: + try: + self.axis_map[k] = int(v) + except Exception: + pass self.initial_height = xbox_cfg.get('initial_height', 0.89) self.forward_command_offset = xbox_cfg.get('forward_command_offset', 0.0) @@ -155,6 +158,9 @@ class XBOXController: # c -> gotoSTOP if self.map.y == 1: self.flag.fsm_state_command = 'gotoSTOP' + # a -> gotoWALKAMP + elif self.map.a == 1: + self.flag.fsm_state_command = 'gotoWALKAMP' # h -> gotoDH (Left trigger + A) # v -> gotoBEYONDMIMIC (Left trigger + home) elif self.map.l_trigger < -0.5 and self.map.home == 1: diff --git a/Deploy_Tienkung/policy/stop/fsm_stop.py b/Deploy_Tienkung/policy/stop/fsm_stop.py index 17b7ef3..8fcd714 100644 --- a/Deploy_Tienkung/policy/stop/fsm_stop.py +++ b/Deploy_Tienkung/policy/stop/fsm_stop.py @@ -80,6 +80,8 @@ class FSMStateStop(FSMState): return FSMStateName.STOP elif flag.fsm_state_command == "gotoZERO": return FSMStateName.ZERO + elif flag.fsm_state_command == "gotoWALKAMP": + return FSMStateName.WALKAMP elif flag.fsm_state_command == "gotoMYPOLICY": return FSMStateName.MYPOLICY elif flag.fsm_state_command == "gotoXSIMRUN": diff --git a/Deploy_Tienkung/policy/xsim_run/config/xsim_run.yaml b/Deploy_Tienkung/policy/xsim_run/config/xsim_run.yaml new file mode 100644 index 0000000..62fb4fd --- /dev/null +++ b/Deploy_Tienkung/policy/xsim_run/config/xsim_run.yaml @@ -0,0 +1,82 @@ +model_path: "../mypolicy/model/policy.onnx" +motor_num: 29 +actions_size: 23 +dt: 0.01 +warm_start_time: 0.0 +command_clip: 1.0 + +sim: + mujoco_timestep: 0.005 + +joint_names: [ + hip_pitch_l_joint, hip_pitch_r_joint, waist_yaw_joint, + hip_roll_l_joint, hip_roll_r_joint, waist_roll_joint, + hip_yaw_l_joint, hip_yaw_r_joint, waist_pitch_joint, + knee_pitch_l_joint, knee_pitch_r_joint, + shoulder_pitch_l_joint, shoulder_pitch_r_joint, + ankle_pitch_l_joint, ankle_pitch_r_joint, + shoulder_roll_l_joint, shoulder_roll_r_joint, + ankle_roll_l_joint, ankle_roll_r_joint, + shoulder_yaw_l_joint, shoulder_yaw_r_joint, + elbow_pitch_l_joint, elbow_pitch_r_joint +] + +control: + action_scale: 0.25 + decimation: 2 + +gait: + gait_air_ratio_l: 0.6 + gait_air_ratio_r: 0.6 + gait_phase_offset_l: 0.6 + gait_phase_offset_r: 0.1 + gait_cycle: 0.64 + +normalization: + clip_scales: + clip_observations: 100.0 + clip_actions: 100.0 + +size: + num_hist: 10 + observations_size: 84 + +gains: + kp: [ + 300.0, 300.0, 400.0, + 300.0, 300.0, 400.0, + 150.0, 150.0, 400.0, + 350.0, 350.0, + 150.0, 150.0, + 30.0, 30.0, + 50.0, 50.0, + 16.8, 16.8, + 50.0, 50.0, + 150.0, 150.0 + ] + kd: [ + 10.0, 10.0, 5.0, + 10.0, 10.0, 10.0, + 5.0, 5.0, 10.0, + 10.0, 10.0, + 7.5, 7.5, + 2.5, 2.5, + 2.5, 2.5, + 1.4, 1.4, + 2.5, 2.5, + 5.0, 5.0 + ] + +init_state: + default_joint_angles: [ + 0.0, 0.0, 0.0, + -0.5, -0.5, 0.0, + 0.0, 0.0, 0.0, + 1.0, 1.0, + 0.0, 0.0, + -0.5, -0.5, + 0.2, -0.2, + 0.0, 0.0, + 0.0, 0.0, + -0.3, -0.3 + ] diff --git a/Deploy_Tienkung/policy/xsim_run/fsm_xsim_run.py b/Deploy_Tienkung/policy/xsim_run/fsm_xsim_run.py new file mode 100644 index 0000000..763a692 --- /dev/null +++ b/Deploy_Tienkung/policy/xsim_run/fsm_xsim_run.py @@ -0,0 +1,303 @@ +""" +FSM state implementation for an xSIM MuJoCo run policy that follows the +TienKung-Lab sim2sim observation/action flow more closely. +""" + +import os + +import numpy as np +import onnxruntime as ort +import yaml +from scipy.spatial.transform import Rotation + +from FSM.fsm_base import FSMState, FSMStateName +from common.joystick import ControlFlag +from common.robot_data import RobotData + + +class FSMStateXSIMRUN(FSMState): + """Direct-position run policy for xSIM MuJoCo.""" + + def __init__(self, robot_data: RobotData): + super().__init__(robot_data) + self.current_state_name = FSMStateName.XSIMRUN + self.log_prefix = "FSMStateXSIMRUN" + + current_dir = os.path.dirname(os.path.abspath(__file__)) + config_path = os.path.join(current_dir, "config", "xsim_run.yaml") + with open(config_path, "r", encoding="utf-8") as f: + policy_config = yaml.safe_load(f) + + self.action_num_ = int(policy_config["actions_size"]) + self.motor_num_ = int(policy_config["motor_num"]) + self.dt_ = float(policy_config["dt"]) + self.command_clip_ = float(policy_config.get("command_clip", 1.0)) + + size_config = policy_config.get("size", {}) + self.num_hist_ = int(size_config["num_hist"]) + self.obs_size_ = int(size_config["observations_size"]) + + control_config = policy_config.get("control", {}) + self.action_scale_ = float(control_config["action_scale"]) + self.decimation_ = int(control_config["decimation"]) + self.warm_start_time_ = float( + control_config.get( + "warm_start_time", + policy_config.get("warm_start_time", 0.0), + ) + ) + + sim_config = policy_config.get("sim", {}) + self.mujoco_timestep_ = float(sim_config.get("mujoco_timestep", 0.005)) + self.policy_period_ = self.dt_ * self.decimation_ + + gait_config = policy_config.get("gait", {}) + self.gait_cycle_ = float(gait_config["gait_cycle"]) + self.phase_ratio_ = np.array( + [ + gait_config["gait_air_ratio_l"], + gait_config["gait_air_ratio_r"], + ], + dtype=np.float32, + ) + self.phase_offset_ = np.array( + [ + gait_config["gait_phase_offset_l"], + gait_config["gait_phase_offset_r"], + ], + dtype=np.float32, + ) + + norm_config = policy_config.get("normalization", {}) + clip_config = norm_config.get("clip_scales", {}) + self.clip_obs_ = float(clip_config.get("clip_observations", 100.0)) + self.clip_act_ = float(clip_config.get("clip_actions", 100.0)) + + self.default_angles_lab_ = np.array( + policy_config["init_state"]["default_joint_angles"], + dtype=np.float32, + ) + self.stiffness_lab_ = np.array(policy_config["gains"]["kp"], dtype=np.float32) + self.damping_lab_ = np.array(policy_config["gains"]["kd"], dtype=np.float32) + + model_rel_path = policy_config["model_path"] + self.model_path_ = os.path.normpath(os.path.join(current_dir, model_rel_path)) + self._init_onnx_session() + + # sim2sim.py uses policy output in Isaac order and then maps to MuJoCo order. + self.mujoco_to_policy_idx_ = np.array( + [0, 6, 12, 1, 7, 13, 2, 8, 14, 3, 9, 15, 19, 4, 10, 16, 20, 5, 11, 17, 21, 18, 22], + dtype=int, + ) + self.policy_to_mujoco_idx_ = np.array( + [0, 3, 6, 9, 13, 17, 1, 4, 7, 10, 14, 18, 2, 5, 8, 11, 15, 19, 21, 12, 16, 20, 22], + dtype=int, + ) + + # RobotData stores 29 joints in leg -> waist -> arm order. + self.mujoco_control_indices_ = np.array( + [ + 0, 1, 2, 3, 4, 5, + 6, 7, 8, 9, 10, 11, + 12, 13, 14, + 15, 16, 17, 18, + 22, 23, 24, 25, + ], + dtype=int, + ) + + self.default_angles_mujoco23_ = self.default_angles_lab_[self.policy_to_mujoco_idx_] + self.observations_ = np.zeros(self.obs_size_ * self.num_hist_, dtype=np.float32) + self.obs_history_ = np.zeros_like(self.observations_) + self.actions_ = np.zeros(self.action_num_, dtype=np.float32) + self.last_actions_ = np.zeros(self.action_num_, dtype=np.float32) + self.current_gait_ = np.zeros(6, dtype=np.float32) + self.hold_pose_29_ = np.zeros(self.motor_num_, dtype=np.float32) + self._warm_start_pose_29_ = np.zeros(self.motor_num_, dtype=np.float32) + self._first_obs = True + self._policy_step_counter = 0 + self.waiting_for_motion_ = True + self.motion_threshold_ = 1e-3 + + if self.warm_start_time_ > 0 and self.policy_period_ > 0: + self._warm_start_steps = max(1, int(self.warm_start_time_ / self.policy_period_)) + else: + self._warm_start_steps = 0 + self._warmup_inference_counter = 0 + + self.kp_29_ = np.zeros(self.motor_num_, dtype=np.float32) + self.kd_29_ = np.zeros(self.motor_num_, dtype=np.float32) + for lab_idx, mj_idx in enumerate(self.mujoco_control_indices_[self.policy_to_mujoco_idx_]): + self.kp_29_[mj_idx] = self.stiffness_lab_[lab_idx] + self.kd_29_[mj_idx] = self.damping_lab_[lab_idx] + + def _init_onnx_session(self) -> None: + options = ort.SessionOptions() + options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL + options.intra_op_num_threads = 1 + options.inter_op_num_threads = 1 + self.ort_session_ = ort.InferenceSession( + self.model_path_, + options, + providers=["CPUExecutionProvider"], + ) + print(f"[{self.log_prefix}] ONNX model loaded: {self.model_path_}") + + def on_enter(self): + self.observations_.fill(0.0) + self.obs_history_.fill(0.0) + self.actions_.fill(0.0) + self.last_actions_.fill(0.0) + self.current_gait_.fill(0.0) + self._first_obs = True + self._policy_step_counter = 0 + self._warmup_inference_counter = 0 + self.waiting_for_motion_ = True + + current_q = self.robot_data_.get_joint_pos().copy() + self.hold_pose_29_ = current_q + self._warm_start_pose_29_ = current_q + + base = self.robot_data_.q_d_.shape[0] - self.motor_num_ + self.robot_data_.q_d_[base:base + self.motor_num_] = current_q + self.robot_data_.q_dot_d_[base:base + self.motor_num_] = 0.0 + self.robot_data_.tau_d_[base:base + self.motor_num_] = 0.0 + self.robot_data_.joint_kp_p_[:self.motor_num_] = self.kp_29_ + self.robot_data_.joint_kd_p_[:self.motor_num_] = self.kd_29_ + print(f"[{self.log_prefix}] enter") + + def run(self, flag: ControlFlag): + walk_cmd = np.clip( + np.array(self.robot_data_.get_walk_cmd(), dtype=np.float32), + -self.command_clip_, + self.command_clip_, + ) + base = self.robot_data_.q_d_.shape[0] - self.motor_num_ + + if self.waiting_for_motion_: + if np.max(np.abs(walk_cmd)) <= self.motion_threshold_: + self.robot_data_.q_d_[base:base + self.motor_num_] = self.hold_pose_29_ + self.robot_data_.q_dot_d_[base:base + self.motor_num_] = 0.0 + self.robot_data_.tau_d_[base:base + self.motor_num_] = 0.0 + self.robot_data_.joint_kp_p_[:self.motor_num_] = self.kp_29_ + self.robot_data_.joint_kd_p_[:self.motor_num_] = self.kd_29_ + return + self.waiting_for_motion_ = False + self._warm_start_pose_29_ = self.robot_data_.get_joint_pos().copy() + print(f"[{self.log_prefix}] motion command detected: {walk_cmd}") + + if int(self.robot_data_.time_now_ / self.dt_) % self.decimation_ == 0: + self.current_gait_ = self._compute_gait_features() + self.compute_observation(walk_cmd) + self.compute_actions() + + target_mujoco23 = ( + self.actions_[self.policy_to_mujoco_idx_] * self.action_scale_ + + self.default_angles_mujoco23_ + ) + target_q_29 = self.hold_pose_29_.copy() + target_q_29[self.mujoco_control_indices_] = target_mujoco23 + + commanded_q_29 = target_q_29 + if self._warm_start_steps > 0 and self._warmup_inference_counter < self._warm_start_steps: + self._warmup_inference_counter += 1 + blend = self._warmup_inference_counter / float(self._warm_start_steps) + commanded_q_29 = (1.0 - blend) * self._warm_start_pose_29_ + blend * target_q_29 + + self.robot_data_.q_d_[base:base + self.motor_num_] = commanded_q_29 + self.robot_data_.q_dot_d_[base:base + self.motor_num_] = 0.0 + self.robot_data_.tau_d_[base:base + self.motor_num_] = 0.0 + self.robot_data_.joint_kp_p_[:self.motor_num_] = self.kp_29_ + self.robot_data_.joint_kd_p_[:self.motor_num_] = self.kd_29_ + self.last_actions_[:] = self.actions_ + + def _compute_gait_features(self) -> np.ndarray: + t = self._policy_step_counter * self.policy_period_ / self.gait_cycle_ + gait_phase = (t + self.phase_offset_) % 1.0 + self._policy_step_counter += 1 + return np.concatenate( + [ + np.sin(2.0 * np.pi * gait_phase), + np.cos(2.0 * np.pi * gait_phase), + self.phase_ratio_, + ], + axis=0, + ).astype(np.float32) + + def compute_observation(self, walk_cmd: np.ndarray): + if np.linalg.norm(self.robot_data_.imu_quat_) > 0.0: + q_wxyz = self.robot_data_.imu_quat_.astype(np.float32) + q_xyzw = np.array([q_wxyz[1], q_wxyz[2], q_wxyz[3], q_wxyz[0]], dtype=np.float32) + else: + roll = float(self.robot_data_.imu_data_[2]) + pitch = float(self.robot_data_.imu_data_[1]) + yaw = float(self.robot_data_.imu_data_[0]) + q_wxyz = self.euler_to_quaternion_scipy(roll, pitch, yaw) + q_xyzw = np.array([q_wxyz[1], q_wxyz[2], q_wxyz[3], q_wxyz[0]], dtype=np.float32) + + gravity = self.quat_rotate_inverse_numpy(q_xyzw, np.array([0.0, 0.0, -1.0], dtype=np.float32)) + q_29 = self.robot_data_.get_joint_pos() + dq_29 = self.robot_data_.get_joint_vel() + q_23 = q_29[self.mujoco_control_indices_] + dq_23 = dq_29[self.mujoco_control_indices_] + + proprio = np.concatenate( + [ + self.robot_data_.get_angular_velocity(), + gravity, + walk_cmd, + (q_23 - self.default_angles_mujoco23_)[self.mujoco_to_policy_idx_], + dq_23[self.mujoco_to_policy_idx_], + np.clip(self.last_actions_, -self.clip_act_, self.clip_act_), + self.current_gait_, + ], + axis=0, + ).astype(np.float32) + + if self._first_obs: + for i in range(self.num_hist_): + start = i * self.obs_size_ + self.obs_history_[start:start + self.obs_size_] = proprio + self._first_obs = False + else: + self.obs_history_ = np.roll(self.obs_history_, -self.obs_size_) + self.obs_history_[-self.obs_size_:] = proprio + + self.observations_ = np.clip(self.obs_history_, -self.clip_obs_, self.clip_obs_) + + def compute_actions(self): + input_name = self.ort_session_.get_inputs()[0].name + input_data = self.observations_.reshape(1, -1).astype(np.float32) + outputs = self.ort_session_.run(None, {input_name: input_data}) + self.actions_[:] = np.clip(outputs[0][0][: self.action_num_], -self.clip_act_, self.clip_act_) + + def on_exit(self): + print(f"[{self.log_prefix}] exit") + + def check_transition(self, flag: ControlFlag) -> FSMStateName: + if flag.fsm_state_command == "gotoSTOP": + return FSMStateName.STOP + if flag.fsm_state_command == "gotoZERO": + return FSMStateName.ZERO + if flag.fsm_state_command == "gotoWALKAMP": + return FSMStateName.WALKAMP + if flag.fsm_state_command == "gotoMYPOLICY": + return FSMStateName.MYPOLICY + if flag.fsm_state_command == "gotoXSIMRUN": + return FSMStateName.XSIMRUN + return None + + @staticmethod + def euler_to_quaternion_scipy(roll, pitch, yaw, degrees=False): + r = Rotation.from_euler("xyz", [roll, pitch, yaw], degrees=degrees) + q_xyzw = r.as_quat() + return np.array([q_xyzw[3], q_xyzw[0], q_xyzw[1], q_xyzw[2]], dtype=np.float32) + + @staticmethod + def quat_rotate_inverse_numpy(q_xyzw, v): + q_w = q_xyzw[3] + q_v = q_xyzw[:3] + a = v * (2.0 * q_w * q_w - 1.0) + b = np.cross(q_v, v) * (2.0 * q_w) + c = q_v * (2.0 * np.dot(q_v, v)) + return a - b + c diff --git a/Deploy_Tienkung/udp_loopback/README.md b/Deploy_Tienkung/udp_loopback/README.md index ec577d4..d6f5c39 100644 --- a/Deploy_Tienkung/udp_loopback/README.md +++ b/Deploy_Tienkung/udp_loopback/README.md @@ -5,6 +5,7 @@ 包含: - `udp_keyboard_sender.py`:从终端读取按键,编码 UDP 报文并发送 +- `udp_xbox_sender.py`:订阅 `/xbox_data`,把 Xbox 摇杆/按键转成 UDP 报文 - `udp_loopback_node.py`:接收 UDP 报文,解码事件并计算目标值 - `protocol.py`:自定义协议和状态结构 - `config/udp_loopback.yaml`:本地测试配置 @@ -40,6 +41,56 @@ cd /home/meiqi/tienkung/Deploy_Tienkung python3 udp_loopback/udp_keyboard_sender.py ``` +如果改成 Xbox 经 UDP 转发,则启动方式是: + +1. 把 [dex_config.yaml](/home/meiqi/tienkung/Deploy_Tienkung/config/dex_config.yaml) 里的 `control_tool` 改成 `udp_loopback` +2. 启动控制节点: + +```bash +cd /home/meiqi/tienkung/Deploy_Tienkung +source /opt/ros/jazzy/setup.bash +export ROS_DOMAIN_ID=10 +python3 rl_control_node_sim.py +``` + +3. 启动 MuJoCo: + +```bash +cd /home/meiqi/tienkung/xSIM_MUJOCO +source /opt/ros/jazzy/setup.bash +export ROS_DOMAIN_ID=10 +python3 scripts/simulator_view_asyn.py -m evt2 +``` + +4. 启动手柄节点: + +```bash +source /opt/ros/jazzy/setup.bash +export ROS_DOMAIN_ID=10 +ros2 run joy joy_node --ros-args -r joy:=/xbox_data +``` + +5. 启动 UDP Xbox 转发: + +```bash +cd /home/meiqi/tienkung/Deploy_Tienkung +source /opt/ros/jazzy/setup.bash +export ROS_DOMAIN_ID=10 +python3 udp_loopback/udp_xbox_sender.py +``` + +默认按键映射: + +- `A -> mode_stride -> gotoWALKAMP` +- `X -> pose_home -> gotoZERO` +- `Y -> pose_hold -> gotoSTOP` +- `B -> mode_dash -> gotoMYPOLICY` +- `START -> trim_reset` +- 左摇杆 Y -> 连续前后速度 +- 左摇杆 X -> 连续横移速度 +- 右摇杆 X -> 连续转向速度 +- 十字键左右 -> 高度增减 + 此时 UDP 接收结果会在接收侧被映射回现有 FSM 命令: - `pose_home -> gotoZERO` @@ -60,6 +111,10 @@ python3 udp_loopback/udp_keyboard_sender.py - `sway_right` - `spin_left` - `spin_right` +- `set_surge` +- `set_sway` +- `set_spin` +- `set_lift` - `lift_up` - `lift_down` - `trim_reset` diff --git a/Deploy_Tienkung/udp_loopback/config/udp_loopback.yaml b/Deploy_Tienkung/udp_loopback/config/udp_loopback.yaml index 02d13f8..af8282a 100644 --- a/Deploy_Tienkung/udp_loopback/config/udp_loopback.yaml +++ b/Deploy_Tienkung/udp_loopback/config/udp_loopback.yaml @@ -5,6 +5,14 @@ sender: target_port: 31000 source_tag: local_keys +xbox_sender: + joy_topic: /xbox_data + source_tag: xbox_udp + deadzone: 0.10 + analog_epsilon: 0.01 + dpad_threshold: 0.50 + trigger_pressed_threshold: -0.50 + receiver: listen_host: 127.0.0.1 listen_port: 31000 diff --git a/Deploy_Tienkung/udp_loopback/protocol.py b/Deploy_Tienkung/udp_loopback/protocol.py index b961f0c..e8bfdbb 100644 --- a/Deploy_Tienkung/udp_loopback/protocol.py +++ b/Deploy_Tienkung/udp_loopback/protocol.py @@ -10,7 +10,7 @@ from typing import Any, Dict @dataclass class InputEnvelope: - """Small UDP payload carrying one encoded keyboard event.""" + """Small UDP payload carrying one encoded input event.""" seq_id: int event_code: str diff --git a/Deploy_Tienkung/udp_loopback/udp_fsm_controller.py b/Deploy_Tienkung/udp_loopback/udp_fsm_controller.py index 489df6c..cbd0079 100644 --- a/Deploy_Tienkung/udp_loopback/udp_fsm_controller.py +++ b/Deploy_Tienkung/udp_loopback/udp_fsm_controller.py @@ -177,6 +177,22 @@ class UDPFSMController: self.motion_frame.spin_goal = min( self.max_spin, self.motion_frame.spin_goal + self.spin_step ) + elif event_code == "set_surge": + self.motion_frame.surge_goal = max( + -self.max_surge, min(self.max_surge, packet.drive_value) + ) + elif event_code == "set_sway": + self.motion_frame.sway_goal = max( + -self.max_sway, min(self.max_sway, packet.drive_value) + ) + elif event_code == "set_spin": + self.motion_frame.spin_goal = max( + -self.max_spin, min(self.max_spin, packet.drive_value) + ) + elif event_code == "set_lift": + self.motion_frame.lift_goal = max( + self.min_lift, min(self.max_lift, packet.drive_value) + ) elif event_code == "lift_up": self.motion_frame.lift_goal = min( self.max_lift, self.motion_frame.lift_goal + self.lift_step diff --git a/Deploy_Tienkung/udp_loopback/udp_xbox_sender.py b/Deploy_Tienkung/udp_loopback/udp_xbox_sender.py new file mode 100644 index 0000000..e712e26 --- /dev/null +++ b/Deploy_Tienkung/udp_loopback/udp_xbox_sender.py @@ -0,0 +1,253 @@ +"""ROS2 Joy -> localhost UDP bridge for Xbox control.""" + +from __future__ import annotations + +import socket +from pathlib import Path +from typing import Dict + +import rclpy +from rclpy.node import Node +from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy +from sensor_msgs.msg import Joy +import yaml + +try: + from .protocol import InputEnvelope +except ImportError: # pragma: no cover - direct script execution fallback + from protocol import InputEnvelope + + +class UDPXboxSender(Node): + """Subscribe to Joy messages and forward them as UDP loopback events.""" + + def __init__(self) -> None: + super().__init__("udp_xbox_sender") + self.config: Dict[str, object] = {} + self.seq_id = 0 + self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.last_buttons: Dict[str, int] = {} + self.last_dpad_h = 0.0 + + self._load_config() + + qos_profile = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=10, + ) + self.subscription = self.create_subscription( + Joy, self.joy_topic, self._joy_callback, qos_profile + ) + + self.get_logger().info( + f"Forwarding {self.joy_topic} -> udp://{self.target_host}:{self.target_port}" + ) + self.get_logger().info( + "Buttons: A=WALKAMP X=ZERO Y=STOP B=MYPOLICY START=reset" + ) + + def destroy_node(self) -> bool: + if self.socket is not None: + self.socket.close() + return super().destroy_node() + + def _load_config(self) -> None: + udp_config_path = Path(__file__).resolve().parent / "config" / "udp_loopback.yaml" + main_config_path = Path(__file__).resolve().parents[1] / "config" / "dex_config.yaml" + + with udp_config_path.open("r", encoding="utf-8") as file: + udp_config = yaml.safe_load(file) or {} + with main_config_path.open("r", encoding="utf-8") as file: + main_config = yaml.safe_load(file) or {} + + sender_cfg = udp_config.get("sender", {}) + xbox_sender_cfg = udp_config.get("xbox_sender", {}) + xbox_cfg = main_config.get("xbox", {}) + + self.target_host = sender_cfg.get("target_host", "127.0.0.1") + self.target_port = int(sender_cfg.get("target_port", 31000)) + self.source_tag = xbox_sender_cfg.get("source_tag", "xbox_udp") + self.joy_topic = xbox_sender_cfg.get("joy_topic", "/xbox_data") + + self.deadzone = float(xbox_sender_cfg.get("deadzone", 0.10)) + self.analog_epsilon = float(xbox_sender_cfg.get("analog_epsilon", 0.01)) + self.dpad_threshold = float(xbox_sender_cfg.get("dpad_threshold", 0.50)) + self.trigger_pressed_threshold = float( + xbox_sender_cfg.get("trigger_pressed_threshold", -0.50) + ) + + self.forward_command_offset = float( + xbox_cfg.get("forward_command_offset", 0.0) + ) + self.lateral_command_offset = float( + xbox_cfg.get("lateral_command_offset", 0.0) + ) + self.rotation_command_offset = float( + xbox_cfg.get("rotation_command_offset", 0.0) + ) + + self.button_map = { + "a": 0, + "b": 1, + "x": 2, + "y": 3, + "lb": 4, + "rb": 5, + "select": 6, + "start": 7, + "home": 8, + } + self.axis_map = { + "lx": 0, + "ly": 1, + "l_trigger": 2, + "rx": 3, + "ry": 4, + "r_trigger": 5, + "dpad_h": 6, + "dpad_v": 7, + } + + self._merge_mapping(self.button_map, xbox_cfg.get("button_map")) + self._merge_mapping(self.axis_map, xbox_cfg.get("axis_map")) + self._merge_mapping(self.button_map, xbox_sender_cfg.get("button_map")) + self._merge_mapping(self.axis_map, xbox_sender_cfg.get("axis_map")) + + def _merge_mapping(self, target: Dict[str, int], override: object) -> None: + if not isinstance(override, dict): + return + for name, index in override.items(): + if name in target: + try: + target[name] = int(index) + except (TypeError, ValueError): + pass + + def _joy_callback(self, msg: Joy) -> None: + axes = list(msg.axes) + [0.0] * 16 + buttons = list(msg.buttons) + [0] * 32 + + state = { + "a": self._button_value(buttons, "a"), + "b": self._button_value(buttons, "b"), + "x": self._button_value(buttons, "x"), + "y": self._button_value(buttons, "y"), + "start": self._button_value(buttons, "start"), + "home": self._button_value(buttons, "home"), + "lx": self._axis_value(axes, "lx"), + "ly": self._axis_value(axes, "ly"), + "rx": self._axis_value(axes, "rx"), + "l_trigger": self._axis_value(axes, "l_trigger"), + "dpad_h": self._axis_value(axes, "dpad_h"), + } + + self._send_mode_events(state) + self._send_trim_event(state) + self._send_lift_events(state) + self._send_analog_events(state) + + self.last_buttons = { + name: int(state[name]) for name in ("a", "b", "x", "y", "start", "home") + } + self.last_dpad_h = float(state["dpad_h"]) + + def _button_value(self, buttons: list[int], name: str) -> int: + index = self.button_map[name] + return int(buttons[index]) if index < len(buttons) else 0 + + def _axis_value(self, axes: list[float], name: str) -> float: + index = self.axis_map[name] + return float(axes[index]) if index < len(axes) else 0.0 + + def _send_mode_events(self, state: Dict[str, float]) -> None: + if self._rising_edge(state, "y"): + self._send_event("pose_hold", "y") + elif self._rising_edge(state, "x"): + self._send_event("pose_home", "x") + elif self._rising_edge(state, "a"): + self._send_event("mode_stride", "a") + elif self._rising_edge(state, "b"): + self._send_event("mode_dash", "b") + elif ( + self._rising_edge(state, "home") + and state["l_trigger"] < self.trigger_pressed_threshold + ): + self._send_event("mode_xrun", "home") + + def _send_trim_event(self, state: Dict[str, float]) -> None: + if self._rising_edge(state, "start"): + self._send_event("trim_reset", "start") + + def _send_lift_events(self, state: Dict[str, float]) -> None: + dpad_h = float(state["dpad_h"]) + if dpad_h <= -self.dpad_threshold and self.last_dpad_h > -self.dpad_threshold: + self._send_event("lift_up", "dpad_left") + elif dpad_h >= self.dpad_threshold and self.last_dpad_h < self.dpad_threshold: + self._send_event("lift_down", "dpad_right") + + def _send_analog_events(self, state: Dict[str, float]) -> None: + surge = self._compute_surge(state["ly"]) + sway = self._cleanup_command( + self._apply_deadzone(state["lx"]) * -0.4 + self.lateral_command_offset + ) + spin = self._cleanup_command( + self._apply_deadzone(state["rx"]) * -0.4 + self.rotation_command_offset + ) + + self._send_event("set_surge", "left_stick_y", surge) + self._send_event("set_sway", "left_stick_x", sway) + self._send_event("set_spin", "right_stick_x", spin) + + def _compute_surge(self, ly: float) -> float: + ly = self._apply_deadzone(ly) + if ly >= 0.0: + value = ly * 0.8 + self.forward_command_offset + else: + value = ly * 0.5 + return self._cleanup_command(value) + + def _apply_deadzone(self, value: float) -> float: + if abs(value) < self.deadzone: + return 0.0 + return float(value) + + def _cleanup_command(self, value: float) -> float: + if abs(value) < self.analog_epsilon: + return 0.0 + return float(value) + + def _rising_edge(self, state: Dict[str, float], name: str) -> bool: + previous = int(self.last_buttons.get(name, 0)) + return int(state[name]) == 1 and previous == 0 + + def _send_event( + self, event_code: str, key_name: str, drive_value: float = 1.0 + ) -> None: + envelope = InputEnvelope( + seq_id=self.seq_id, + event_code=event_code, + key_name=key_name, + drive_value=drive_value, + source_tag=self.source_tag, + ) + self.seq_id += 1 + self.socket.sendto( + envelope.encode(), (self.target_host, self.target_port) + ) + + +def main(args: list[str] | None = None) -> None: + rclpy.init(args=args) + node = UDPXboxSender() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main()