First Commit

This commit is contained in:
meiqi
2026-03-27 16:10:51 +08:00
commit c45245038f
103 changed files with 10994 additions and 0 deletions

View File

@@ -0,0 +1,178 @@
import numpy as np
from dataclasses import dataclass, field
def rot_x(x: float) -> np.ndarray:
c, s = np.cos(x), np.sin(x)
return np.array([
[1.0, 0.0, 0.0],
[0.0, c, -s ],
[0.0, s, c ],
], dtype=float)
def rot_y(y: float) -> np.ndarray:
c, s = np.cos(y), np.sin(y)
return np.array([
[ c, 0.0, s],
[0.0, 1.0, 0.0],
[-s, 0.0, c],
], dtype=float)
def rot_z(z: float) -> np.ndarray:
c, s = np.cos(z), np.sin(z)
return np.array([
[c, -s, 0.0],
[s, c, 0.0],
[0.0, 0.0, 1.0],
], dtype=float)
def euler_xyz_to_matrix(euler_a: np.ndarray) -> np.ndarray:
# 对应 C++: RotX * RotY * RotZ
return rot_x(euler_a[0]) @ rot_y(euler_a[1]) @ rot_z(euler_a[2])
def clip_vector(v: np.ndarray, lb: float, ub: float) -> np.ndarray:
# 返回裁剪后的新向量(如果你希望原地修改,可用 out=v
return np.clip(v, lb, ub)
def clip_scalar(a: float, lb: float, ub: float) -> float:
return float(min(max(a, lb), ub))
def gait_phase(timer: float,
gait_cycle: float,
left_theta_offset: float,
right_theta_offset: float,
left_phase_ratio: float,
right_phase_ratio: float) -> np.ndarray:
res = np.zeros(6, dtype=float)
left_phase = (timer / gait_cycle + left_theta_offset) - np.floor(timer / gait_cycle + left_theta_offset)
right_phase = (timer / gait_cycle + right_theta_offset) - np.floor(timer / gait_cycle + right_theta_offset)
res[0] = np.sin(2.0 * np.pi * left_phase)
res[1] = np.sin(2.0 * np.pi * right_phase)
res[2] = np.cos(2.0 * np.pi * left_phase)
res[3] = np.cos(2.0 * np.pi * right_phase)
res[4] = left_phase_ratio
res[5] = right_phase_ratio
return res
def fifth_poly(p0: np.ndarray, p0_dot: np.ndarray, p0_dotdot: np.ndarray,
p1: np.ndarray, p1_dot: np.ndarray, p1_dotdot: np.ndarray,
total_time: float, current_time: float):
"""
返回: pd, pd_dot, pd_dotdot
"""
p0 = np.asarray(p0, dtype=float)
p0_dot = np.asarray(p0_dot, dtype=float)
p0_dotdot = np.asarray(p0_dotdot, dtype=float)
p1 = np.asarray(p1, dtype=float)
p1_dot = np.asarray(p1_dot, dtype=float)
p1_dotdot = np.asarray(p1_dotdot, dtype=float)
n = p0.shape[0]
pd = np.zeros(n, dtype=float)
pd_dot = np.zeros(n, dtype=float)
pd_dotdot = np.zeros(n, dtype=float)
t = current_time
time = total_time
if t < total_time:
A = np.array([
[1.0, 0.0, 0.0, 0.0, 0.0, 0.0],
[0.0, 1.0, 0.0, 0.0, 0.0, 0.0],
[0.0, 0.0, 1.0 / 2.0, 0.0, 0.0, 0.0],
[-10.0 / time**3, -6.0 / time**2, -3.0 / (2.0 * time), 10.0 / time**3, -4.0 / time**2, 1.0 / (2.0 * time)],
[15.0 / time**4, 8.0 / time**3, 3.0 / (2.0 * time**2), -15.0 / time**4, 7.0 / time**3, -1.0 / time**2],
[-6.0 / time**5, -3.0 / time**4, -1.0 / (2.0 * time**3), 6.0 / time**5, -3.0 / time**4, 1.0 / (2.0 * time**3)],
], dtype=float)
for i in range(n):
x0 = np.array([
p0[i], p0_dot[i], p0_dotdot[i],
p1[i], p1_dot[i], p1_dotdot[i]
], dtype=float)
a = A @ x0
pd[i] = a[0] + a[1] * t + a[2] * t**2 + a[3] * t**3 + a[4] * t**4 + a[5] * t**5
pd_dot[i] = a[1] + 2.0 * a[2] * t + 3.0 * a[3] * t**2 + 4.0 * a[4] * t**3 + 5.0 * a[5] * t**4
pd_dotdot[i] = 2.0 * a[2] + 6.0 * a[3] * t + 12.0 * a[4] * t**2 + 20.0 * a[5] * t**3
else:
pd = p1.copy()
pd_dot = p1_dot.copy()
pd_dotdot = p1_dotdot.copy()
return pd, pd_dot, pd_dotdot
@dataclass
class LowPassFilter:
cut_off_freq: float
damp_ratio: float
d_time: float
n_filter: int
dT: float = field(init=False)
sigIn_1: np.ndarray = field(init=False)
sigIn_2: np.ndarray = field(init=False)
sigOut_1: np.ndarray = field(init=False)
sigOut_2: np.ndarray = field(init=False)
a2: float = field(init=False)
a1: float = field(init=False)
a0: float = field(init=False)
b2: float = field(init=False)
b1: float = field(init=False)
b0: float = field(init=False)
def __post_init__(self):
self.dT = self.d_time
self.sigIn_1 = np.zeros(self.n_filter, dtype=float)
self.sigIn_2 = np.zeros(self.n_filter, dtype=float)
self.sigOut_1 = np.zeros(self.n_filter, dtype=float)
self.sigOut_2 = np.zeros(self.n_filter, dtype=float)
freq_in_rad = 2.0 * np.pi * self.cut_off_freq
c = 2.0 / self.dT
sqr_c = c * c
sqr_w = freq_in_rad * freq_in_rad
self.b2 = sqr_c + 2.0 * self.damp_ratio * freq_in_rad * c + sqr_w
self.b1 = -2.0 * (sqr_c - sqr_w)
self.b0 = sqr_c - 2.0 * self.damp_ratio * freq_in_rad * c + sqr_w
self.a2 = sqr_w
self.a1 = 2.0 * sqr_w
self.a0 = sqr_w
self.a2 /= self.b2
self.a1 /= self.b2
self.a0 /= self.b2
self.b1 /= self.b2
self.b0 /= self.b2
self.b2 = 1.0
def m_filter(self, sig_in: np.ndarray) -> np.ndarray:
sig_in = np.asarray(sig_in, dtype=float)
sig_out = (
self.a2 * sig_in
+ self.a1 * self.sigIn_1
+ self.a0 * self.sigIn_2
- self.b1 * self.sigOut_1
- self.b0 * self.sigOut_2
)
self.sigIn_2 = self.sigIn_1
self.sigIn_1 = sig_in
self.sigOut_2 = self.sigOut_1
self.sigOut_1 = sig_out
return sig_out

View File

View File

@@ -0,0 +1,74 @@
"""
Body ID Mapping
Python equivalent of the C++ bodyIdMap functionality
"""
from typing import Dict, List
class BodyServoIdMap:
"""身体伺服电机ID映射"""
def __init__(self):
self.id_to_index_map: Dict[int, int] = {}
self.index_to_id_map: Dict[int, int] = {}
self.name_to_index_map: Dict[str, int] = {}
self.index_to_name_map: Dict[int, str] = {}
self.leg_motor_nums = 0
self.waist_motor_nums = 0
self.arm_motor_nums = 0
self.whole_motor_nums = 0
def body_can_id_map_init(self):
"""初始化身体CAN ID映射"""
# 腿部关节映射 (0-11)
leg_ids = [51, 52, 53, 54, 55, 56, # 左腿
61, 62, 63, 64, 65, 66] # 右腿
leg_names = [
"l_hip_pitch", "l_hip_roll", "l_hip_yaw", "l_knee", "l_ankle_pitch", "l_ankle_roll",
"r_hip_pitch", "r_hip_roll", "r_hip_yaw", "r_knee", "r_ankle_pitch", "r_ankle_roll"
]
self.leg_motor_nums = len(leg_ids)
# 腰部关节映射 (12-14)
waist_ids = [33, 32, 31]
waist_names = ["waist_yaw", "waist_roll", "waist_pitch" ]
self.waist_motor_nums = len(waist_ids)
# 手臂关节映射 (15-28)
arm_ids = [11, 12, 13, 14, 15, 16, 17, # 左臂
21, 22, 23, 24, 25, 26, 27] # 右臂
arm_names = [
"l_shoulder_pitch", "l_shoulder_roll", "l_shoulder_yaw", "l_elbow",
"l_wrist_yaw", "l_wrist_pitch", "l_wrist_roll",
"r_shoulder_pitch", "r_shoulder_roll", "r_shoulder_yaw", "r_elbow",
"r_wrist_yaw", "r_wrist_pitch", "r_wrist_roll"
]
self.arm_motor_nums = len(arm_ids)
# 合并所有映射
all_ids = leg_ids + waist_ids + arm_ids
all_names = leg_names + waist_names + arm_names
self.whole_motor_nums = len(all_ids)
# 创建双向映射
for index, (can_id, name) in enumerate(zip(all_ids, all_names)):
self.id_to_index_map[can_id] = index
self.index_to_id_map[index] = can_id
self.name_to_index_map[name] = index
self.index_to_name_map[index] = name
def get_index_by_id(self, can_id: int) -> int:
"""根据CAN ID获取索引"""
return self.id_to_index_map.get(can_id, -1)
def get_id_by_index(self, index: int) -> int:
"""根据索引获取CAN ID"""
return self.index_to_id_map.get(index, -1)
def get_index_by_name(self, name: str) -> int:
"""根据关节名称获取索引"""
return self.name_to_index_map.get(name, -1)
def get_name_by_index(self, index: int) -> str:
"""根据索引获取关节名称"""
return self.index_to_name_map.get(index, "")

View File

@@ -0,0 +1,236 @@
"""
Joystick Control Module
Python equivalent of the C++ Joystick functionality for ROS Joy messages
"""
import os
import yaml
import threading
from dataclasses import dataclass
from typing import Optional
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Joy
import numpy as np
@dataclass
class ControlFlag:
"""手柄控制标志"""
fsm_state_command: str = "gotoZERO"
# 禁用、启用手柄控制标志
enable: bool = True
@dataclass
class YUNZHUOMap:
"""云卓T12手柄按键映射 (对应ROS Joy消息)"""
a: float = 0.0 # axes[8] #a,b,c,d手柄轴初始值为-1
b: float = 0.0 # axes[9]
c: float = 0.0 # axes[10]
d: float = 0.0 # axes[11]
e: float = 0.0 # axes[4] e,f,g,h手柄轴初始值为0.0
f: float = 0.0 # axes[7]
g: float = 0.0 # axes[5]
h: float = 0.0 # axes[6]
x1: float = 0.0 # axes[3]
x2: float = 0.0 # axes[0]
y1: float = 0.0 # axes[2]
y2: float = 0.0 # axes[1]
class YUNZHUOFlag(ControlFlag): # 继承ControlFlag
def __init__(self):
super().__init__() # 调用父类初始化
# walk command
self.x_speed_command: float = 0.0
self.y_speed_command: float = 0.0
self.yaw_speed_command: float = 0.0
self.walk_height_command: float = 0.0
# floating base command
self.waist_roll_command: float = 0.0
self.waist_pitch_command: float = 0.0
self.waist_yaw_command: float = 0.0
self.waist_height_command: float = 0.0
class JoystickHumanoid:
"""人形机器人手柄控制器 (ROS Joy版本)"""
def __init__(self):
print("Joystick Start")
# 初始化成员变量
self.joy_map = YUNZHUOMap()
self.joy_flag = YUNZHUOFlag()
self.data_mutex = threading.Lock()
# 配置参数
self.initial_height = 0.0
self.current_height = 0.0
self.max_height = 0.0
self.min_height = 0.0
self.x_command_offset = 0.0
self.y_command_offset = 0.0
self.yaw_command_offset = 0.0
self.max_x_plus_speed = 0.0
self.max_x_minus_speed = 0.0
self.max_y_speed = 0.0
self.max_yaw_speed = 0.0
# 高度平滑控制
self.target_height = 0.0
# 加载配置文件
self._load_config()
def _load_config(self):
"""加载YAML配置文件"""
config_path = os.path.join('.', "config", "dex_config.yaml")
with open(config_path, 'r') as file:
config = yaml.safe_load(file)
if not config:
print("[Joystick_humanoid] Failed to load config file")
return
joystick_cfg = config.get("joystick", {})
# 加载配置参数
self.initial_height = joystick_cfg.get("initial_height")
self.x_command_offset = joystick_cfg.get("x_command_offset")
self.y_command_offset = joystick_cfg.get("y_command_offset")
self.yaw_command_offset = joystick_cfg.get("yaw_command_offset")
self.max_x_plus_speed = joystick_cfg.get("max_x_plus_speed")
self.max_x_minus_speed = joystick_cfg.get("max_x_minus_speed")
self.max_y_speed = joystick_cfg.get("max_y_speed")
self.max_yaw_speed = joystick_cfg.get("max_yaw_speed")
self.max_height = joystick_cfg.get("max_height")
self.min_height = joystick_cfg.get("min_height")
print(f"Loaded initial_height: {self.initial_height}, "
f"x_command_offset: {self.x_command_offset}, "
f"y_command_offset: {self.y_command_offset}, "
f"yaw_command_offset: {self.yaw_command_offset},"
f"max_x_plus_speed: {self.max_x_plus_speed}, "
f"max_x_minus_speed: {self.max_x_minus_speed}, "
f"max_y_speed: {self.max_y_speed}, "
f"max_yaw_speed: {self.max_yaw_speed}, "
f"max_height: {self.max_height},"
f"min_height: {self.min_height}")
self.current_height = self.initial_height
self.target_height = self.initial_height
self.joy_flag.waist_height_command = self.current_height
self.joy_flag.walk_height_command = self.current_height
def joy_map_read(self, msg: Joy):
"""处理ROS Joy消息更新手柄映射"""
with self.data_mutex:
if len(msg.axes) >= 12: # 确保有足够的轴数据
yunzhuo_map = YUNZHUOMap(
a=msg.axes[8] if len(msg.axes) > 8 else 0.0,
b=msg.axes[9] if len(msg.axes) > 9 else 0.0,
c=msg.axes[10] if len(msg.axes) > 10 else 0.0,
d=msg.axes[11] if len(msg.axes) > 11 else 0.0,
e=msg.axes[4] if len(msg.axes) > 4 else 0.0,
f=msg.axes[7] if len(msg.axes) > 7 else 0.0,
g=msg.axes[5] if len(msg.axes) > 5 else 0.0,
h=msg.axes[6] if len(msg.axes) > 6 else 0.0,
x1=msg.axes[3] if len(msg.axes) > 3 else 0.0,
x2=msg.axes[0] if len(msg.axes) > 1 else 0.0,
y1=msg.axes[2] if len(msg.axes) > 2 else 0.0,
y2=msg.axes[1] if len(msg.axes) > 0 else 0.0)
self.joy_map = yunzhuo_map
def joy_flag_update(self):
"""根据手柄输入更新控制标志"""
with self.data_mutex:
# 更新手柄启动标志
if self.joy_map.f == -1.0:
self.joy_flag.enable = False
else:
self.joy_flag.enable = True
# FSM状态切换命令
if self.joy_map.c == 1.0:
self.joy_flag.fsm_state_command = "gotoSTOP"
else:
button_pressed_nums = self.check_button_pressed_nums(
self.joy_map)
if button_pressed_nums == 0:
if self.joy_map.d == 1.0:
self.joy_flag.fsm_state_command = "gotoZERO"
elif self.joy_map.a == 1.0:
self.joy_flag.fsm_state_command = "gotoWALKAMP"
# 获取walk速度命令
self.get_x_y_yaw_speed_command()
# 获取高度命令
self.get_walk_height_command()
if button_pressed_nums == 1:
if self.joy_map.e == -1.0:
#e上拨
if self.joy_map.a == 1.0:
self.joy_flag.fsm_state_command = "gotoBEYONDMIMIC"
elif self.joy_map.d == 1.0:
self.joy_flag.fsm_state_command = "gotoBEYONDZERO"
def get_joy_flag(self) -> ControlFlag:
"""获取当前手柄标志"""
with self.data_mutex:
return self.joy_flag
def init(self) -> int:
"""初始化手柄控制器"""
print("Joystick controller initialized")
return 0
def check_button_pressed_nums(self, joy_map: YUNZHUOMap) -> int:
"""检查按下的按钮数量"""
count = 0
if joy_map.e != 0.0:
count += 1
if joy_map.f != 0.0:
count += 1
if joy_map.g != 0.0:
count += 1
if joy_map.h != 0.0:
count += 1
return count
def get_x_y_yaw_speed_command(self):
"""获取当前速度命令"""
# 速度命令计算
self.joy_flag.y_speed_command = (self.joy_map.x1 * -self.max_y_speed +
self.y_command_offset)
# X速度 (前进/后退)
if self.joy_map.y1 >= 0:
self.joy_flag.x_speed_command = (
self.joy_map.y1 * self.max_x_plus_speed + self.x_command_offset
) # 前进快一点
else:
self.joy_flag.x_speed_command = self.joy_map.y1 * self.max_x_minus_speed # 后退慢一点
# 偏航速度
self.joy_flag.yaw_speed_command = (
self.joy_map.x2 * -self.max_yaw_speed + self.yaw_command_offset)
def get_walk_height_command(self):
"""获取当前高度命令"""
current_height_command = self.joy_flag.walk_height_command
deadzone_height = 0.5
# 高度命令计算
if self.joy_map.x2 >= deadzone_height:
# x2 下拨
self.joy_flag.walk_height_command += -self.joy_map.x2 * (
self.joy_flag.walk_height_command - self.min_height)
if self.joy_map.x2 <= -deadzone_height:
# x2 上拨
self.joy_flag.walk_height_command += -self.joy_map.x2 * (
self.max_height - self.joy_flag.walk_height_command)
# 1s中高度变化3cm, step= 0.03 / 100 hz = 0.0003
step = 0.03 / 100
self.joy_flag.walk_height_command = np.clip(
self.joy_flag.walk_height_command, current_height_command - step,
current_height_command + step)

View File

@@ -0,0 +1,11 @@
import queue
class PeekableQueue(queue.Queue):
def peek(self):
"""取出再放回"""
item = self.get_nowait()
try:
self.put_nowait(item)
except queue.Full:
pass
return item

View File

@@ -0,0 +1,163 @@
"""
Robot Data Structure
Python equivalent of the C++ RobotData class
"""
import numpy as np
from scipy.spatial.transform import Rotation
from common.joystick import ControlFlag
import copy
class RobotData:
"""机器人状态数据结构"""
def __init__(self, motor_num: int = 29, whole_joint_num: int = 35):
self.motor_num = motor_num
self.whole_joint_num = whole_joint_num
# Joint states (actual)
self.q_a_ = np.zeros(whole_joint_num) # Joint positions
self.q_dot_a_ = np.zeros(whole_joint_num) # Joint velocities
self.tau_a_ = np.zeros(whole_joint_num) # Joint torques
self.temperature_a = np.zeros(motor_num) # Joint temperatures
self.q_a_last = np.zeros(whole_joint_num) # 上一时刻关节位置
self.qdot_a_last = np.zeros(whole_joint_num) # 上一时刻关节速度
self.tor_a_last = np.zeros(whole_joint_num) # 上一时刻关节力矩
# Joint commands (desired)
self.q_d_ = np.zeros(whole_joint_num) # Desired joint positions
self.q_d_s_ = np.zeros(whole_joint_num) # Desired serial joint positions
self.q_dot_d_ = np.zeros(whole_joint_num) # Desired joint velocities
self.tau_d_ = np.zeros(whole_joint_num) # Desired joint torques
# Control gains
self.joint_kp_p_ = np.zeros(motor_num) # Proportional gains
self.joint_kd_p_ = np.zeros(motor_num) # Derivative gains
# IMU data: [yaw, pitch, roll, gyro_x, gyro_y, gyro_z, acc_x, acc_y, acc_z]
self.imu_data_ = np.zeros(13)
self.imu_quat_ = np.array([1.0, 0.0, 0.0, 0.0], dtype=np.float32) # w, x, y, z
# Timing
self.time_now_ = 0.0
# Configuration
self.config_file_ = ""
# control cmd
# walk command
self.walk_cmd_ = np.zeros(3) # x_speed, y_speed, yaw_speed
# 控制符
self.control_flag = ControlFlag()
def copy_from(self, other: 'RobotData'):
"""从另一个RobotData对象复制数据"""
self.q_a_[:] = other.q_a_[:]
self.q_dot_a_[:] = other.q_dot_a_[:]
self.tau_a_[:] = other.tau_a_[:]
self.q_d_[:] = other.q_d_[:]
self.q_dot_d_[:] = other.q_dot_d_[:]
self.tau_d_[:] = other.tau_d_[:]
self.joint_kp_p_[:] = other.joint_kp_p_[:]
self.joint_kd_p_[:] = other.joint_kd_p_[:]
self.imu_data_[:] = other.imu_data_[:]
self.imu_quat_[:] = other.imu_quat_[:]
self.time_now_ = other.time_now_
self.config_file_ = other.config_file_
self.walk_cmd_[:] = other.walk_cmd_[:]
self.control_flag = copy.deepcopy(other.control_flag)
def get_joint_pos(self) -> np.ndarray:
joint_start_idx = 35 - self.motor_num
joint_pos = self.q_a_[joint_start_idx:].astype(np.float32)
return joint_pos
def get_serial_joint_pos_desired(self) -> np.ndarray:
joint_start_idx = 35 - self.motor_num
joint_pos_desired = self.q_d_s_[joint_start_idx:].astype(np.float32)
return joint_pos_desired
def get_joint_vel(self)-> np.ndarray:
joint_start_idx = 35 - self.motor_num
joint_vel = self.q_dot_a_[joint_start_idx:].astype(np.float32)
return joint_vel
def get_angular_velocity(self) -> np.ndarray:
omega_xyz = np.array([
self.imu_data_[3],
self.imu_data_[4],
self.imu_data_[5]
], dtype=np.float32)
return omega_xyz
def get_robot_quat(self):
if np.linalg.norm(self.imu_quat_) > 0.0:
return self.imu_quat_.astype(np.float32)
rpy = np.array([
self.imu_data_[2], # roll
self.imu_data_[1], # pitch
self.imu_data_[0] # yaw
], dtype=np.float32) * 1.0
robot_quat_wxyz = self.euler_to_quaternion_scipy(rpy[0], rpy[1], rpy[2])
return robot_quat_wxyz
def euler_to_quaternion_scipy(self, roll, pitch, yaw, degrees=False):
"""
使用SciPy进行欧拉角转四元数
参数:
roll: 绕x轴的旋转角度
pitch: 绕y轴的旋转角度
yaw: 绕z轴的旋转角度
degrees: 输入角度是否为度,默认为弧度
返回:
[w, x, y, z]: 四元数分量 (w为实部)
"""
# 创建旋转对象 (顺序: 'xyz' 对应 roll, pitch, yaw)
rotation = Rotation.from_euler('xyz', [roll, pitch, yaw], degrees=degrees)
# 转换为四元数 (顺序: [x, y, z, w])
quaternion = rotation.as_quat()
return [quaternion[3], quaternion[0], quaternion[1], quaternion[2]] # 返回 w, x, y, z
def get_waist_yrp(self) -> np.ndarray:
joint_pos = self.get_joint_pos()
waist_yaw, waist_roll, waist_pitch = joint_pos[12], joint_pos[13], joint_pos[14]
return np.array([waist_yaw, waist_roll, waist_pitch], dtype=np.float32)
def get_base_linear_acceleration(self) -> np.ndarray:
lin_acc = np.array([
self.imu_data_[6],
self.imu_data_[7],
self.imu_data_[8]
], dtype=np.float32)
return lin_acc
def get_project_gravity(self) -> np.ndarray:
"""根据机器人姿态重力投影(待完善)
Args:
None
"""
robot_quat_wxyz = self.get_robot_quat()
robot_quat_xyzw = np.array([robot_quat_wxyz[1], robot_quat_wxyz[2], robot_quat_wxyz[3], robot_quat_wxyz[0]])
g = np.array([0., 0., -1.])
projected_gravity = self.quat_rotate_inverse_numpy(robot_quat_xyzw, g)
return projected_gravity
def quat_rotate_inverse_numpy(self, q, v):
"""
q: [x, y, z, w], shape=(4)\\
v: [x, y, z], shape=(3)
"""
q_w = q[3]
q_vec = q[:3]
a = v * (2.0 * q_w ** 2 - 1.0)
b = np.cross(q_vec, v) * q_w * 2.0
c = q_vec * np.dot(q_vec, v) * 2.0
return a - b + c
def get_walk_cmd(self) -> np.ndarray:
"""获取行走命令: [x_speed, y_speed, yaw_speed]"""
return self.walk_cmd_

View File

@@ -0,0 +1,814 @@
"""
Robot Interface
Python equivalent of the C++ RobotInterface class
"""
from __future__ import annotations
import queue
from common.peekqueue import PeekableQueue
import yaml
import os
from abc import ABC, abstractmethod
from typing import Dict, Any
import numpy as np
# ROS messages
from bodyctrl_msgs.msg import MotorStatusMsg, CmdMotorCtrl, MotorCtrl, Imu, MotorStatus
import rclpy
from std_msgs.msg import String
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
import transforms3d as t3d
from .body_id_map import BodyServoIdMap
from .robot_data import RobotData
import functools
import time
import math
from sptlib_python import funcSPTrans as FuncSPTrans
from common.joystick import ControlFlag
from geometry_msgs.msg import TwistStamped
from std_msgs.msg import Float64
from FSM.fsm_base import FSMStateName
def timing_decorator(func):
"""
装饰器:记录函数执行时间
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
execution_time = end_time - start_time
# print(f"[TIMING] {func.__name__} executed in {execution_time:.6f} seconds")
return result
return wrapper
class RobotInterface(ABC):
"""机器人接口抽象基类"""
def __init__(self, robot_data: RobotData):
self.robot_data_ = robot_data
@abstractmethod
def init(self, node: Node):
"""初始化接口"""
pass
@abstractmethod
def update_robot_data(self, flag:ControlFlag, time_passed: float):
"""更新机器人状态"""
pass
@abstractmethod
def send_motor_commands(self, flag: ControlFlag):
"""发布电机控制命令"""
pass
class RobotInterfaceImpl(RobotInterface):
"""机器人接口具体实现"""
def __init__(self, robot_data: RobotData, config_path: str = ''):
super().__init__(robot_data)
self.initialized = False
self.node = None
self.config_path = config_path
# ID映射
self.id_map = BodyServoIdMap()
self.id_map.body_can_id_map_init()
# 消息队列
self.queue_leg_motor_state = PeekableQueue(maxsize=1)
self.queue_arm_motor_state = PeekableQueue(maxsize=1)
self.queue_waist_motor_state = PeekableQueue(maxsize=1)
self.queue_imu_xsens = PeekableQueue(maxsize=1)
self.queue_walk_cmd = PeekableQueue(maxsize=1)
# 关节维度
self.floating_base_dof = 6
self.whole_joint_nums = self.id_map.whole_motor_nums + self.floating_base_dof
# 临时变量用于优化计算
self.temp_q = np.empty(self.id_map.whole_motor_nums)
# 预分配另一个用于存储中间计算的临时数组
self._temp_zero_cnt = np.empty(self.id_map.whole_motor_nums)
# 电机控制参数
self.motor_dir = np.ones(self.id_map.whole_motor_nums)
self.zero_cnt = np.zeros(self.id_map.whole_motor_nums)
self.zero_offset = np.zeros(self.id_map.whole_motor_nums)
# 添加标志位,用于跟踪是否是首次接收数据
self.first_leg_data_received = False
self.first_arm_data_received = False
self.first_waist_data_received = False
# 关节限位
self.joint_limits = {}
self.joint_pos_threshold = math.pi
# 串并联转换器
self.fun_s2p = FuncSPTrans()
# 串并联转换相关变量
self.left_ankle_indices = np.array([4, 5]) + self.floating_base_dof
self.right_ankle_indices = np.array([10, 11]) + self.floating_base_dof
self.q_a_p = np.zeros(4) # 并联关节位置
self.qdot_a_p = np.zeros(4) # 并联关节速度
self.tor_a_p = np.zeros(4) # 并联关节力矩
self.ankle_kp_p = np.zeros(4) # 并联关节刚度
self.ankle_kd_p = np.zeros(4) # 并联关节阻尼
# TF相关属性
self.tf_buffer = None
self.tf_listener = None
# ROS publishers and subscribers
self.pub_leg_motor_cmd = None
self.pub_arm_motor_cmd = None
self.pub_waist_motor_cmd = None
self.pub_fsm_state = None
self.sub_leg_state = None
self.sub_arm_state = None
self.sub_waist_state = None
# 当前机器人所处状态
self.current_state: FSMStateName = FSMStateName.STOP
def update_param(self, current_state: FSMStateName = None):
"""更新机器人接口"""
if current_state is not None:
self.current_state = current_state
def load_config(self):
"""从配置文件加载关键参数"""
config_path = self.config_path
if not os.path.exists(config_path):
self.node.get_logger().error(
f"Joint limits config file not found: {config_path}")
raise FileNotFoundError(
f"Joint limits config file not found: {config_path}")
try:
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
except Exception as e:
self.node.get_logger().error(
f"Failed to load joint limits config from {config_path}: {e}")
raise RuntimeError(
f"Failed to load joint limits config from {config_path}: {e}")
# 读取运行模式
self.sim = config.get('sim')
self.debug = config.get('debug')
# 机器人接口配置
interface_config = config.get('robot_interface')
# 是否限位
self.clip_actions = interface_config.get('clip_actions')
# 加载关节限位值
self._load_joint_limits(interface_config)
# 加载控制状态
self._load_control_status(interface_config)
# 零位
self.zero_pos = np.array(interface_config.get('zero_pos'))
# 电流转换比例
self.ct_scale = np.array(interface_config.get('ct_scale'))
# IMU 数据偏移
self.xsense_roll_offset = interface_config.get(
'xsense_data_roll_offset')
# 禁用电机
self.disable_joints_ = interface_config.get('disable_joints', False)
# 脚踝Kp,Kd
self.ankle_kp_p = np.array(interface_config.get('ankle_kp_p'))
self.ankle_kd_p = np.array(interface_config.get('ankle_kd_p'))
def _load_control_status(self, config: Dict[str, Any]):
# 字符串命令到枚举值的映射
state_to_FSMState = {
"STOP": FSMStateName.STOP,
"ZERO": FSMStateName.ZERO,
"WALKAMP": FSMStateName.WALKAMP,
"MYPOLICY": FSMStateName.MYPOLICY,
"XSIMRUN": FSMStateName.XSIMRUN,
}
self.waist_control_status = [state_to_FSMState[state] for state in config.get('waist_control_status')]
self.legs_control_status = [state_to_FSMState[state] for state in config.get('legs_control_status')]
self.arms_control_status = [state_to_FSMState[state] for state in config.get('arms_control_status')]
self.left_arm_only_status = [state_to_FSMState[state] for state in config.get('left_arm_only_status')]
self.right_arm_only_status = [state_to_FSMState[state] for state in config.get('right_arm_only_status')]
def _load_joint_limits(self, config: Dict[str, Any]):
"""从配置文件加载关节限位值"""
# 从配置中获取关节限位信息
joint_limits_config = config.get('joint_limits', None)
if joint_limits_config is None:
error_msg = "No joint_limits section in config"
self.node.get_logger().error(error_msg)
raise ValueError(error_msg)
else:
# 从配置中加载限位值
for joint_name, limits in joint_limits_config.items():
if 'min' in limits and 'max' in limits:
self.joint_limits[joint_name] = {
'min': float(limits['min']),
'max': float(limits['max'])
}
else:
error_msg = f"Invalid limits for joint {joint_name}"
self.node.get_logger().error(error_msg)
raise ValueError(error_msg)
self.node.get_logger().info(f"Loaded joint limits from {config}")
# 预计算ID到限位的映射
self.id_to_limits = {}
for joint_name, limits in self.joint_limits.items():
index = self.id_map.get_index_by_name(joint_name)
if index >= 0:
motor_id = self.id_map.get_id_by_index(index)
self.id_to_limits[motor_id] = limits
# 记录加载的限位值
for joint_name, limits in self.joint_limits.items():
self.node.get_logger().debug(
f"Joint {joint_name}: [{limits['min']}, {limits['max']}]")
print("-" * 30 + '关节限位值' + '-' * 30)
print(self.joint_limits)
def init(self, node: Node):
"""初始化接口"""
self.node = node
self.initialized = True
# 初始化ROS接口
self._init_ros_interfaces()
# 加载配置文件
self.load_config()
node.get_logger().info("Robot interface initialized")
def _init_ros_interfaces(self):
"""初始化ROS接口"""
qos_profile = QoSProfile(
# reliability=ReliabilityPolicy.RELIABLE,
# history=HistoryPolicy.KEEP_LAST,
depth=10)
# 发布者
self.pub_leg_motor_cmd = self.node.create_publisher(
CmdMotorCtrl, '/leg/cmd_ctrl', qos_profile)
self.pub_arm_motor_cmd = self.node.create_publisher(
CmdMotorCtrl, '/arm/cmd_ctrl', qos_profile)
self.pub_waist_motor_cmd = self.node.create_publisher(
CmdMotorCtrl, '/waist/cmd_ctrl', qos_profile)
self.pub_fsm_state = self.node.create_publisher(
String, '/control/fsm_state', qos_profile)
# 订阅者
self.sub_leg_state = self.node.create_subscription(
MotorStatusMsg, '/leg/status', self._leg_motor_status_callback,
qos_profile)
self.sub_arm_state = self.node.create_subscription(
MotorStatusMsg, '/arm/status', self._arm_motor_status_callback,
qos_profile)
self.sub_waist_state = self.node.create_subscription(
MotorStatusMsg, '/waist/status', self._waist_motor_status_callback,
qos_profile)
#(非电机相关)
self.sub_imu_xsens = self.node.create_subscription(
Imu, '/imu/status', self._imu_status_callback, qos_profile)
# @timing_decorator
def get_imu_data(self):
"""处理传感器数据(非电机)"""
# 处理IMU数据
while True:
try:
msg = self.queue_imu_xsens.peek()
self.robot_data_.imu_data_[0] = msg.euler.yaw
self.robot_data_.imu_data_[1] = msg.euler.pitch
self.robot_data_.imu_data_[2] = msg.euler.roll
self.robot_data_.imu_data_[3] = msg.angular_velocity.x
self.robot_data_.imu_data_[4] = msg.angular_velocity.y
self.robot_data_.imu_data_[5] = msg.angular_velocity.z
self.robot_data_.imu_data_[6] = msg.linear_acceleration.x
self.robot_data_.imu_data_[7] = msg.linear_acceleration.y
self.robot_data_.imu_data_[8] = msg.linear_acceleration.z
self.robot_data_.imu_quat_[0] = msg.orientation.w
self.robot_data_.imu_quat_[1] = msg.orientation.x
self.robot_data_.imu_quat_[2] = msg.orientation.y
self.robot_data_.imu_quat_[3] = msg.orientation.z
break
except queue.Empty:
time.sleep(0.0001)
def update_robot_state(self):
"""读取电机状态数据更新为机器人状态数据"""
# 处理腿部电机状态
while True:
try:
msg = self.queue_leg_motor_state.peek()
if self.debug:
current_time = self.node.get_clock().now().to_msg()
msg_time = msg.header.stamp
time_diff = (current_time.sec -
msg_time.sec) * 1000000000 + (
current_time.nanosec - msg_time.nanosec)
time_diff_ms = time_diff / 1000000.0
print(f"Time difference: {time_diff_ms} ms")
for status in msg.status:
self.motor_state_to_robot_state(
status, self.first_leg_data_received)
break
except queue.Empty:
time.sleep(0.0001)
# 处理手臂电机状态
while True:
try:
msg = self.queue_arm_motor_state.peek()
if self.debug:
current_time = self.node.get_clock().now().to_msg()
msg_time = msg.header.stamp
time_diff = (current_time.sec -
msg_time.sec) * 1000000000 + (
current_time.nanosec - msg_time.nanosec)
time_diff_ms = time_diff / 1000000.0
print(f"Time difference: {time_diff_ms} ms")
for status in msg.status:
self.motor_state_to_robot_state(
status, self.first_arm_data_received)
break
except queue.Empty:
time.sleep(0.0001)
# 处理腰部电机状态
while True:
try:
msg = self.queue_waist_motor_state.peek()
if self.debug:
current_time = self.node.get_clock().now().to_msg()
msg_time = msg.header.stamp
time_diff = (current_time.sec -
msg_time.sec) * 1000000000 + (
current_time.nanosec - msg_time.nanosec)
time_diff_ms = time_diff / 1000000.0
print(f"Time difference: {time_diff_ms} ms")
for status in msg.status:
self.motor_state_to_robot_state(
status, self.first_waist_data_received)
break
except queue.Empty:
time.sleep(0.0001)
def motor_state_to_robot_state(self, status, received_flag: bool):
index = self.id_map.get_index_by_id(status.name)
if index >= 0:
robotdata_index = index + self.floating_base_dof # 偏移到完整关节数组中
# 直接赋值
self.robot_data_.q_a_[robotdata_index] = status.pos
self.robot_data_.q_dot_a_[robotdata_index] = status.speed
self.robot_data_.tau_a_[
robotdata_index] = status.current * self.ct_scale[min(
index,
len(self.ct_scale) - 1)]
self.robot_data_.temperature_a[
robotdata_index - self.floating_base_dof] = status.temperature
self.robot_data_.q_a_[robotdata_index] = (
status.pos - self.zero_pos[index]
) * self.motor_dir[index] + self.zero_offset[index]
if self.robot_data_.q_a_[robotdata_index] > math.pi:
self.zero_cnt[index] = -1.0
elif self.robot_data_.q_a_[robotdata_index] < -math.pi:
self.zero_cnt[index] = 1.0
self.robot_data_.q_a_[
robotdata_index] += self.zero_cnt[index] * 2.0 * math.pi
self.robot_data_.q_dot_a_[robotdata_index] *= self.motor_dir[index]
self.robot_data_.tau_a_[robotdata_index] *= self.motor_dir[index]
if not received_flag or abs(
self.robot_data_.q_a_[robotdata_index] -
self.robot_data_.q_a_last[robotdata_index]
) > self.joint_pos_threshold:
if received_flag:
self.node.get_logger().warn(
f"Joint {index} error detected")
self.robot_data_.q_a_[
robotdata_index] = self.robot_data_.q_a_last[
robotdata_index]
self.robot_data_.q_dot_a_[
robotdata_index] = self.robot_data_.qdot_a_last[
robotdata_index]
self.robot_data_.tau_a_[
robotdata_index] = self.robot_data_.tor_a_last[
robotdata_index]
else:
# 首次接收数据,更新标志位
received_flag = True
self.robot_data_.q_a_last[robotdata_index] = self.robot_data_.q_a_[
robotdata_index]
self.robot_data_.qdot_a_last[
robotdata_index] = self.robot_data_.q_dot_a_[robotdata_index]
self.robot_data_.tor_a_last[
robotdata_index] = self.robot_data_.tau_a_[robotdata_index]
def update_sensor_states(self):
# 获取Imu数据
self.get_imu_data()
# 添加IMU偏移
self.robot_data_.imu_data_[2] += self.xsense_roll_offset
def update_robot_data(self, flag: ControlFlag, time_passed: float):
# 更新传感器数据
self.update_sensor_states()
# 更新电机状态数据
self.update_robot_state()
# 更新机器人控制命令
self.update_robot_cmd(flag)
# 脚踝并联转串联
self.backup_serial_cmd()
if not self.sim:
#真机
self.ankle_parallel_to_serial()
# 更新robot_data 时间戳
self.robot_data_.time_now_ = time_passed
def backup_serial_cmd(self):
self.robot_data_.q_d_s_ = self.robot_data_.q_d_.copy()
def _check_and_clip_joint_limits_fast(
self, cmd_name: int, position: float) -> tuple[bool, float]:
"""
快速检查并修正关节位置限位(避免重复查询)
"""
if not self.sim and self.clip_actions:
limit = self.id_to_limits[cmd_name]
clipped_pos = np.clip(position, limit["min"], limit["max"])
return clipped_pos == position, clipped_pos
else:
return True, position
@timing_decorator
def convert_to_motor_commands(self):
"""将机器人控制命令转换为电机控制命令"""
# 使用切片操作一次性复制数据,避免逐个元素赋值
q_d_reordered = self.robot_data_.q_d_[self.floating_base_dof:]
qdot_d_reordered = self.robot_data_.q_dot_d_[self.floating_base_dof:]
tor_d_reordered = self.robot_data_.tau_d_[self.floating_base_dof:]
# 计算 q_d_reordered - self.zero_offset
np.subtract(q_d_reordered, self.zero_offset, out=self.temp_q)
# 计算 self.zero_cnt * 2.0 * self.pi
np.multiply(self.zero_cnt, 2.0 * math.pi, out=self._temp_zero_cnt)
# 计算 q_d_reordered - self.zero_offset - self.zero_cnt * 2.0 * self.pi
np.subtract(self.temp_q, self._temp_zero_cnt, out=self.temp_q)
# 计算 (q_d_reordered - self.zero_offset - self.zero_cnt * 2.0 * self.pi) * self.motor_dir
np.multiply(self.temp_q, self.motor_dir, out=self.temp_q)
# 计算最终结果 (q_d_reordered - self.zero_offset - self.zero_cnt * 2.0 * self.pi) * self.motor_dir + self.zero_pos
np.add(self.temp_q,
self.zero_pos,
out=self.robot_data_.q_d_[self.floating_base_dof:])
# qdot_d和tor_d的计算也可以向量化
np.multiply(qdot_d_reordered,
self.motor_dir,
out=self.robot_data_.q_dot_d_[self.floating_base_dof:])
np.multiply(tor_d_reordered,
self.motor_dir,
out=self.robot_data_.tau_d_[self.floating_base_dof:])
# 如果关节被禁用
if self.disable_joints_:
self.robot_data_.joint_kp_p_.fill(0.0)
self.robot_data_.joint_kd_p_.fill(0.0)
self.robot_data_.tau_d_.fill(0.0)
self.node.get_logger().warn("Joints disabled!")
def publish_motor_commands(self, flag: ControlFlag):
# flag_fsm_command = flag.fsm_state_command
current_state = self.current_state
if self.pub_fsm_state is not None:
state_msg = String()
state_msg.data = current_state.name
self.pub_fsm_state.publish(state_msg)
# 发布腿部控制命令
if self.legs_control_status == [] or current_state in self.legs_control_status:
leg_msg = CmdMotorCtrl()
leg_msg.header.stamp = self.node.get_clock().now().to_msg()
for i in range(self.id_map.leg_motor_nums):
cmd = MotorCtrl()
cmd.name = self.id_map.get_id_by_index(i)
cmd.kp = float(self.robot_data_.joint_kp_p_[i])
cmd.kd = float(self.robot_data_.joint_kd_p_[i])
cmd.pos = float(self.robot_data_.q_d_[i +
self.floating_base_dof])
cmd.spd = float(
self.robot_data_.q_dot_d_[i + self.floating_base_dof])
cmd.tor = float(
self.robot_data_.tau_d_[i + self.floating_base_dof])
# 检查关节位置限位
within_limit, cmd.pos = self._check_and_clip_joint_limits_fast(
cmd.name, cmd.pos)
if not within_limit:
print(
f"Joint (id: {cmd.name}) position {cmd.pos} is out of limits"
)
leg_msg.cmds.append(cmd)
self.pub_leg_motor_cmd.publish(leg_msg)
# 只在特定模式下控制腰部
if current_state in self.waist_control_status:
# 腰部控制命令
waist_msg = CmdMotorCtrl()
waist_msg.header.stamp = self.node.get_clock().now().to_msg()
for i in range(self.id_map.waist_motor_nums):
cmd = MotorCtrl()
motor_idx = i + self.id_map.leg_motor_nums
cmd.name = self.id_map.get_id_by_index(
motor_idx) # 12 -> 33(yaw)
cmd.kp = float(self.robot_data_.joint_kp_p_[motor_idx])
cmd.kd = float(self.robot_data_.joint_kd_p_[motor_idx])
cmd.pos = float(self.robot_data_.q_d_[motor_idx +
self.floating_base_dof])
cmd.spd = float(
self.robot_data_.q_dot_d_[motor_idx +
self.floating_base_dof])
cmd.tor = float(
self.robot_data_.tau_d_[motor_idx +
self.floating_base_dof])
# 检查关节位置限位
within_limit, cmd.pos = self._check_and_clip_joint_limits_fast(
cmd.name, cmd.pos)
if not within_limit:
print(
f"Joint (id: {cmd.name}) position {cmd.pos} is out of limits"
)
waist_msg.cmds.append(cmd)
# print(f'waist_msg {waist_msg}')
self.pub_waist_motor_cmd.publish(waist_msg)
# 只在特定模式下控制手臂
if current_state in self.arms_control_status:
# 手臂控制命令
arm_msg = CmdMotorCtrl()
arm_msg.header.stamp = self.node.get_clock().now().to_msg()
if current_state in self.left_arm_only_status:
control_index = np.arange(0, 7)
elif current_state in self.right_arm_only_status:
control_index = np.arange(self.id_map.arm_motor_nums - 7, self.id_map.arm_motor_nums)
else:
control_index = np.arange(0, self.id_map.arm_motor_nums)
for i in control_index:
cmd = MotorCtrl()
motor_idx = i + self.id_map.leg_motor_nums + self.id_map.waist_motor_nums
cmd.name = self.id_map.get_id_by_index(motor_idx)
cmd.kp = float(self.robot_data_.joint_kp_p_[motor_idx])
cmd.kd = float(self.robot_data_.joint_kd_p_[motor_idx])
cmd.pos = float(self.robot_data_.q_d_[motor_idx +
self.floating_base_dof])
cmd.spd = float(
self.robot_data_.q_dot_d_[motor_idx +
self.floating_base_dof])
cmd.tor = float(
self.robot_data_.tau_d_[motor_idx +
self.floating_base_dof])
# 检查关节位置限位
within_limit, cmd.pos = self._check_and_clip_joint_limits_fast(
cmd.name, cmd.pos)
if not within_limit:
print(
f"Joint (id: {cmd.name}) position {cmd.pos} is out of limits"
)
arm_msg.cmds.append(cmd)
# print(f'arm_msg {arm_msg}')
self.pub_arm_motor_cmd.publish(arm_msg)
@timing_decorator
def send_motor_commands(self, flag: ControlFlag):
"""发布电机控制命令"""
if not self.initialized:
return
if not self.sim:
#真机
self.ankle_serial_to_parallel()
self.convert_to_motor_commands()
self.publish_motor_commands(flag)
# ROS回调函数
def _leg_motor_status_callback(self, msg):
"""腿部电机状态回调"""
try:
self.queue_leg_motor_state.put_nowait(msg)
except queue.Full:
# 队列满时移除旧数据,加入新数据
try:
self.queue_leg_motor_state.get_nowait() # 移除旧数据
self.queue_leg_motor_state.put_nowait(msg) # 加入新数据
except:
pass # 如果仍然无法加入,忽略
def _arm_motor_status_callback(self, msg):
"""手臂电机状态回调"""
try:
self.queue_arm_motor_state.put_nowait(msg)
except queue.Full:
# 队列满时移除旧数据,加入新数据
try:
self.queue_arm_motor_state.get_nowait() # 移除旧数据
self.queue_arm_motor_state.put_nowait(msg) # 加入新数据
except:
pass # 如果仍然无法加入,忽略
def _waist_motor_status_callback(self, msg):
"""腰部电机状态回调"""
try:
self.queue_waist_motor_state.put_nowait(msg)
except queue.Full:
# 队列满时移除旧数据,加入新数据
try:
self.queue_waist_motor_state.get_nowait() # 移除旧数据
self.queue_waist_motor_state.put_nowait(msg) # 加入新数据
except:
pass # 如果仍然无法加入,忽略
def _imu_status_callback(self, msg):
"""IMU状态回调"""
try:
self.queue_imu_xsens.put_nowait(msg)
except queue.Full:
try:
self.queue_imu_xsens.get_nowait() # 移除旧数据
self.queue_imu_xsens.put_nowait(msg) # 加入新数据
except:
pass # 如果仍然无法加入,忽略
def ankle_parallel_to_serial(self):
# 串并联转换:并转串 (类似C++版本中的处理)
# 提取左右脚两个踝关节(并联关节)
q_a_p = np.zeros(4) # 并联关节角度(实际)
qdot_a_p = np.zeros(4) # 并联关节速度(实际)
tor_a_p = np.zeros(4) # 并联关节力矩(实际)
q_a_s = np.zeros(4) # 串联关节角度(实际)
qdot_a_s = np.zeros(4) # 串联关节速度(实际)
tor_a_s = np.zeros(4) # 串联关节力矩(实际)
q_a_p[:2] = self.robot_data_.q_a_[
self.left_ankle_indices] # 左脚踝关节 (pitch, roll)
q_a_p[2:] = self.robot_data_.q_a_[
self.right_ankle_indices] # 右脚踝关节 (pitch, roll)
qdot_a_p[:2] = self.robot_data_.q_dot_a_[self.left_ankle_indices]
qdot_a_p[2:] = self.robot_data_.q_dot_a_[self.right_ankle_indices]
tor_a_p[:2] = self.robot_data_.tau_a_[self.left_ankle_indices]
tor_a_p[2:] = self.robot_data_.tau_a_[self.right_ankle_indices]
self.q_a_p = q_a_p.copy()
self.qdot_a_p = qdot_a_p.copy()
self.tor_a_p = tor_a_p.copy()
if self.debug:
print("-" * 20 + "并转串联前" + "-" * 20)
print("q_a_p:", q_a_p)
print("qdot_a_p:", qdot_a_p)
print("tor_a_p:", tor_a_p)
# 计算并转串(正运动学)
self.fun_s2p.set_p_est(q_a_p, qdot_a_p, tor_a_p)
self.fun_s2p.calcFK()
self.fun_s2p.calcIK()
success, q_a_s, qdot_a_s, tor_a_s = self.fun_s2p.get_s_state()
if self.debug:
print("-" * 20 + "并转串联后" + "-" * 20)
print("q_a_s:", q_a_s)
print("qdot_a_s:", qdot_a_s)
print("tor_a_s:", tor_a_s)
# 用串联关节值替换原来的并联关节值
self.robot_data_.q_a_[self.left_ankle_indices] = q_a_s[:2] # 左脚踝关节串联值
self.robot_data_.q_a_[self.right_ankle_indices] = q_a_s[2:] # 右脚踝关节串联值
self.robot_data_.q_dot_a_[self.left_ankle_indices] = qdot_a_s[:2]
self.robot_data_.q_dot_a_[self.right_ankle_indices] = qdot_a_s[2:]
self.robot_data_.tau_a_[self.left_ankle_indices] = tor_a_s[:2]
self.robot_data_.tau_a_[self.right_ankle_indices] = tor_a_s[2:]
def ankle_serial_to_parallel(self):
# 串转并将串联关节命令转换为并联关节命令类似C++版本)
# 提取踝关节两关节的串联命令
q_d_p = np.zeros(4) # 并联关节角度(期望)
qdot_d_p = np.zeros(4) # 并联关节速度(期望)
tor_d_p = np.zeros(4) # 并联关节力矩(期望)
q_d_s = np.zeros(4) # 串联关节角度(期望)
qdot_d_s = np.zeros(4) # 串联关节速度(期望)
tor_d_s = np.zeros(4) # 串联关节力矩(期望)
q_d_s[:2] = self.robot_data_.q_d_[self.left_ankle_indices] # 左脚踝关节串联命令
q_d_s[2:] = self.robot_data_.q_d_[
self.right_ankle_indices] # 右脚踝关节串联命令
qdot_d_s[:2] = self.robot_data_.q_dot_d_[self.left_ankle_indices]
qdot_d_s[2:] = self.robot_data_.q_dot_d_[self.right_ankle_indices]
tor_d_s[:2] = self.robot_data_.tau_d_[self.left_ankle_indices]
tor_d_s[2:] = self.robot_data_.tau_d_[self.right_ankle_indices]
q_a_s = np.zeros(4) # 串联关节角度(实际)
qdot_a_s = np.zeros(4) # 串联关节速度(实际)
q_a_s[:2] = self.robot_data_.q_a_[self.left_ankle_indices] # 左脚踝关节串联值
q_a_s[2:] = self.robot_data_.q_a_[self.right_ankle_indices] # 右脚踝关节串联值
qdot_a_s[:2] = self.robot_data_.q_dot_a_[
self.left_ankle_indices] # 左脚踝关节串联速度
qdot_a_s[2:] = self.robot_data_.q_dot_a_[
self.right_ankle_indices] # 右脚踝关节串联速度
kp = np.zeros(4) # 串联关节刚度
kd = np.zeros(4) # 串联关节阻尼
kp[:2] = self.robot_data_.joint_kp_p_[self.left_ankle_indices -
self.floating_base_dof]
kp[2:] = self.robot_data_.joint_kp_p_[self.right_ankle_indices -
self.floating_base_dof]
kd[:2] = self.robot_data_.joint_kd_p_[self.left_ankle_indices -
self.floating_base_dof]
kd[2:] = self.robot_data_.joint_kd_p_[self.right_ankle_indices -
self.floating_base_dof]
tor_d_s = kp * (q_d_s - q_a_s) + kd * (qdot_d_s - qdot_a_s)
if self.debug:
print("-" * 20 + "串转并联前" + "-" * 20)
print("q_d_s:", q_d_s)
print("qdot_d_s:", qdot_d_s)
print("tor_d_s:", tor_d_s)
# 串转并计算
self.fun_s2p.set_s_des(q_d_s, qdot_d_s, tor_d_s)
self.fun_s2p.calc_joint_pos_ref()
self.fun_s2p.calc_joint_tor_des()
success, q_d_p, qdot_d_p, tor_d_p = self.fun_s2p.get_p_des()
q_d_p = (tor_d_p - self.ankle_kd_p *
(qdot_d_p - self.qdot_a_p)) / self.ankle_kp_p + self.q_a_p
if self.debug:
print("-" * 20 + "串转并联后" + "-" * 20)
print("q_d_p:", q_d_p)
print("qdot_d_p:", qdot_d_p)
print("tor_d_p:", tor_d_p)
# 用并联关节命令覆盖原来的串联命令
self.robot_data_.q_d_[self.left_ankle_indices] = q_d_p[:2] # 左脚踝关节并联命令
self.robot_data_.q_d_[self.right_ankle_indices] = q_d_p[2:] # 右脚踝关节并联命令
self.robot_data_.q_dot_d_[self.left_ankle_indices] = qdot_d_p[:2]
self.robot_data_.q_dot_d_[self.right_ankle_indices] = qdot_d_p[2:]
# self.robot_data_.tau_d_[self.left_ankle_indices] = tor_d_p[:2]
# self.robot_data_.tau_d_[self.right_ankle_indices] = tor_d_p[2:]
self.robot_data_.tau_d_[self.left_ankle_indices] = 0.0
self.robot_data_.tau_d_[self.right_ankle_indices] = 0.0
# 更新脚踝Kp,Kd
self.robot_data_.joint_kp_p_[
self.left_ankle_indices -
self.floating_base_dof] = self.ankle_kp_p[:2]
self.robot_data_.joint_kp_p_[
self.right_ankle_indices -
self.floating_base_dof] = self.ankle_kp_p[2:]
self.robot_data_.joint_kd_p_[
self.left_ankle_indices -
self.floating_base_dof] = self.ankle_kd_p[:2]
self.robot_data_.joint_kd_p_[
self.right_ankle_indices -
self.floating_base_dof] = self.ankle_kd_p[2:]
def update_robot_cmd(self, flag: ControlFlag):
"""更新机器人控制命令"""
if flag.enable:
# 使用getattr设置默认值避免AttributeError
x_command = getattr(flag, 'x_speed_command')
y_command = getattr(flag, 'y_speed_command')
yaw_command = getattr(flag, 'yaw_speed_command')
self.robot_data_.walk_cmd_ = [x_command, y_command, yaw_command]
self.robot_data_.control_flag.fsm_state_command = flag.fsm_state_command
def get_robot_interface(robot_data: RobotData, config_path: str) -> RobotInterface:
"""工厂函数,返回机器人接口实例"""
return RobotInterfaceImpl(robot_data, config_path)

View File

@@ -0,0 +1,422 @@
"""
Keyboard Control Module for SSH and Local Environments
Keyboard input handling for robot state management without external libraries
"""
import threading
import sys
import select
import termios
import tty
import os
import yaml
from typing import Optional
from .joystick import ControlFlag
import signal
class KeyboardFlag(ControlFlag): # 继承ControlFlag
def __init__(self):
super().__init__() # 调用父类初始化
self.x_speed_command: float = 0.0
self.y_speed_command: float = 0.0
self.yaw_speed_command: float = 0.0
self.height_cmd: float = 0.89
class KeyboardController:
"""键盘控制器纯Python实现不依赖外部库"""
def __init__(self):
print("Keyboard Control Started (Pure Python Implementation)")
# 初始化成员变量
self.keyboard_flag = KeyboardFlag()
self.data_mutex = threading.Lock()
# 状态追踪变量
self.current_height = 0.89
self.target_height = 0.89
self.height_step = 0.05
# 配置参数
self.initial_height = 0.89
self.forward_command_offset = 0.0
self.lateral_command_offset = 0.0
self.rotation_command_offset = 0.0
self.max_forward_speed = 1.0
self.max_lateral_speed = 0.5
self.max_rotation_speed = 0.5
# 控制标志
self.running = False
self.input_thread = None
self.original_terminal_settings = None
# 加载配置文件
self._load_config()
print("Available keyboard commands:")
print(" z - Goto ZERO state")
print(" c - Goto STOP state")
print(" m - Goto WALKAMP state")
print(" p - Goto MYPOLICY state")
print(" n - Goto XSIMRUN state")
print(" Left/Right arrows - Adjust height")
print(" w/a/s/d - Movement controls")
print(" q/e - Rotation controls (turn left/right)")
print(" r - Reset all movement commands to zero")
print(" x - Quit")
print(" Ctrl+C - Emergency stop")
def _load_config(self):
"""加载YAML配置文件"""
try:
config_path = os.path.join('.', "config", "dex_config.yaml")
with open(config_path, 'r') as file:
config = yaml.safe_load(file)
if not config:
print("[Keyboard_controller] Failed to load config file")
return
keyboard_cfg = config.get("keyboard", {})
# 加载配置参数
self.initial_height = keyboard_cfg.get("initial_height", 0.89)
self.forward_command_offset = keyboard_cfg.get("forward_command_offset", 0.0)
self.lateral_command_offset = keyboard_cfg.get("lateral_command_offset", 0.0)
self.rotation_command_offset = keyboard_cfg.get("rotation_command_offset", 0.0)
self.height_step = keyboard_cfg.get("height_step", 0.05)
self.max_forward_speed = keyboard_cfg.get("max_forward_speed", 1.0)
self.max_lateral_speed = keyboard_cfg.get("max_lateral_speed", 0.5)
self.max_rotation_speed = keyboard_cfg.get("max_rotation_speed", 0.5)
print(f"Loaded keyboard config:")
print(f" Initial height: {self.initial_height}")
print(f" Height step: {self.height_step}")
print(f" Max forward speed: {self.max_forward_speed}")
print(f" Max lateral speed: {self.max_lateral_speed}")
print(f" Max rotation speed: {self.max_rotation_speed}")
self.current_height = self.initial_height
self.target_height = self.initial_height
self.keyboard_flag.height_cmd = self.current_height
except Exception as e:
print(f"[Keyboard_controller] YAML load error: {e}")
def start(self):
"""启动键盘监听线程"""
self.running = True
self.input_thread = threading.Thread(target=self._input_loop, daemon=True)
self.input_thread.start()
print("Keyboard input thread started")
def stop(self):
"""停止键盘监听"""
self.running = False
# 等待线程结束(但不要无限等待)
if self.input_thread and self.input_thread.is_alive():
self.input_thread.join(timeout=1.0)
if self.input_thread.is_alive():
print("Warning: Keyboard thread did not exit cleanly")
# # 恢复终端设置重要这会让Ctrl+C重新工作
if self.original_terminal_settings:
try:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.original_terminal_settings)
self.original_terminal_settings = None
except Exception as e:
print(f"Error restoring terminal settings: {e}")
print("Keyboard input thread stopped")
def _input_loop(self):
"""主输入循环"""
# 保存原始终端设置
self.original_terminal_settings = termios.tcgetattr(sys.stdin)
try:
# 设置终端为原始模式,支持即时按键检测
tty.setraw(sys.stdin.fileno())
print("Keyboard listener ready. Press keys to control.")
print("Press 'x' to quit or Ctrl+C for emergency stop.")
while self.running:
# 检查输入100ms超时避免占用太多CPU
if select.select([sys.stdin], [], [], 0.1)[0]:
key = sys.stdin.read(1)
self._process_key(key)
except KeyboardInterrupt:
print("\nEmergency stop detected! Stopping keyboard controller.")
self._emergency_stop()
except Exception as e:
print(f"Input loop error: {e}")
finally:
# 确保终端设置被恢复
if self.original_terminal_settings:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.original_terminal_settings)
self.original_terminal_settings = None
def _process_key(self, key):
"""处理按键输入"""
if key == 'w':
self._on_w_key()
elif key == 's':
self._on_s_key()
elif key == 'a':
self._on_a_key()
elif key == 'd':
self._on_d_key()
elif key == 'q':
self._on_q_key()
elif key == 'e':
self._on_e_key()
elif key == 'z':
self._on_z_key()
elif key == 'c':
self._on_c_key()
elif key == 'm':
self._on_m_key()
elif key == 'h':
self._on_h_key()
elif key == 'r':
self._on_r_key()
elif key == 'x':
self._on_x_key()
elif key == 'g':
self._on_g_key()
elif key == 'p':
self._on_p_key()
elif key == 'n':
self._on_n_key()
elif key == 'o':
self._on_o_key()
elif key == 'v':
self._on_v_key()
elif key == '\x03': # Ctrl+C
print("\nCtrl+C detected - sending interrupt signal")
self._handle_ctrl_c()
elif key == '\x1b': # ESC键可能是方向键
self._handle_arrow_key()
else:
# 忽略其他按键
pass
def _handle_arrow_key(self):
"""处理方向键序列"""
# 方向键序列: ESC + [ + A/B/C/D
if select.select([sys.stdin], [], [], 0.1)[0]:
key2 = sys.stdin.read(1)
if key2 == '[':
if select.select([sys.stdin], [], [], 0.1)[0]:
key3 = sys.stdin.read(1)
if key3 == 'D': # 左箭头
self._on_left_arrow()
elif key3 == 'C': # 右箭头
self._on_right_arrow()
elif key3 == 'A': # 上箭头
self._on_up_arrow()
elif key3 == 'B': # 下箭头
self._on_down_arrow()
def _on_left_arrow(self):
"""处理左箭头键 - 增加高度"""
with self.data_mutex:
self._increase_height()
def _on_right_arrow(self):
"""处理右箭头键 - 降低高度"""
with self.data_mutex:
self._decrease_height()
def _on_up_arrow(self):
"""处理上箭头键(备用功能)"""
print("Up arrow pressed - available for additional functions")
def _on_down_arrow(self):
"""处理下箭头键(备用功能)"""
print("Down arrow pressed - available for additional functions")
def _on_w_key(self):
"""处理w键 - 前进"""
with self.data_mutex:
self.keyboard_flag.x_speed_command += 0.1
if self.keyboard_flag.x_speed_command > self.max_forward_speed:
self.keyboard_flag.x_speed_command = self.max_forward_speed
print(f"Moving forward (speed: {self.keyboard_flag.x_speed_command:.2f})")
def _on_s_key(self):
"""处理s键 - 后退"""
with self.data_mutex:
self.keyboard_flag.x_speed_command -= 0.1
if self.keyboard_flag.x_speed_command < -self.max_forward_speed:
self.keyboard_flag.x_speed_command = -self.max_forward_speed
print(f"Moving backward (speed: {self.keyboard_flag.x_speed_command:.2f})")
def _on_a_key(self):
"""处理a键 - 左移"""
with self.data_mutex:
self.keyboard_flag.y_speed_command -= 0.1
if self.keyboard_flag.y_speed_command < -self.max_lateral_speed:
self.keyboard_flag.y_speed_command = -self.max_lateral_speed
print(f"Moving left (speed: {self.keyboard_flag.y_speed_command:.2f})")
def _on_d_key(self):
"""处理d键 - 右移"""
with self.data_mutex:
self.keyboard_flag.y_speed_command += 0.1
if self.keyboard_flag.y_speed_command > self.max_lateral_speed:
self.keyboard_flag.y_speed_command = self.max_lateral_speed
print(f"Moving right (speed: {self.keyboard_flag.y_speed_command:.2f})")
def _on_q_key(self):
"""处理q键 - 左转"""
with self.data_mutex:
self.keyboard_flag.yaw_speed_command -= 0.1
if self.keyboard_flag.yaw_speed_command < -self.max_rotation_speed:
self.keyboard_flag.yaw_speed_command = -self.max_rotation_speed
print(f"Turning left (speed: {self.keyboard_flag.yaw_speed_command:.2f})")
def _on_e_key(self):
"""处理e键 - 右转"""
with self.data_mutex:
self.keyboard_flag.yaw_speed_command += 0.1
if self.keyboard_flag.yaw_speed_command > self.max_rotation_speed:
self.keyboard_flag.yaw_speed_command = self.max_rotation_speed
print(f"Turning right (speed: {self.keyboard_flag.yaw_speed_command:.2f})")
def _on_z_key(self):
"""处理z键 - 切换到ZERO状态"""
with self.data_mutex:
self.keyboard_flag.fsm_state_command = "gotoZERO"
print("Command: gotoZERO")
def _on_v_key(self):
"""处理v键 - 切换到BEYONGDMIMIC状态"""
with self.data_mutex:
self.keyboard_flag.fsm_state_command = "gotoBEYONDMIMIC"
print("Command: gotoBEYONDMIMIC")
def _on_c_key(self):
"""处理c键 - 切换到STOP状态"""
with self.data_mutex:
self.keyboard_flag.fsm_state_command = "gotoSTOP"
print("Command: gotoSTOP")
def _on_r_key(self):
"""处理r键 - 重置移动命令"""
with self.data_mutex:
self.keyboard_flag.x_speed_command = 0.0
self.keyboard_flag.y_speed_command = 0.0
self.keyboard_flag.yaw_speed_command = 0.0
print("All movement commands reset to zero")
def _on_x_key(self):
"""处理x键 - 退出"""
with self.data_mutex:
self.running = False
print("Quit command received")
def _on_m_key(self):
"""处理m键 - 切换到WALKAMP状态"""
with self.data_mutex:
self.keyboard_flag.fsm_state_command = "gotoWALKAMP"
print("Command: gotoWALKAMP")
def _on_p_key(self):
"""处理p键 - 切换到MYPOLICY状态"""
with self.data_mutex:
self.keyboard_flag.x_speed_command = 0.0
self.keyboard_flag.y_speed_command = 0.0
self.keyboard_flag.yaw_speed_command = 0.0
self.keyboard_flag.fsm_state_command = "gotoMYPOLICY"
print("Command: gotoMYPOLICY (movement commands reset to zero)")
def _on_n_key(self):
"""处理n键 - 切换到XSIMRUN状态"""
with self.data_mutex:
self.keyboard_flag.x_speed_command = 0.0
self.keyboard_flag.y_speed_command = 0.0
self.keyboard_flag.yaw_speed_command = 0.0
self.keyboard_flag.fsm_state_command = "gotoXSIMRUN"
print("Command: gotoXSIMRUN (movement commands reset to zero)")
def _handle_ctrl_c(self):
"""处理Ctrl+C - 发送SIGINT信号给主进程"""
# 先停止键盘控制器
self.running = False
# 恢复终端设置,让信号处理正常工作
if self.original_terminal_settings:
try:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.original_terminal_settings)
self.original_terminal_settings = None
except:
pass
# 发送SIGINT信号给当前进程
os.kill(os.getpid(), signal.SIGINT)
def _increase_height(self):
"""增加机器人高度"""
new_target = self.target_height + self.height_step
if new_target <= 0.90:
self.target_height = new_target
print(f"Height increased to {self.target_height:.2f}m")
else:
print("Maximum height reached (0.90m)")
def _decrease_height(self):
"""降低机器人高度"""
new_target = self.target_height - self.height_step
if new_target >= 0.65:
self.target_height = new_target
print(f"Height decreased to {self.target_height:.2f}m")
else:
print("Minimum height reached (0.65m)")
def update_flag(self):
"""更新控制标志"""
with self.data_mutex:
# 平滑高度调节
if abs(self.current_height - self.target_height) > 0.0001:
if self.current_height < self.target_height:
self.current_height += 0.0001
else:
self.current_height -= 0.0001
else:
self.current_height = self.target_height
self.keyboard_flag.height_cmd = self.current_height
def get_keyboard_flag(self) -> KeyboardFlag:
"""获取当前键盘标志的副本"""
with self.data_mutex:
flag_copy = KeyboardFlag()
flag_copy.__dict__.update(self.keyboard_flag.__dict__)
return flag_copy
def init(self) -> int:
"""初始化键盘控制器"""
print("Keyboard controller initialized (Pure Python)")
return 0
# 测试代码
if __name__ == "__main__":
controller = KeyboardController()
controller.init()
controller.start()
try:
# 主循环
while controller.running:
controller.update_flag()
import time
time.sleep(0.01)
except KeyboardInterrupt:
print("\nMain loop interrupted")
finally:
controller.stop()

View File

@@ -0,0 +1,250 @@
"""
XBOX Controller compatibility layer.
Implements the same FSM modes and control flags as `stdin_keyboard_control.py` / `joystick.py`.
"""
import os
import yaml
import threading
from typing import Optional
from dataclasses import dataclass
from sensor_msgs.msg import Joy
from .joystick import ControlFlag
class XBOXFlag(ControlFlag):
def __init__(self):
super().__init__()
self.x_speed_command: float = 0.0
self.y_speed_command: float = 0.0
self.yaw_speed_command: float = 0.0
self.motion_number: int = 0
self.height_cmd: float = 0.89
@dataclass
class XBOXMap:
# minimal axes/buttons mapping; populated from Joy message in xbox_map_read
a: float = 0.0
b: float = 0.0
x: float = 0.0
y: float = 0.0
lb: float = 0.0
rb: float = 0.0
select: float = 0.0
start: float = 0.0
l_trigger: float = 0.0
r_trigger: float = 0.0
lx: float = 0.0
ly: float = 0.0
rx: float = 0.0
ry: float = 0.0
# optional keys for devices with limited buttons
home: float = 0.0
# dpad placeholders (axes 6/7 or buttons depending on device)
dpad_h: float = 0.0
dpad_v: float = 0.0
class XBOXController:
"""XBOX controller that mirrors the joystick keyboard behavior."""
def __init__(self):
print("XBOX Controller Start")
self.map = XBOXMap()
self.flag = XBOXFlag()
self.data_mutex = threading.Lock()
# state tracking
self.last_select = 0
self.last_start = 0
# configuration
self.initial_height = 0.89
self.current_height = 0.89
self.target_height = 0.89
self.forward_command_offset = 0.0
self.lateral_command_offset = 0.0
self.rotation_command_offset = 0.0
# smoothing
self.height_step = 0.05
self._load_config()
# default button map indices (can be overridden in config)
self.button_map = {
'a': 0, 'b': 1, 'x': 2, 'y': 3,
'lb': 4, 'rb': 5, 'start': 7,
'select': 6, 'home': 8
}
# default axis map indices (can be overridden in config)
self.axis_map = {
'lx': 0, 'ly': 1, 'rx': 3, 'ry': 4,
'l_trigger': 2, 'r_trigger': 5,
'dpad_h': 6, 'dpad_v': 7
}
def _load_config(self):
try:
config_path = os.path.join('.', 'config', 'dex_config.yaml')
with open(config_path, 'r') as f:
cfg = yaml.safe_load(f) or {}
xbox_cfg = cfg.get('xbox', {})
# override button_map if provided
# bm = xbox_cfg.get('button_map')
# if isinstance(bm, dict):
# for k, v in bm.items():
# try:
# self.button_map[k] = int(v)
# except Exception:
# pass
# # override axis_map if provided
# am = xbox_cfg.get('axis_map')
# if isinstance(am, dict):
# for k, v in am.items():
# try:
# self.axis_map[k] = int(v)
# except Exception:
# pass
self.initial_height = xbox_cfg.get('initial_height', 0.89)
self.forward_command_offset = xbox_cfg.get('forward_command_offset', 0.0)
self.lateral_command_offset = xbox_cfg.get('lateral_command_offset', 0.0)
self.rotation_command_offset = xbox_cfg.get('rotation_command_offset', 0.0)
self.height_step = xbox_cfg.get('height_step', 0.05)
self.current_height = self.initial_height
self.target_height = self.initial_height
self.flag.height_cmd = self.current_height
print(f"Loaded XBOX config: initial_height={self.initial_height}")
except Exception as e:
print(f"[XBOXController] YAML load error: {e}")
def xbox_map_read(self, msg: Joy):
"""Populate internal map from a ROS Joy message."""
with self.data_mutex:
# axes layout may differ; try safe indexing
axes = list(msg.axes) + [0.0] * 16
buttons = list(msg.buttons) + [0] * 32
# common mapping assumptions (best-effort)
self.map.lx = axes[self.axis_map['lx']]
self.map.ly = axes[self.axis_map['ly']]
self.map.rx = axes[self.axis_map['rx']]
self.map.ry = axes[self.axis_map['ry']]
# triggers sometimes on axes
self.map.l_trigger = axes[self.axis_map['l_trigger']]
self.map.r_trigger = axes[self.axis_map['r_trigger']]
# dpad may be on axes
self.map.dpad_h = axes[self.axis_map['dpad_h']]
self.map.dpad_v = axes[self.axis_map['dpad_v']]
# buttons using button_map indices
for name, idx in self.button_map.items():
try:
val = buttons[idx]
except Exception:
val = 0
setattr(self.map, name, val)
def xbox_flag_update(self):
"""Update ControlFlag from the xbox map, mirroring joystick logic."""
with self.data_mutex:
# FSM state mapping - cover keyboard commands z/c/m/h/g/p/o
# c -> gotoSTOP
if self.map.y == 1:
self.flag.fsm_state_command = 'gotoSTOP'
# h -> gotoDH (Left trigger + A)
# v -> gotoBEYONDMIMIC (Left trigger + home)
elif self.map.l_trigger < -0.5 and self.map.home == 1:
self.flag.fsm_state_command = 'gotoBEYONDMIMIC'
# z -> gotoZERO
elif self.map.x == 1:
self.flag.fsm_state_command = 'gotoZERO'
# detect state change
if not hasattr(self, '_last_state'):
self._last_state = self.flag.fsm_state_command
state_changed = (self.flag.fsm_state_command != self._last_state)
self._last_state = self.flag.fsm_state_command
if (state_changed and
(self.flag.fsm_state_command == 'gotoZERO' or self.flag.fsm_state_command == 'gotoSTOP')):
self.current_height = self.initial_height
self.target_height = self.initial_height
self.flag.height_cmd = self.current_height
# velocity mapping: continuous (left stick) and small discrete adjustments via buttons
ly = float(self.map.ly)
lx = float(self.map.lx)
rx = float(self.map.rx)
# continuous stick control (same scaling as joystick)
if ly >= 0:
self.flag.x_speed_command = (ly * 0.8 + self.forward_command_offset)
else:
self.flag.x_speed_command = ly * 0.5
self.flag.y_speed_command = (lx * -0.4 + self.lateral_command_offset)
self.flag.yaw_speed_command = (rx * -0.4 + self.rotation_command_offset)
# discrete movement adjustments (map buttons to keyboard-like increments)
# emulate w/s/a/d via shoulder buttons or dpad if needed
# D-pad (axes 6/7) is common: we'll read them in xbox_map_read if available
dpad_h = getattr(self.map, 'dpad_h', 0.0)
dpad_v = getattr(self.map, 'dpad_v', 0.0)
# left/right dpad emulate arrow keys for height adjust
motion_add_number = 0
if dpad_h == -1.0 and getattr(self, 'last_dpad_h', 0.0) == 0.0:
# left arrow -> increase height
motion_add_number = 1
elif dpad_h == 1.0 and getattr(self, 'last_dpad_h', 0.0) == 0.0:
# right arrow -> decrease height
motion_add_number = -1
# Remove select/start height adjustments to use only dpad_h for height control
# if self.map.select == 1 and self.last_select == 0 and motion_add_number == 0:
# motion_add_number = 1
# elif self.map.start == 1 and self.last_start == 0 and motion_add_number == 0:
# motion_add_number = -1
self.last_select = self.map.select
self.last_start = self.map.start
self.last_dpad_h = dpad_h
self.flag.motion_number = motion_add_number
if motion_add_number != 0:
new_target = self.target_height + (motion_add_number * 0.05)
if new_target > 0.90:
new_target = 0.90
elif new_target < 0.65:
new_target = 0.65
self.target_height = new_target
# smooth height
step = 0.01
if abs(self.current_height - self.target_height) > step:
if self.current_height < self.target_height:
self.current_height += step
else:
self.current_height -= step
else:
self.current_height = self.target_height
self.flag.height_cmd = self.current_height
# reset movement (r key) -> using START button
if self.map.start == 1:
self.flag.x_speed_command = 0.0
self.flag.y_speed_command = 0.0
self.flag.yaw_speed_command = 0.0
def get_xbox_flag(self) -> ControlFlag:
with self.data_mutex:
return self.flag
def init(self) -> int:
print("XBOX controller initialized")
return 0