Files
tienkung-szu/Deploy_Tienkung/common/robot_interface.py
2026-03-30 15:30:30 +08:00

813 lines
34 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Robot Interface
Python equivalent of the C++ RobotInterface class
"""
from __future__ import annotations
import queue
from common.peekqueue import PeekableQueue
import yaml
import os
from abc import ABC, abstractmethod
from typing import Dict, Any
import numpy as np
# ROS messages
from bodyctrl_msgs.msg import MotorStatusMsg, CmdMotorCtrl, MotorCtrl, Imu, MotorStatus
import rclpy
from std_msgs.msg import String
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
import transforms3d as t3d
from .body_id_map import BodyServoIdMap
from .robot_data import RobotData
import functools
import time
import math
from sptlib_python import funcSPTrans as FuncSPTrans
from common.joystick import ControlFlag
from geometry_msgs.msg import TwistStamped
from std_msgs.msg import Float64
from FSM.fsm_base import FSMStateName
def timing_decorator(func):
"""
装饰器:记录函数执行时间
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
execution_time = end_time - start_time
# print(f"[TIMING] {func.__name__} executed in {execution_time:.6f} seconds")
return result
return wrapper
class RobotInterface(ABC):
"""机器人接口抽象基类"""
def __init__(self, robot_data: RobotData):
self.robot_data_ = robot_data
@abstractmethod
def init(self, node: Node):
"""初始化接口"""
pass
@abstractmethod
def update_robot_data(self, flag:ControlFlag, time_passed: float):
"""更新机器人状态"""
pass
@abstractmethod
def send_motor_commands(self, flag: ControlFlag):
"""发布电机控制命令"""
pass
class RobotInterfaceImpl(RobotInterface):
"""机器人接口具体实现"""
def __init__(self, robot_data: RobotData, config_path: str = ''):
super().__init__(robot_data)
self.initialized = False
self.node = None
self.config_path = config_path
# ID映射
self.id_map = BodyServoIdMap()
self.id_map.body_can_id_map_init()
# 消息队列
self.queue_leg_motor_state = PeekableQueue(maxsize=1)
self.queue_arm_motor_state = PeekableQueue(maxsize=1)
self.queue_waist_motor_state = PeekableQueue(maxsize=1)
self.queue_imu_xsens = PeekableQueue(maxsize=1)
self.queue_walk_cmd = PeekableQueue(maxsize=1)
# 关节维度
self.floating_base_dof = 6
self.whole_joint_nums = self.id_map.whole_motor_nums + self.floating_base_dof
# 临时变量用于优化计算
self.temp_q = np.empty(self.id_map.whole_motor_nums)
# 预分配另一个用于存储中间计算的临时数组
self._temp_zero_cnt = np.empty(self.id_map.whole_motor_nums)
# 电机控制参数
self.motor_dir = np.ones(self.id_map.whole_motor_nums)
self.zero_cnt = np.zeros(self.id_map.whole_motor_nums)
self.zero_offset = np.zeros(self.id_map.whole_motor_nums)
# 添加标志位,用于跟踪是否是首次接收数据
self.first_leg_data_received = False
self.first_arm_data_received = False
self.first_waist_data_received = False
# 关节限位
self.joint_limits = {}
self.joint_pos_threshold = math.pi
# 串并联转换器
self.fun_s2p = FuncSPTrans()
# 串并联转换相关变量
self.left_ankle_indices = np.array([4, 5]) + self.floating_base_dof
self.right_ankle_indices = np.array([10, 11]) + self.floating_base_dof
self.q_a_p = np.zeros(4) # 并联关节位置
self.qdot_a_p = np.zeros(4) # 并联关节速度
self.tor_a_p = np.zeros(4) # 并联关节力矩
self.ankle_kp_p = np.zeros(4) # 并联关节刚度
self.ankle_kd_p = np.zeros(4) # 并联关节阻尼
# TF相关属性
self.tf_buffer = None
self.tf_listener = None
# ROS publishers and subscribers
self.pub_leg_motor_cmd = None
self.pub_arm_motor_cmd = None
self.pub_waist_motor_cmd = None
self.pub_fsm_state = None
self.sub_leg_state = None
self.sub_arm_state = None
self.sub_waist_state = None
# 当前机器人所处状态
self.current_state: FSMStateName = FSMStateName.STOP
def update_param(self, current_state: FSMStateName = None):
"""更新机器人接口"""
if current_state is not None:
self.current_state = current_state
def load_config(self):
"""从配置文件加载关键参数"""
config_path = self.config_path
if not os.path.exists(config_path):
self.node.get_logger().error(
f"Joint limits config file not found: {config_path}")
raise FileNotFoundError(
f"Joint limits config file not found: {config_path}")
try:
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
except Exception as e:
self.node.get_logger().error(
f"Failed to load joint limits config from {config_path}: {e}")
raise RuntimeError(
f"Failed to load joint limits config from {config_path}: {e}")
# 读取运行模式
self.sim = config.get('sim')
self.debug = config.get('debug')
# 机器人接口配置
interface_config = config.get('robot_interface')
# 是否限位
self.clip_actions = interface_config.get('clip_actions')
# 加载关节限位值
self._load_joint_limits(interface_config)
# 加载控制状态
self._load_control_status(interface_config)
# 零位
self.zero_pos = np.array(interface_config.get('zero_pos'))
# 电流转换比例
self.ct_scale = np.array(interface_config.get('ct_scale'))
# IMU 数据偏移
self.xsense_roll_offset = interface_config.get(
'xsense_data_roll_offset')
# 禁用电机
self.disable_joints_ = interface_config.get('disable_joints', False)
# 脚踝Kp,Kd
self.ankle_kp_p = np.array(interface_config.get('ankle_kp_p'))
self.ankle_kd_p = np.array(interface_config.get('ankle_kd_p'))
def _load_control_status(self, config: Dict[str, Any]):
# 字符串命令到枚举值的映射
state_to_FSMState = {
"STOP": FSMStateName.STOP,
"ZERO": FSMStateName.ZERO,
"WALKAMP": FSMStateName.WALKAMP,
}
self.waist_control_status = [state_to_FSMState[state] for state in config.get('waist_control_status')]
self.legs_control_status = [state_to_FSMState[state] for state in config.get('legs_control_status')]
self.arms_control_status = [state_to_FSMState[state] for state in config.get('arms_control_status')]
self.left_arm_only_status = [state_to_FSMState[state] for state in config.get('left_arm_only_status')]
self.right_arm_only_status = [state_to_FSMState[state] for state in config.get('right_arm_only_status')]
def _load_joint_limits(self, config: Dict[str, Any]):
"""从配置文件加载关节限位值"""
# 从配置中获取关节限位信息
joint_limits_config = config.get('joint_limits', None)
if joint_limits_config is None:
error_msg = "No joint_limits section in config"
self.node.get_logger().error(error_msg)
raise ValueError(error_msg)
else:
# 从配置中加载限位值
for joint_name, limits in joint_limits_config.items():
if 'min' in limits and 'max' in limits:
self.joint_limits[joint_name] = {
'min': float(limits['min']),
'max': float(limits['max'])
}
else:
error_msg = f"Invalid limits for joint {joint_name}"
self.node.get_logger().error(error_msg)
raise ValueError(error_msg)
self.node.get_logger().info(f"Loaded joint limits from {config}")
# 预计算ID到限位的映射
self.id_to_limits = {}
for joint_name, limits in self.joint_limits.items():
index = self.id_map.get_index_by_name(joint_name)
if index >= 0:
motor_id = self.id_map.get_id_by_index(index)
self.id_to_limits[motor_id] = limits
# 记录加载的限位值
for joint_name, limits in self.joint_limits.items():
self.node.get_logger().debug(
f"Joint {joint_name}: [{limits['min']}, {limits['max']}]")
print("-" * 30 + '关节限位值' + '-' * 30)
print(self.joint_limits)
def init(self, node: Node):
"""初始化接口"""
self.node = node
self.initialized = True
# 初始化ROS接口
self._init_ros_interfaces()
# 加载配置文件
self.load_config()
node.get_logger().info("Robot interface initialized")
def _init_ros_interfaces(self):
"""初始化ROS接口"""
qos_profile = QoSProfile(
# reliability=ReliabilityPolicy.RELIABLE,
# history=HistoryPolicy.KEEP_LAST,
depth=10)
# 发布者
self.pub_leg_motor_cmd = self.node.create_publisher(
CmdMotorCtrl, '/leg/cmd_ctrl', qos_profile)
self.pub_arm_motor_cmd = self.node.create_publisher(
CmdMotorCtrl, '/arm/cmd_ctrl', qos_profile)
self.pub_waist_motor_cmd = self.node.create_publisher(
CmdMotorCtrl, '/waist/cmd_ctrl', qos_profile)
self.pub_fsm_state = self.node.create_publisher(
String, '/control/fsm_state', qos_profile)
# 订阅者
self.sub_leg_state = self.node.create_subscription(
MotorStatusMsg, '/leg/status', self._leg_motor_status_callback,
qos_profile)
self.sub_arm_state = self.node.create_subscription(
MotorStatusMsg, '/arm/status', self._arm_motor_status_callback,
qos_profile)
self.sub_waist_state = self.node.create_subscription(
MotorStatusMsg, '/waist/status', self._waist_motor_status_callback,
qos_profile)
#(非电机相关)
self.sub_imu_xsens = self.node.create_subscription(
Imu, '/imu/status', self._imu_status_callback, qos_profile)
# @timing_decorator
def get_imu_data(self):
"""处理传感器数据(非电机)"""
# 处理IMU数据
while True:
try:
msg = self.queue_imu_xsens.peek()
self.robot_data_.imu_data_[0] = msg.euler.yaw
self.robot_data_.imu_data_[1] = msg.euler.pitch
self.robot_data_.imu_data_[2] = msg.euler.roll
self.robot_data_.imu_data_[3] = msg.angular_velocity.x
self.robot_data_.imu_data_[4] = msg.angular_velocity.y
self.robot_data_.imu_data_[5] = msg.angular_velocity.z
self.robot_data_.imu_data_[6] = msg.linear_acceleration.x
self.robot_data_.imu_data_[7] = msg.linear_acceleration.y
self.robot_data_.imu_data_[8] = msg.linear_acceleration.z
self.robot_data_.imu_quat_[0] = msg.orientation.w
self.robot_data_.imu_quat_[1] = msg.orientation.x
self.robot_data_.imu_quat_[2] = msg.orientation.y
self.robot_data_.imu_quat_[3] = msg.orientation.z
break
except queue.Empty:
time.sleep(0.0001)
def update_robot_state(self):
"""读取电机状态数据更新为机器人状态数据"""
# 处理腿部电机状态
while True:
try:
msg = self.queue_leg_motor_state.peek()
if self.debug:
current_time = self.node.get_clock().now().to_msg()
msg_time = msg.header.stamp
time_diff = (current_time.sec -
msg_time.sec) * 1000000000 + (
current_time.nanosec - msg_time.nanosec)
time_diff_ms = time_diff / 1000000.0
print(f"Time difference: {time_diff_ms} ms")
for status in msg.status:
self.motor_state_to_robot_state(
status, self.first_leg_data_received)
break
except queue.Empty:
time.sleep(0.0001)
# 处理手臂电机状态
while True:
try:
msg = self.queue_arm_motor_state.peek()
if self.debug:
current_time = self.node.get_clock().now().to_msg()
msg_time = msg.header.stamp
time_diff = (current_time.sec -
msg_time.sec) * 1000000000 + (
current_time.nanosec - msg_time.nanosec)
time_diff_ms = time_diff / 1000000.0
print(f"Time difference: {time_diff_ms} ms")
for status in msg.status:
self.motor_state_to_robot_state(
status, self.first_arm_data_received)
break
except queue.Empty:
time.sleep(0.0001)
# 处理腰部电机状态
while True:
try:
msg = self.queue_waist_motor_state.peek()
if self.debug:
current_time = self.node.get_clock().now().to_msg()
msg_time = msg.header.stamp
time_diff = (current_time.sec -
msg_time.sec) * 1000000000 + (
current_time.nanosec - msg_time.nanosec)
time_diff_ms = time_diff / 1000000.0
print(f"Time difference: {time_diff_ms} ms")
for status in msg.status:
self.motor_state_to_robot_state(
status, self.first_waist_data_received)
break
except queue.Empty:
time.sleep(0.0001)
def motor_state_to_robot_state(self, status, received_flag: bool):
index = self.id_map.get_index_by_id(status.name)
if index >= 0:
robotdata_index = index + self.floating_base_dof # 偏移到完整关节数组中
# 直接赋值
self.robot_data_.q_a_[robotdata_index] = status.pos
self.robot_data_.q_dot_a_[robotdata_index] = status.speed
self.robot_data_.tau_a_[
robotdata_index] = status.current * self.ct_scale[min(
index,
len(self.ct_scale) - 1)]
self.robot_data_.temperature_a[
robotdata_index - self.floating_base_dof] = status.temperature
self.robot_data_.q_a_[robotdata_index] = (
status.pos - self.zero_pos[index]
) * self.motor_dir[index] + self.zero_offset[index]
if self.robot_data_.q_a_[robotdata_index] > math.pi:
self.zero_cnt[index] = -1.0
elif self.robot_data_.q_a_[robotdata_index] < -math.pi:
self.zero_cnt[index] = 1.0
self.robot_data_.q_a_[
robotdata_index] += self.zero_cnt[index] * 2.0 * math.pi
self.robot_data_.q_dot_a_[robotdata_index] *= self.motor_dir[index]
self.robot_data_.tau_a_[robotdata_index] *= self.motor_dir[index]
if not received_flag or abs(
self.robot_data_.q_a_[robotdata_index] -
self.robot_data_.q_a_last[robotdata_index]
) > self.joint_pos_threshold:
if received_flag:
self.node.get_logger().warn(
f"Joint {index} error detected")
self.robot_data_.q_a_[
robotdata_index] = self.robot_data_.q_a_last[
robotdata_index]
self.robot_data_.q_dot_a_[
robotdata_index] = self.robot_data_.qdot_a_last[
robotdata_index]
self.robot_data_.tau_a_[
robotdata_index] = self.robot_data_.tor_a_last[
robotdata_index]
else:
# 首次接收数据,更新标志位
received_flag = True
self.robot_data_.q_a_last[robotdata_index] = self.robot_data_.q_a_[
robotdata_index]
self.robot_data_.qdot_a_last[
robotdata_index] = self.robot_data_.q_dot_a_[robotdata_index]
self.robot_data_.tor_a_last[
robotdata_index] = self.robot_data_.tau_a_[robotdata_index]
def update_sensor_states(self):
# 获取Imu数据
self.get_imu_data()
# 添加IMU偏移
self.robot_data_.imu_data_[2] += self.xsense_roll_offset
def update_robot_data(self, flag: ControlFlag, time_passed: float):
# 更新传感器数据
self.update_sensor_states()
# 更新电机状态数据
self.update_robot_state()
# 更新机器人控制命令
self.update_robot_cmd(flag)
# 脚踝并联转串联
self.backup_serial_cmd()
if not self.sim:
#真机
self.ankle_parallel_to_serial()
# 更新robot_data 时间戳
self.robot_data_.time_now_ = time_passed
def backup_serial_cmd(self):
self.robot_data_.q_d_s_ = self.robot_data_.q_d_.copy()
def _check_and_clip_joint_limits_fast(
self, cmd_name: int, position: float) -> tuple[bool, float]:
"""
快速检查并修正关节位置限位(避免重复查询)
"""
if not self.sim and self.clip_actions:
limit = self.id_to_limits[cmd_name]
clipped_pos = np.clip(position, limit["min"], limit["max"])
return clipped_pos == position, clipped_pos
else:
return True, position
@timing_decorator
def convert_to_motor_commands(self):
"""将机器人控制命令转换为电机控制命令"""
# 使用切片操作一次性复制数据,避免逐个元素赋值
q_d_reordered = self.robot_data_.q_d_[self.floating_base_dof:]
qdot_d_reordered = self.robot_data_.q_dot_d_[self.floating_base_dof:]
tor_d_reordered = self.robot_data_.tau_d_[self.floating_base_dof:]
# 计算 q_d_reordered - self.zero_offset
np.subtract(q_d_reordered, self.zero_offset, out=self.temp_q)
# 计算 self.zero_cnt * 2.0 * self.pi
np.multiply(self.zero_cnt, 2.0 * math.pi, out=self._temp_zero_cnt)
# 计算 q_d_reordered - self.zero_offset - self.zero_cnt * 2.0 * self.pi
np.subtract(self.temp_q, self._temp_zero_cnt, out=self.temp_q)
# 计算 (q_d_reordered - self.zero_offset - self.zero_cnt * 2.0 * self.pi) * self.motor_dir
np.multiply(self.temp_q, self.motor_dir, out=self.temp_q)
# 计算最终结果 (q_d_reordered - self.zero_offset - self.zero_cnt * 2.0 * self.pi) * self.motor_dir + self.zero_pos
np.add(self.temp_q,
self.zero_pos,
out=self.robot_data_.q_d_[self.floating_base_dof:])
# qdot_d和tor_d的计算也可以向量化
np.multiply(qdot_d_reordered,
self.motor_dir,
out=self.robot_data_.q_dot_d_[self.floating_base_dof:])
np.multiply(tor_d_reordered,
self.motor_dir,
out=self.robot_data_.tau_d_[self.floating_base_dof:])
# 如果关节被禁用
if self.disable_joints_:
self.robot_data_.joint_kp_p_.fill(0.0)
self.robot_data_.joint_kd_p_.fill(0.0)
self.robot_data_.tau_d_.fill(0.0)
self.node.get_logger().warn("Joints disabled!")
def publish_motor_commands(self, flag: ControlFlag):
# flag_fsm_command = flag.fsm_state_command
current_state = self.current_state
if self.pub_fsm_state is not None:
state_msg = String()
state_msg.data = current_state.name
self.pub_fsm_state.publish(state_msg)
# 发布腿部控制命令
if self.legs_control_status == [] or current_state in self.legs_control_status:
leg_msg = CmdMotorCtrl()
leg_msg.header.stamp = self.node.get_clock().now().to_msg()
for i in range(self.id_map.leg_motor_nums):
cmd = MotorCtrl()
cmd.name = self.id_map.get_id_by_index(i)
cmd.kp = float(self.robot_data_.joint_kp_p_[i])
cmd.kd = float(self.robot_data_.joint_kd_p_[i])
cmd.pos = float(self.robot_data_.q_d_[i +
self.floating_base_dof])
cmd.spd = float(
self.robot_data_.q_dot_d_[i + self.floating_base_dof])
cmd.tor = float(
self.robot_data_.tau_d_[i + self.floating_base_dof])
# 检查关节位置限位
within_limit, cmd.pos = self._check_and_clip_joint_limits_fast(
cmd.name, cmd.pos)
if not within_limit:
print(
f"Joint (id: {cmd.name}) position {cmd.pos} is out of limits"
)
leg_msg.cmds.append(cmd)
self.pub_leg_motor_cmd.publish(leg_msg)
# 只在特定模式下控制腰部
if current_state in self.waist_control_status:
# 腰部控制命令
waist_msg = CmdMotorCtrl()
waist_msg.header.stamp = self.node.get_clock().now().to_msg()
for i in range(self.id_map.waist_motor_nums):
cmd = MotorCtrl()
motor_idx = i + self.id_map.leg_motor_nums
cmd.name = self.id_map.get_id_by_index(
motor_idx) # 12 -> 33(yaw)
cmd.kp = float(self.robot_data_.joint_kp_p_[motor_idx])
cmd.kd = float(self.robot_data_.joint_kd_p_[motor_idx])
cmd.pos = float(self.robot_data_.q_d_[motor_idx +
self.floating_base_dof])
cmd.spd = float(
self.robot_data_.q_dot_d_[motor_idx +
self.floating_base_dof])
cmd.tor = float(
self.robot_data_.tau_d_[motor_idx +
self.floating_base_dof])
# 检查关节位置限位
within_limit, cmd.pos = self._check_and_clip_joint_limits_fast(
cmd.name, cmd.pos)
if not within_limit:
print(
f"Joint (id: {cmd.name}) position {cmd.pos} is out of limits"
)
waist_msg.cmds.append(cmd)
# print(f'waist_msg {waist_msg}')
self.pub_waist_motor_cmd.publish(waist_msg)
# 只在特定模式下控制手臂
if current_state in self.arms_control_status:
# 手臂控制命令
arm_msg = CmdMotorCtrl()
arm_msg.header.stamp = self.node.get_clock().now().to_msg()
if current_state in self.left_arm_only_status:
control_index = np.arange(0, 7)
elif current_state in self.right_arm_only_status:
control_index = np.arange(self.id_map.arm_motor_nums - 7, self.id_map.arm_motor_nums)
else:
control_index = np.arange(0, self.id_map.arm_motor_nums)
for i in control_index:
cmd = MotorCtrl()
motor_idx = i + self.id_map.leg_motor_nums + self.id_map.waist_motor_nums
cmd.name = self.id_map.get_id_by_index(motor_idx)
cmd.kp = float(self.robot_data_.joint_kp_p_[motor_idx])
cmd.kd = float(self.robot_data_.joint_kd_p_[motor_idx])
cmd.pos = float(self.robot_data_.q_d_[motor_idx +
self.floating_base_dof])
cmd.spd = float(
self.robot_data_.q_dot_d_[motor_idx +
self.floating_base_dof])
cmd.tor = float(
self.robot_data_.tau_d_[motor_idx +
self.floating_base_dof])
# 检查关节位置限位
within_limit, cmd.pos = self._check_and_clip_joint_limits_fast(
cmd.name, cmd.pos)
if not within_limit:
print(
f"Joint (id: {cmd.name}) position {cmd.pos} is out of limits"
)
arm_msg.cmds.append(cmd)
# print(f'arm_msg {arm_msg}')
self.pub_arm_motor_cmd.publish(arm_msg)
@timing_decorator
def send_motor_commands(self, flag: ControlFlag):
"""发布电机控制命令"""
if not self.initialized:
return
if not self.sim:
#真机
self.ankle_serial_to_parallel()
self.convert_to_motor_commands()
self.publish_motor_commands(flag)
# ROS回调函数
def _leg_motor_status_callback(self, msg):
"""腿部电机状态回调"""
try:
self.queue_leg_motor_state.put_nowait(msg)
except queue.Full:
# 队列满时移除旧数据,加入新数据
try:
self.queue_leg_motor_state.get_nowait() # 移除旧数据
self.queue_leg_motor_state.put_nowait(msg) # 加入新数据
except:
pass # 如果仍然无法加入,忽略
def _arm_motor_status_callback(self, msg):
"""手臂电机状态回调"""
try:
self.queue_arm_motor_state.put_nowait(msg)
except queue.Full:
# 队列满时移除旧数据,加入新数据
try:
self.queue_arm_motor_state.get_nowait() # 移除旧数据
self.queue_arm_motor_state.put_nowait(msg) # 加入新数据
except:
pass # 如果仍然无法加入,忽略
def _waist_motor_status_callback(self, msg):
"""腰部电机状态回调"""
try:
self.queue_waist_motor_state.put_nowait(msg)
except queue.Full:
# 队列满时移除旧数据,加入新数据
try:
self.queue_waist_motor_state.get_nowait() # 移除旧数据
self.queue_waist_motor_state.put_nowait(msg) # 加入新数据
except:
pass # 如果仍然无法加入,忽略
def _imu_status_callback(self, msg):
"""IMU状态回调"""
try:
self.queue_imu_xsens.put_nowait(msg)
except queue.Full:
try:
self.queue_imu_xsens.get_nowait() # 移除旧数据
self.queue_imu_xsens.put_nowait(msg) # 加入新数据
except:
pass # 如果仍然无法加入,忽略
def ankle_parallel_to_serial(self):
# 串并联转换:并转串 (类似C++版本中的处理)
# 提取左右脚两个踝关节(并联关节)
q_a_p = np.zeros(4) # 并联关节角度(实际)
qdot_a_p = np.zeros(4) # 并联关节速度(实际)
tor_a_p = np.zeros(4) # 并联关节力矩(实际)
q_a_s = np.zeros(4) # 串联关节角度(实际)
qdot_a_s = np.zeros(4) # 串联关节速度(实际)
tor_a_s = np.zeros(4) # 串联关节力矩(实际)
q_a_p[:2] = self.robot_data_.q_a_[
self.left_ankle_indices] # 左脚踝关节 (pitch, roll)
q_a_p[2:] = self.robot_data_.q_a_[
self.right_ankle_indices] # 右脚踝关节 (pitch, roll)
qdot_a_p[:2] = self.robot_data_.q_dot_a_[self.left_ankle_indices]
qdot_a_p[2:] = self.robot_data_.q_dot_a_[self.right_ankle_indices]
tor_a_p[:2] = self.robot_data_.tau_a_[self.left_ankle_indices]
tor_a_p[2:] = self.robot_data_.tau_a_[self.right_ankle_indices]
self.q_a_p = q_a_p.copy()
self.qdot_a_p = qdot_a_p.copy()
self.tor_a_p = tor_a_p.copy()
if self.debug:
print("-" * 20 + "并转串联前" + "-" * 20)
print("q_a_p:", q_a_p)
print("qdot_a_p:", qdot_a_p)
print("tor_a_p:", tor_a_p)
# 计算并转串(正运动学)
self.fun_s2p.set_p_est(q_a_p, qdot_a_p, tor_a_p)
self.fun_s2p.calcFK()
self.fun_s2p.calcIK()
success, q_a_s, qdot_a_s, tor_a_s = self.fun_s2p.get_s_state()
if self.debug:
print("-" * 20 + "并转串联后" + "-" * 20)
print("q_a_s:", q_a_s)
print("qdot_a_s:", qdot_a_s)
print("tor_a_s:", tor_a_s)
# 用串联关节值替换原来的并联关节值
self.robot_data_.q_a_[self.left_ankle_indices] = q_a_s[:2] # 左脚踝关节串联值
self.robot_data_.q_a_[self.right_ankle_indices] = q_a_s[2:] # 右脚踝关节串联值
self.robot_data_.q_dot_a_[self.left_ankle_indices] = qdot_a_s[:2]
self.robot_data_.q_dot_a_[self.right_ankle_indices] = qdot_a_s[2:]
self.robot_data_.tau_a_[self.left_ankle_indices] = tor_a_s[:2]
self.robot_data_.tau_a_[self.right_ankle_indices] = tor_a_s[2:]
def ankle_serial_to_parallel(self):
# 串转并将串联关节命令转换为并联关节命令类似C++版本)
# 提取踝关节两关节的串联命令
q_d_p = np.zeros(4) # 并联关节角度(期望)
qdot_d_p = np.zeros(4) # 并联关节速度(期望)
tor_d_p = np.zeros(4) # 并联关节力矩(期望)
q_d_s = np.zeros(4) # 串联关节角度(期望)
qdot_d_s = np.zeros(4) # 串联关节速度(期望)
tor_d_s = np.zeros(4) # 串联关节力矩(期望)
q_d_s[:2] = self.robot_data_.q_d_[self.left_ankle_indices] # 左脚踝关节串联命令
q_d_s[2:] = self.robot_data_.q_d_[
self.right_ankle_indices] # 右脚踝关节串联命令
qdot_d_s[:2] = self.robot_data_.q_dot_d_[self.left_ankle_indices]
qdot_d_s[2:] = self.robot_data_.q_dot_d_[self.right_ankle_indices]
tor_d_s[:2] = self.robot_data_.tau_d_[self.left_ankle_indices]
tor_d_s[2:] = self.robot_data_.tau_d_[self.right_ankle_indices]
q_a_s = np.zeros(4) # 串联关节角度(实际)
qdot_a_s = np.zeros(4) # 串联关节速度(实际)
q_a_s[:2] = self.robot_data_.q_a_[self.left_ankle_indices] # 左脚踝关节串联值
q_a_s[2:] = self.robot_data_.q_a_[self.right_ankle_indices] # 右脚踝关节串联值
qdot_a_s[:2] = self.robot_data_.q_dot_a_[
self.left_ankle_indices] # 左脚踝关节串联速度
qdot_a_s[2:] = self.robot_data_.q_dot_a_[
self.right_ankle_indices] # 右脚踝关节串联速度
kp = np.zeros(4) # 串联关节刚度
kd = np.zeros(4) # 串联关节阻尼
kp[:2] = self.robot_data_.joint_kp_p_[self.left_ankle_indices -
self.floating_base_dof]
kp[2:] = self.robot_data_.joint_kp_p_[self.right_ankle_indices -
self.floating_base_dof]
kd[:2] = self.robot_data_.joint_kd_p_[self.left_ankle_indices -
self.floating_base_dof]
kd[2:] = self.robot_data_.joint_kd_p_[self.right_ankle_indices -
self.floating_base_dof]
tor_d_s = kp * (q_d_s - q_a_s) + kd * (qdot_d_s - qdot_a_s)
if self.debug:
print("-" * 20 + "串转并联前" + "-" * 20)
print("q_d_s:", q_d_s)
print("qdot_d_s:", qdot_d_s)
print("tor_d_s:", tor_d_s)
# 串转并计算
self.fun_s2p.set_s_des(q_d_s, qdot_d_s, tor_d_s)
self.fun_s2p.calc_joint_pos_ref()
self.fun_s2p.calc_joint_tor_des()
success, q_d_p, qdot_d_p, tor_d_p = self.fun_s2p.get_p_des()
q_d_p = (tor_d_p - self.ankle_kd_p *
(qdot_d_p - self.qdot_a_p)) / self.ankle_kp_p + self.q_a_p
if self.debug:
print("-" * 20 + "串转并联后" + "-" * 20)
print("q_d_p:", q_d_p)
print("qdot_d_p:", qdot_d_p)
print("tor_d_p:", tor_d_p)
# 用并联关节命令覆盖原来的串联命令
self.robot_data_.q_d_[self.left_ankle_indices] = q_d_p[:2] # 左脚踝关节并联命令
self.robot_data_.q_d_[self.right_ankle_indices] = q_d_p[2:] # 右脚踝关节并联命令
self.robot_data_.q_dot_d_[self.left_ankle_indices] = qdot_d_p[:2]
self.robot_data_.q_dot_d_[self.right_ankle_indices] = qdot_d_p[2:]
# self.robot_data_.tau_d_[self.left_ankle_indices] = tor_d_p[:2]
# self.robot_data_.tau_d_[self.right_ankle_indices] = tor_d_p[2:]
self.robot_data_.tau_d_[self.left_ankle_indices] = 0.0
self.robot_data_.tau_d_[self.right_ankle_indices] = 0.0
# 更新脚踝Kp,Kd
self.robot_data_.joint_kp_p_[
self.left_ankle_indices -
self.floating_base_dof] = self.ankle_kp_p[:2]
self.robot_data_.joint_kp_p_[
self.right_ankle_indices -
self.floating_base_dof] = self.ankle_kp_p[2:]
self.robot_data_.joint_kd_p_[
self.left_ankle_indices -
self.floating_base_dof] = self.ankle_kd_p[:2]
self.robot_data_.joint_kd_p_[
self.right_ankle_indices -
self.floating_base_dof] = self.ankle_kd_p[2:]
def update_robot_cmd(self, flag: ControlFlag):
"""更新机器人控制命令"""
if flag.enable:
# 使用getattr设置默认值避免AttributeError
x_command = getattr(flag, 'x_speed_command')
y_command = getattr(flag, 'y_speed_command')
yaw_command = getattr(flag, 'yaw_speed_command')
self.robot_data_.walk_cmd_ = [x_command, y_command, yaw_command]
self.robot_data_.control_flag.fsm_state_command = flag.fsm_state_command
def get_robot_interface(robot_data: RobotData, config_path: str) -> RobotInterface:
"""工厂函数,返回机器人接口实例"""
return RobotInterfaceImpl(robot_data, config_path)