Files
tienkung-szu/xSIM_MUJOCO/scripts/simulator_view_asyn.py
2026-03-27 16:10:51 +08:00

966 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
MuJoCo机器人控制脚本
实现位置和力矩混合控制,读取传感器数据
"""
import mujoco
import mujoco.viewer
import time
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import threading
import rclpy
from rclpy.node import Node
# import message_filters
from bodyctrl_msgs.msg import MotorCtrl, MotorStatus,MotorStatusMsg, CmdMotorCtrl, Imu, Euler
from queue import Queue, Full
from pynput import keyboard
from scipy.spatial.transform import Rotation
from std_msgs.msg import String
import os
from elastic_band import ElasticBand
import argparse
def quaternion_to_euler_scipy(w, x, y, z, degrees=False):
"""
使用SciPy进行四元数转欧拉角
"""
# 创建旋转对象 (注意顺序: [x, y, z, w])
rotation = Rotation.from_quat([x, y, z, w])
# 转换为欧拉角 (顺序: 'xyz' 对应 roll, pitch, yaw)
euler_angles = rotation.as_euler('xyz', degrees=degrees)
return euler_angles[0], euler_angles[1], euler_angles[2]
class RobotSimulator:
def __init__(self, model_path, node: Node | None = None, debug=False, robot_config='full'):
self.debug = debug
# 加载模型
self.model = mujoco.MjModel.from_xml_path(model_path)
self.data = mujoco.MjData(self.model)
self.default_timestep = self.model.opt.timestep
# 初始化控制模式 (0=(位置控制(仅上半身)和力控都存在), 1=全身仅力矩控制)
self.control_mode = 1
self.execution_mode = "torque_pvd"
self.direct_position_states = {"XSIMRUN"}
self.torque_disable_mask = int(self.model.opt.disableactuator)
self.direct_position_disable_mask = 4
# 记录数据用于绘图
record_length = 10000
self.time_history = deque(maxlen=record_length)
self.joint_pos_history = deque(maxlen=record_length)
self.joint_vel_history = deque(maxlen=record_length)
self.joint_torque_history = deque(maxlen=record_length)
self.imu_orient_history = deque(maxlen=record_length)
self.imu_accel_history = deque(maxlen=record_length)
self.imu_pos_history = deque(maxlen=record_length)
# 根据机器人配置设置关节映射
if robot_config == '21':
# 21自由度配置没有手腕关节
self.motor_id_to_joint = {
1: 'head_roll_joint',
2: 'head_pitch_joint',
3: 'head_yaw_joint',
11: 'shoulder_pitch_l_joint',
12: 'shoulder_roll_l_joint',
13: 'shoulder_yaw_l_joint',
14: 'elbow_pitch_l_joint',
21: 'shoulder_pitch_r_joint',
22: 'shoulder_roll_r_joint',
23: 'shoulder_yaw_r_joint',
24: 'elbow_pitch_r_joint',
33: 'waist_yaw_joint',
51: 'hip_pitch_l_joint',
52: 'hip_roll_l_joint',
53: 'hip_yaw_l_joint',
54: 'knee_pitch_l_joint',
55: 'ankle_pitch_l_joint',
56: 'ankle_roll_l_joint',
61: 'hip_pitch_r_joint',
62: 'hip_roll_r_joint',
63: 'hip_yaw_r_joint',
64: 'knee_pitch_r_joint',
65: 'ankle_pitch_r_joint',
66: 'ankle_roll_r_joint'
}
#双臂关节名称
self.arm_joint_names = [
'shoulder_pitch_l_joint', 'shoulder_roll_l_joint',
'shoulder_yaw_l_joint', 'elbow_pitch_l_joint',
'shoulder_pitch_r_joint', 'shoulder_roll_r_joint',
'shoulder_yaw_r_joint', 'elbow_pitch_r_joint',
]
#腰部关节名
self.waist_joint_names = [
'waist_yaw_joint',
]
else:
# 全配置(包含手腕关节)
self.motor_id_to_joint = {
1: 'head_roll_joint',
2: 'head_pitch_joint',
3: 'head_yaw_joint',
11: 'shoulder_pitch_l_joint',
12: 'shoulder_roll_l_joint',
13: 'shoulder_yaw_l_joint',
14: 'elbow_pitch_l_joint',
15: 'elbow_yaw_l_joint',
16: 'wrist_pitch_l_joint',
17: 'wrist_roll_l_joint',
21: 'shoulder_pitch_r_joint',
22: 'shoulder_roll_r_joint',
23: 'shoulder_yaw_r_joint',
24: 'elbow_pitch_r_joint',
25: 'elbow_yaw_r_joint',
26: 'wrist_pitch_r_joint',
27: 'wrist_roll_r_joint',
# 31: 'waist_yaw_joint',
# 32: 'waist_roll_joint',
# 33: 'waist_pitch_joint',
33: 'waist_yaw_joint',
32: 'waist_roll_joint',
31: 'waist_pitch_joint',
51: 'hip_pitch_l_joint',
52: 'hip_roll_l_joint',
53: 'hip_yaw_l_joint',
54: 'knee_pitch_l_joint',
55: 'ankle_pitch_l_joint',
56: 'ankle_roll_l_joint',
61: 'hip_pitch_r_joint',
62: 'hip_roll_r_joint',
63: 'hip_yaw_r_joint',
64: 'knee_pitch_r_joint',
65: 'ankle_pitch_r_joint',
66: 'ankle_roll_r_joint'
}
#双臂关节名称
self.arm_joint_names = [
'shoulder_pitch_l_joint', 'shoulder_roll_l_joint',
'shoulder_yaw_l_joint', 'elbow_pitch_l_joint', 'elbow_yaw_l_joint',
'wrist_pitch_l_joint', 'wrist_roll_l_joint',
'shoulder_pitch_r_joint', 'shoulder_roll_r_joint',
'shoulder_yaw_r_joint', 'elbow_pitch_r_joint', 'elbow_yaw_r_joint',
'wrist_pitch_r_joint', 'wrist_roll_r_joint'
]
#腰部关节名
self.waist_joint_names = [
'waist_yaw_joint', 'waist_roll_joint', 'waist_pitch_joint'
]
# 关节到电机ID的映射
self.joint_to_motor_id = {
joint: id
for id, joint in self.motor_id_to_joint.items()
}
#腿部关节名称
self.leg_joint_names = [
'hip_roll_l_joint', 'hip_pitch_l_joint', 'hip_yaw_l_joint',
'knee_pitch_l_joint', 'ankle_pitch_l_joint', 'ankle_roll_l_joint',
'hip_roll_r_joint', 'hip_pitch_r_joint', 'hip_yaw_r_joint',
'knee_pitch_r_joint', 'ankle_pitch_r_joint', 'ankle_roll_r_joint'
]
# 获取传感器/执行器索引
self.get_all_robot_info()
self.sim_view_lock = threading.Lock()
# 升降带
self.elastic_band = ElasticBand(enable=True)
# 根据配置设置弹性带连接的链接
if robot_config == '21':
self.band_attached_link = self.model.body("waist_yaw_link").id
else:
self.band_attached_link = self.model.body("waist_pitch_link").id
# 启动可视化器
self.paused = False
# self.viewer = mujoco.viewer.launch_passive(self.model, self.data)
self.viewer = mujoco.viewer.launch_passive(
self.model, self.data, key_callback=self.elastic_band.MujuocoKeyCallback
)
self.view_dt = 0.03
# 线程控制变量
self.joint_cmd_lock = threading.Lock()
self.stop_event = threading.Event()
#创建ROS相关话题
self.node = node
if self.node:
self.node.create_subscription(CmdMotorCtrl, 'arm/cmd_ctrl',
self.joint_cmd_callback, 10)
self.node.create_subscription(CmdMotorCtrl, 'leg/cmd_ctrl',
self.joint_cmd_callback, 10)
self.node.create_subscription(CmdMotorCtrl, 'waist/cmd_ctrl',
self.joint_cmd_callback, 10)
self.node.create_subscription(String, '/control/fsm_state',
self.fsm_state_callback, 10)
#创建发布者
self.arm_status_pub = self.node.create_publisher(
MotorStatusMsg, 'arm/status', 20)
self.leg_status_pub = self.node.create_publisher(
MotorStatusMsg, 'leg/status', 20)
self.waist_status_pub = self.node.create_publisher(
MotorStatusMsg, 'waist/status', 10)
self.imu_status_pub = self.node.create_publisher(
Imu, 'imu/status', 10)
# 传感器返回的机器人状态
self.joint_positions = {}
self.joint_velocities = {}
self.joint_torques = {}
self.imu_orientation = None
self.imu_position = None
self.imu_angular_velocity = None
self.imu_linear_velocity = None
self.imu_linear_acceleration = None
self.imu_magnetometer = None
# 关节命令
self.raw_joint_commands = Queue()
self.joint_commands = {}
self.last_raw_joint_commands = {}
# 发布线程相关
self.pub_thread = None
self.pub_thread_lock = threading.Lock()
self.pub_thread_running = False
self.last_sensor_data = None
self.sensor_data_lock = threading.Lock()
# 初始化控制器
self.init_controller()
def init_controller(self):
self._set_execution_mode("torque_pvd")
def _set_execution_mode(self, mode):
if mode == self.execution_mode:
return
if mode == "direct_position":
self.model.opt.disableactuator = self.direct_position_disable_mask
self.model.opt.timestep = 0.005
self.execution_mode = "direct_position"
print("切换到直接位置控制模式:启用 pos_* 执行器,禁用 motor_* 执行器")
return
self.model.opt.disableactuator = self.torque_disable_mask
self.model.opt.timestep = self.default_timestep
self.execution_mode = "torque_pvd"
print("切换到PVD力矩控制模式启用 motor_* 执行器,禁用 pos_* 执行器")
def switch_to_torque_control(self):
"""切换到力矩控制:禁用位置控制器"""
self.control_mode = 1
self._set_execution_mode("torque_pvd")
def switch_to_position_torque_control(self):
"""切换到位置力控混合控制模式:启用位置控制器"""
self.control_mode = 0
self._set_execution_mode("direct_position")
def fsm_state_callback(self, msg: String):
state_name = msg.data.strip()
if state_name in self.direct_position_states:
self._set_execution_mode("direct_position")
else:
self._set_execution_mode("torque_pvd")
def start_keyboard_listener(self):
"""使用 pynput 监听键盘"""
def on_press(key):
try:
if key == keyboard.Key.num_lock:
self.paused = not self.paused
print(f"仿真已{'暂停' if self.paused else '继续'}")
if key == keyboard.Key.end:
if self.control_mode == 1:
self.switch_to_position_torque_control()
else:
self.switch_to_torque_control()
except Exception as e:
print(f"键盘监听错误: {e}")
def listen():
with keyboard.Listener(on_press=on_press) as listener:
while not self.stop_event.is_set():
time.sleep(0.1)
listener.stop()
thread = threading.Thread(target=listen, daemon=True)
thread.start()
def start_publishing(self):
"""启动发布线程"""
if not self.node:
return
with self.pub_thread_lock:
if not self.pub_thread_running:
self.pub_thread_running = True
self.pub_thread = threading.Thread(target=self._publish_thread,
daemon=True)
self.pub_thread.start()
def stop_publishing(self):
"""停止发布线程"""
with self.pub_thread_lock:
if self.pub_thread_running:
self.pub_thread_running = False
if self.pub_thread:
self.pub_thread.join(timeout=1.0)
def _publish_thread(self):
"""发布状态的独立线程"""
pub_freq = 400 # 100Hz发布频率
pub_interval = 1.0 / pub_freq
sensor_data = None
while self.pub_thread_running and not self.stop_event.is_set():
start_time = time.perf_counter()
# 获取最新的传感器数据
with self.sensor_data_lock:
if self.last_sensor_data is not None:
sensor_data = self.last_sensor_data.copy()
# 为每个关节组发布状态
self._publish_joint_group(self.arm_joint_names,
self.arm_status_pub, sensor_data)
self._publish_joint_group(self.waist_joint_names,
self.waist_status_pub, sensor_data)
self._publish_joint_group(self.leg_joint_names,
self.leg_status_pub, sensor_data)
# 发布Imu
self._publish_imu_status(self.imu_status_pub, sensor_data)
# 控制发布频率
elapsed = time.perf_counter() - start_time
sleep_time = pub_interval - elapsed
if sleep_time > 0:
time.sleep(sleep_time)
def _publish_imu_status(self, publisher, sensor_data):
if publisher and sensor_data:
orientaion = sensor_data['imu_orientation']
# position = sensor_data['imu_position']
imu_angular_velocity = sensor_data['imu_angular_velocity']
imu_linear_acceleration = sensor_data['imu_linear_acceleration']
imu_msg: Imu = Imu()
imu_msg.header.stamp = self.node.get_clock().now().to_msg()
imu_msg.orientation.x = orientaion[1]
imu_msg.orientation.y = orientaion[2]
imu_msg.orientation.z = orientaion[3]
imu_msg.orientation.w = orientaion[0]
# 四元数转换为欧拉角
r, p, y = quaternion_to_euler_scipy(*list(orientaion))
imu_msg.euler = Euler()
imu_msg.euler.roll = r
imu_msg.euler.pitch = p
imu_msg.euler.yaw = y
imu_msg.angular_velocity.x = imu_angular_velocity[0]
imu_msg.angular_velocity.y = imu_angular_velocity[1]
imu_msg.angular_velocity.z = imu_angular_velocity[2]
imu_msg.linear_acceleration.x = imu_linear_acceleration[0]
imu_msg.linear_acceleration.y = imu_linear_acceleration[1]
imu_msg.linear_acceleration.z = imu_linear_acceleration[2]
try:
publisher.publish(imu_msg)
except Exception as e:
print(f"发布 imu 状态时出错: {e}")
def _publish_joint_group(self, joint_names, publisher, sensor_data):
"""发布指定关节组的状态"""
if publisher and sensor_data:
joint_positions = sensor_data['joint_positions']
joint_velocities = sensor_data['joint_velocities']
joint_torques = sensor_data['joint_torques']
#构造关节状态消息
motor_status_msg = MotorStatusMsg()
motor_status_msg.header.stamp = self.node.get_clock().now().to_msg()
for joint_name in joint_names:
if joint_name in joint_positions and joint_name in joint_velocities:
motor_status = self.construct_motor_status(
joint_name, joint_positions[joint_name],
joint_velocities[joint_name], joint_torques[joint_name])
motor_status_msg.status.append(motor_status)
try:
publisher.publish(motor_status_msg)
except Exception as e:
print(f"发布状态{motor_status_msg}时出错: {e}")
def get_all_robot_info(self):
# 获取关节索引
self.joint_names = self.arm_joint_names + self.waist_joint_names + self.leg_joint_names
# 获取执行器索引
self.pos_actuator_names = [f"pos_{name}" for name in self.joint_names]
self.motor_actuator_names = [
f"motor_{name}" for name in self.joint_names
]
self.pos_actuator_indices_map = {}
self.motor_actuator_indices_map = {}
for name in self.pos_actuator_names:
idx = mujoco.mj_name2id(self.model, mujoco.mjtObj.mjOBJ_ACTUATOR,
name)
self.pos_actuator_indices_map[name] = idx
for name in self.motor_actuator_names:
idx = mujoco.mj_name2id(self.model, mujoco.mjtObj.mjOBJ_ACTUATOR,
name)
self.motor_actuator_indices_map[name] = idx
# 获取执行器增益
self.original_gains = self.model.actuator_gainprm.copy()
self.original_biases = self.model.actuator_biasprm.copy()
# 获取传感器索引
self.joint_pos_sensor_names = [
f"{name}_pos" for name in self.joint_names
]
self.joint_vel_sensor_names = [
f"{name}_vel" for name in self.joint_names
]
self.joint_torque_sensor_names = [
f"{name}_torque" for name in self.joint_names
]
self.joint_pos_sensor_indices_map = {}
self.joint_vel_sensor_indices_map = {}
self.joint_torque_sensor_indices_map = {}
for name in self.joint_pos_sensor_names:
idx = mujoco.mj_name2id(self.model, mujoco.mjtObj.mjOBJ_SENSOR,
name)
self.joint_pos_sensor_indices_map[name] = idx
for name in self.joint_vel_sensor_names:
idx = mujoco.mj_name2id(self.model, mujoco.mjtObj.mjOBJ_SENSOR,
name)
self.joint_vel_sensor_indices_map[name] = idx
for name in self.joint_torque_sensor_names:
idx = mujoco.mj_name2id(self.model, mujoco.mjtObj.mjOBJ_SENSOR,
name)
self.joint_torque_sensor_indices_map[name] = idx
# IMU传感器索引
self.imu_orient_idx = mujoco.mj_name2id(self.model,
mujoco.mjtObj.mjOBJ_SENSOR,
'orientation')
self.imu_orient_address = self.model.sensor_adr[self.imu_orient_idx]
self.imu_pos_idx = mujoco.mj_name2id(self.model,
mujoco.mjtObj.mjOBJ_SENSOR,
'position')
self.imu_pos_address = self.model.sensor_adr[self.imu_pos_idx]
self.imu_gyro_idx = mujoco.mj_name2id(self.model,
mujoco.mjtObj.mjOBJ_SENSOR,
'angular-velocity')
self.imu_gyro_address = self.model.sensor_adr[self.imu_gyro_idx]
self.imu_vel_idx = mujoco.mj_name2id(self.model,
mujoco.mjtObj.mjOBJ_SENSOR,
'linear-velocity')
self.imu_vel_address = self.model.sensor_adr[self.imu_vel_idx]
self.imu_accel_idx = mujoco.mj_name2id(self.model,
mujoco.mjtObj.mjOBJ_SENSOR,
'linear-acceleration')
self.imu_accel_address = self.model.sensor_adr[self.imu_accel_idx]
self.imu_mag_idx = mujoco.mj_name2id(self.model,
mujoco.mjtObj.mjOBJ_SENSOR,
'magnetometer')
self.imu_mag_address = self.model.sensor_adr[self.imu_mag_idx]
def pvd_controller(self, joint_name, pos_d, spd_d, tor_d, kp, kd):
"""
PVD控制器实现函数用于计算关节的输出扭矩
参数:
joint_name: 关节名称,用于获取关节状态
pos_d: 期望位置
spd_d: 期望速度
tor_d: 期望扭矩
kp: 位置比例增益系数
kd: 速度比例增益系数
返回值:
output_torque: 计算得到的输出扭矩值
"""
status = self.get_status(joint_name)
pos_a = status['position']
spd_a = status['velocity']
output_torque = kp * (pos_d - pos_a) + kd * (spd_d - spd_a) + tor_d
return output_torque
def joint_cmd_callback(self, msg: CmdMotorCtrl):
cmd: MotorCtrl = None
# 每一组命令全部接受完再释放锁
with self.joint_cmd_lock:
current_time = self.node.get_clock().now().to_msg()
msg_time = msg.header.stamp
time_diff = (current_time.sec - msg_time.sec) * 1000000000 + (current_time.nanosec - msg_time.nanosec)
time_diff_ms = time_diff / 1000000.0
if time_diff_ms > 100:
print(f"[WARNING] Time difference: {time_diff_ms} ms, abandon this command")
return
for cmd in msg.cmds:
try:
self.raw_joint_commands.put_nowait(cmd)
except Full:
print(f"raw joint command 队列已满")
self.raw_joint_commands.get_nowait()
self.raw_joint_commands.put_nowait(cmd)
def calc_motor_cmd(self, joint_name, pos, spd, tor, kp, kd):
# joint_name = self.motor_id_to_joint[name_id]
# 计算下发力矩
target_torque = self.pvd_controller(joint_name, pos, spd, tor, kp, kd)
self.joint_commands[joint_name] = target_torque
if self.debug:
print(f"{joint_name} torque : {target_torque}\n")
def set_motor_cmd(self):
with self.joint_cmd_lock:
while not self.raw_joint_commands.empty():
cmd: MotorCtrl = self.raw_joint_commands.get_nowait()
name = cmd.name
pos = cmd.pos
spd = cmd.spd
tor = cmd.tor
kp = cmd.kp
kd = cmd.kd
joint_name = self.motor_id_to_joint[name]
self.last_raw_joint_commands[joint_name] = { "pos": pos, "spd": spd, "tor": tor, "kp": kp, "kd": kd}
if self.execution_mode == "direct_position":
for motor_index in self.motor_actuator_indices_map.values():
if motor_index != -1:
self.data.ctrl[motor_index] = 0.0
for joint_name in self.joint_names:
if joint_name in self.last_raw_joint_commands:
pos = self.last_raw_joint_commands[joint_name].get("pos")
pos_name = f"pos_{joint_name}"
pos_index = self.pos_actuator_indices_map.get(pos_name, -1)
if pos_index != -1:
self.data.ctrl[pos_index] = pos
return
for joint_name in self.joint_names:
if joint_name in self.last_raw_joint_commands:
pos = self.last_raw_joint_commands[joint_name].get("pos")
spd = self.last_raw_joint_commands[joint_name].get("spd")
tor = self.last_raw_joint_commands[joint_name].get("tor")
kp = self.last_raw_joint_commands[joint_name].get("kp")
kd = self.last_raw_joint_commands[joint_name].get("kd")
self.calc_motor_cmd(joint_name, pos, spd, tor, kp, kd)
motor_name = f"motor_{joint_name}"
motor_index = self.motor_actuator_indices_map[motor_name]
if motor_index != -1:
# 确保电机存在且已收到命令
self.data.ctrl[motor_index] = self.joint_commands[joint_name]
def viewer_thread(self):
try:
while self.viewer.is_running() and not self.stop_event.is_set():
with self.sim_view_lock:
self.viewer.sync()
time.sleep(self.view_dt)
except Exception as e:
print(f"Viewer thread error: {e}")
finally:
print("Viewer thread stopped")
def read_sensors(self):
"""读取所有传感器数据"""
# 读取关节传感器数据
for name in self.joint_names:
pos_sensor_name = f"{name}_pos"
vel_sensor_name = f"{name}_vel"
torque_sensor_name = f"{name}_torque"
pos_idx = self.joint_pos_sensor_indices_map[pos_sensor_name]
vel_idx = self.joint_vel_sensor_indices_map[vel_sensor_name]
torque_idx = self.joint_torque_sensor_indices_map[
torque_sensor_name]
self.joint_positions[name] = self.data.sensordata[pos_idx]
self.joint_velocities[name] = self.data.sensordata[vel_idx]
self.joint_torques[name] = self.data.sensordata[torque_idx]
# 读取IMU数据
self.imu_orientation = self.data.sensordata[self.imu_orient_address:self.
imu_orient_address + 4]
self.imu_position = self.data.sensordata[self.
imu_pos_address:self.imu_pos_address +
3]
self.imu_angular_velocity = self.data.sensordata[self.imu_gyro_address:self
.imu_gyro_address + 3]
self.imu_linear_velocity = self.data.sensordata[self.imu_vel_address:self.
imu_vel_address + 3]
self.imu_linear_acceleration = self.data.sensordata[
self.imu_accel_address:self.imu_accel_address + 3]
self.imu_magnetometer = self.data.sensordata[self.imu_mag_address:self.
imu_mag_address + 3]
# 更新传感器数据供发布线程使用
plot_data = {
'joint_positions': np.array(list(self.joint_positions.values())),
'joint_velocities': np.array(list(self.joint_velocities.values())),
'joint_torques': np.array(list(self.joint_torques.values())),
'imu_orientation': self.imu_orientation,
'imu_position': self.imu_position,
'imu_angular_velocity': self.imu_angular_velocity,
'imu_linear_velocity': self.imu_linear_velocity,
'imu_linear_acceleration': self.imu_linear_acceleration,
'imu_magnetometer': self.imu_magnetometer
}
with self.sensor_data_lock:
self.last_sensor_data = {
'joint_positions': self.joint_positions.copy(),
'joint_velocities': self.joint_velocities.copy(),
'joint_torques': self.joint_torques.copy(),
'imu_orientation': self.imu_orientation,
'imu_position': self.imu_position,
'imu_angular_velocity': self.imu_angular_velocity,
'imu_linear_velocity': self.imu_linear_velocity,
'imu_linear_acceleration': self.imu_linear_acceleration,
'imu_magnetometer': self.imu_magnetometer
}
return plot_data
def construct_motor_status(self, joint_name, pos, speed, torque=None):
"""
构建电机状态消息
Args:
joint_name (str): 关节名称
pos (float): 位置值
speed (float): 速度值
Returns:
MotorStatus: 电机状态消息
"""
motor_status = MotorStatus()
motor_status.name = self.joint_to_motor_id.get(joint_name, int())
motor_status.pos = pos
motor_status.speed = speed
motor_status.current = torque
return motor_status
def get_status(self, joint_name):
# Todo: 处理还未收到传感器状态的情况
with self.sensor_data_lock:
joint_positions = self.last_sensor_data['joint_positions'].copy()
joint_velocities = self.last_sensor_data['joint_velocities'].copy()
joint_torques = self.last_sensor_data['joint_torques'].copy()
pos = joint_positions[joint_name]
vel = joint_velocities[joint_name]
torque = joint_torques[joint_name]
return {'position': pos, 'velocity': vel, 'torque': torque}
def simulate_thread(self):
"""运行控制循环"""
# 控制参数
time_step = 0
control_frequency = 100 # 100Hz控制频率
sensor_frequency = 100 # 100Hz传感器读取频率
control_interval = 1 / control_frequency
target_time = 0
# last_control_time = 0
try:
while self.viewer.is_running() and not self.stop_event.is_set():
step_start = time.perf_counter()
target_time += self.model.opt.timestep
with self.sim_view_lock:
# 读取传感器数据
if int(1.0 / self.model.opt.timestep * target_time) % int(
1.0 / self.model.opt.timestep /
sensor_frequency) == 0:
sensor_data = self.read_sensors()
# 存储数据用于绘图
self.time_history.append(target_time)
self.joint_pos_history.append(
sensor_data['joint_positions'].copy())
self.joint_vel_history.append(
sensor_data['joint_velocities'].copy())
self.joint_torque_history.append(
sensor_data['joint_torques'].copy())
self.imu_orient_history.append(
sensor_data['imu_orientation'].copy())
self.imu_accel_history.append(
sensor_data['imu_linear_acceleration'].copy())
self.imu_pos_history.append(
sensor_data['imu_position'].copy())
# 打印部分传感器数据
if time_step % 100 == 0 and self.debug:
print(f"Time: {target_time:.3f}s")
print(
f"Control Mode: {'Position' if self.control_mode == 0 else 'Torque'}"
)
print(
f"Joint 0 Pos: {sensor_data['joint_positions'][0]:.3f}, "
f"Vel: {sensor_data['joint_velocities'][0]:.3f}, "
f"Torque: {sensor_data['joint_torques'][0]:.3f}"
)
print(
f"IMU Orientation: [{sensor_data['imu_orientation'][0]:.3f}, "
f"{sensor_data['imu_orientation'][1]:.3f}, "
f"{sensor_data['imu_orientation'][2]:.3f}, "
f"{sensor_data['imu_orientation'][3]:.3f}]")
print(
f"IMU Acceleration: [{sensor_data['imu_linear_acceleration'][0]:.3f}, "
f"{sensor_data['imu_linear_acceleration'][1]:.3f}, "
f"{sensor_data['imu_linear_acceleration'][2]:.3f}]"
)
print("-" * 50)
# 添加虚拟力
self.add_virtual_force()
# 控制命令写入
self.set_motor_cmd()
# 前进仿真一步
if not self.paused:
mujoco.mj_step(self.model, self.data)
else:
mujoco.mj_forward(self.model, self.data)
time_step += 1
# 控制仿真速度
time_until_next_step = self.model.opt.timestep - (time.perf_counter() -
step_start)
print(f'sim timestep: {time_until_next_step}')
if time_until_next_step > 0:
time.sleep(time_until_next_step)
except Exception as e:
print(f"Simulation thread error: {e}")
finally:
print("Simulation thread stopped")
def add_virtual_force(self):
if self.elastic_band.enable:
self.data.xfrc_applied[self.band_attached_link, :3] = self.elastic_band.Advance(
self.data.qpos[0:3], self.data.qvel[0:3]
)
def plot_sensor_data(self):
"""绘制传感器数据"""
if len(self.time_history) == 0:
return
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('Sensor Data')
time_array = np.array(list(self.time_history))
# 绘制关节位置
if len(self.joint_pos_history) > 0:
pos_array = np.array(list(self.joint_pos_history))
axes[0, 0].plot(time_array, pos_array[:, 0], label='Joint 0')
axes[0, 0].plot(time_array, pos_array[:, 1], label='Joint 1')
axes[0, 0].set_title('Joint Positions')
axes[0, 0].set_xlabel('Time (s)')
axes[0, 0].set_ylabel('Position (rad)')
axes[0, 0].legend()
axes[0, 0].grid(True)
# 绘制关节速度
if len(self.joint_vel_history) > 0:
vel_array = np.array(list(self.joint_vel_history))
axes[0, 1].plot(time_array, vel_array[:, 0], label='Joint 0')
axes[0, 1].plot(time_array, vel_array[:, 1], label='Joint 1')
axes[0, 1].set_title('Joint Velocities')
axes[0, 1].set_xlabel('Time (s)')
axes[0, 1].set_ylabel('Velocity (rad/s)')
axes[0, 1].legend()
axes[0, 1].grid(True)
# 绘制IMU方向
if len(self.imu_orient_history) > 0:
orient_array = np.array(list(self.imu_orient_history))
axes[1, 0].plot(time_array, orient_array[:, 0], label='w')
axes[1, 0].plot(time_array, orient_array[:, 1], label='x')
axes[1, 0].plot(time_array, orient_array[:, 2], label='y')
axes[1, 0].plot(time_array, orient_array[:, 3], label='z')
axes[1, 0].set_title('IMU Orientation (Quaternion)')
axes[1, 0].set_xlabel('Time (s)')
axes[1, 0].set_ylabel('Quaternion')
axes[1, 0].legend()
axes[1, 0].grid(True)
# # 绘制IMU加速度
# if len(self.imu_accel_history) > 0:
# accel_array = np.array(list(self.imu_accel_history))
# axes[1, 1].plot(time_array, accel_array[:, 0], label='X')
# axes[1, 1].plot(time_array, accel_array[:, 1], label='Y')
# axes[1, 1].plot(time_array, accel_array[:, 2], label='Z')
# axes[1, 1].set_title('IMU Linear Acceleration')
# axes[1, 1].set_xlabel('Time (s)')
# axes[1, 1].set_ylabel('Acceleration (m/s²)')
# axes[1, 1].legend()
# axes[1, 1].grid(True)
# 绘制IMU位置
if len(self.imu_pos_history) > 0:
pos_array = np.array(list(self.imu_pos_history))
axes[1, 1].plot(time_array, pos_array[:, 0], label='X')
axes[1, 1].plot(time_array, pos_array[:, 1], label='Y')
axes[1, 1].plot(time_array, pos_array[:, 2], label='Z')
axes[1, 1].set_title('IMU Linear Position')
axes[1, 1].set_xlabel('Time (s)')
axes[1, 1].set_ylabel('Position (m)')
axes[1, 1].legend()
axes[1, 1].grid(True)
plt.tight_layout()
plt.show()
def start(self):
from threading import Thread
viewer_thread = Thread(target=self.viewer_thread)
sim_thread = Thread(target=self.simulate_thread)
# 设置为守护线程,确保主线程结束时它们也会结束
viewer_thread.daemon = True
sim_thread.daemon = True
try:
# 设置初始姿态
# self.set_keyframe("zero") # 或者 "zero_height" 或 "zero_standup" 'zero'
# print("机器人已设置为zero姿态")
self.start_keyboard_listener()
viewer_thread.start()
sim_thread.start()
# 启动发布线程
self.start_publishing()
# 主线程等待,直到收到 KeyboardInterrupt
while viewer_thread.is_alive() and sim_thread.is_alive():
time.sleep(0.1)
except KeyboardInterrupt:
print("\nReceived KeyboardInterrupt, stopping simulation...")
finally:
# 设置停止事件
self.stop_event.set()
# 等待线程结束
viewer_thread.join(timeout=1.0)
sim_thread.join(timeout=1.0)
# 停止发布线程
self.stop_publishing()
# 关闭 viewer
try:
self.viewer.close()
except:
pass
# 绘制图表
if self.debug:
print("Plotting sensor data...")
self.plot_sensor_data()
print("Simulation finished")
def set_keyframe(self, key_name):
"""
将机器人设置为指定的关键帧姿态
参数:
key_name: 关键帧名称 (例如 "zero", "zero_height", "zero_standup")
"""
# 查找关键帧的索引
key_id = mujoco.mj_name2id(self.model, mujoco.mjtObj.mjOBJ_KEY,
key_name)
if key_id == -1:
print(f"关键帧 '{key_name}' 未找到")
return False
# # # 将关键帧的qpos值复制到当前状态
self.data.qpos[:] = self.model.key_qpos[key_id]
# 将关键帧的ctrl值复制到控制向量
if self.model.key_ctrl is not None:
self.data.ctrl[:] = self.model.key_ctrl[key_id]
mujoco.mj_forward(self.model, self.data)
# 更新系统状态
# mujoco.mj_resetDataKeyframe(self.model, self.data, key_id)
return True
def get_available_keyframes(self):
"""
获取所有可用的关键帧名称
"""
keyframes = []
for i in range(self.model.nkey):
# 获取关键帧名称
key_name = mujoco.mj_id2name(self.model, mujoco.mjtObj.mjOBJ_KEY,
i)
if key_name:
keyframes.append(key_name)
return keyframes
def _spin_wrapper(node):
"""ROS2 spin wrapper for graceful shutdown."""
try:
while rclpy.ok():
rclpy.spin_once(node, timeout_sec=0.001)
except Exception as e:
print(f"Error in ROS2 spin thread: {e}")
finally:
print("ROS2 spin thread terminated.")
def main():
# 设置模型注册表
model_registry = {
'evt2': '../resources/evt2/urdf/evt2.xml'
}
# 解析命令行参数
parser = argparse.ArgumentParser(description='MuJoCo机器人仿真器')
parser.add_argument('--model', '-m', type=str, default='evt2',
choices=list(model_registry.keys()),
help='要加载的机器人模型')
parser.add_argument('--config', '-c', type=str, default='full',
choices=['full', '21'],
help='机器人配置 (full=完整配置, 21=21自由度配置)')
args = parser.parse_args()
# 设置模型路径
script_dir = os.path.dirname(os.path.abspath(__file__))
model_path = os.path.join(script_dir, model_registry[args.model])
model_path = os.path.abspath(model_path)
print(f"正在加载模型: {model_path}")
print(f"机器人配置: {args.config}")
# 创建控制器
rclpy.init()
node = rclpy.create_node('mujoco_simulator_dex')
spin_thread = threading.Thread(target=_spin_wrapper,
args=(node, ),
daemon=True,
name="ROS2_Spin_Thread")
spin_thread.start()
simulator = RobotSimulator(model_path, node, debug=False, robot_config=args.config)
# 运行仿真
simulator.start()
if __name__ == "__main__":
main()