xbox_udp链路
This commit is contained in:
@@ -5,6 +5,7 @@
|
||||
包含:
|
||||
|
||||
- `udp_keyboard_sender.py`:从终端读取按键,编码 UDP 报文并发送
|
||||
- `udp_xbox_sender.py`:订阅 `/xbox_data`,把 Xbox 摇杆/按键转成 UDP 报文
|
||||
- `udp_loopback_node.py`:接收 UDP 报文,解码事件并计算目标值
|
||||
- `protocol.py`:自定义协议和状态结构
|
||||
- `config/udp_loopback.yaml`:本地测试配置
|
||||
@@ -40,6 +41,56 @@ cd /home/meiqi/tienkung/Deploy_Tienkung
|
||||
python3 udp_loopback/udp_keyboard_sender.py
|
||||
```
|
||||
|
||||
如果改成 Xbox 经 UDP 转发,则启动方式是:
|
||||
|
||||
1. 把 [dex_config.yaml](/home/meiqi/tienkung/Deploy_Tienkung/config/dex_config.yaml) 里的 `control_tool` 改成 `udp_loopback`
|
||||
2. 启动控制节点:
|
||||
|
||||
```bash
|
||||
cd /home/meiqi/tienkung/Deploy_Tienkung
|
||||
source /opt/ros/jazzy/setup.bash
|
||||
export ROS_DOMAIN_ID=10
|
||||
python3 rl_control_node_sim.py
|
||||
```
|
||||
|
||||
3. 启动 MuJoCo:
|
||||
|
||||
```bash
|
||||
cd /home/meiqi/tienkung/xSIM_MUJOCO
|
||||
source /opt/ros/jazzy/setup.bash
|
||||
export ROS_DOMAIN_ID=10
|
||||
python3 scripts/simulator_view_asyn.py -m evt2
|
||||
```
|
||||
|
||||
4. 启动手柄节点:
|
||||
|
||||
```bash
|
||||
source /opt/ros/jazzy/setup.bash
|
||||
export ROS_DOMAIN_ID=10
|
||||
ros2 run joy joy_node --ros-args -r joy:=/xbox_data
|
||||
```
|
||||
|
||||
5. 启动 UDP Xbox 转发:
|
||||
|
||||
```bash
|
||||
cd /home/meiqi/tienkung/Deploy_Tienkung
|
||||
source /opt/ros/jazzy/setup.bash
|
||||
export ROS_DOMAIN_ID=10
|
||||
python3 udp_loopback/udp_xbox_sender.py
|
||||
```
|
||||
|
||||
默认按键映射:
|
||||
|
||||
- `A -> mode_stride -> gotoWALKAMP`
|
||||
- `X -> pose_home -> gotoZERO`
|
||||
- `Y -> pose_hold -> gotoSTOP`
|
||||
- `B -> mode_dash -> gotoMYPOLICY`
|
||||
- `START -> trim_reset`
|
||||
- 左摇杆 Y -> 连续前后速度
|
||||
- 左摇杆 X -> 连续横移速度
|
||||
- 右摇杆 X -> 连续转向速度
|
||||
- 十字键左右 -> 高度增减
|
||||
|
||||
此时 UDP 接收结果会在接收侧被映射回现有 FSM 命令:
|
||||
|
||||
- `pose_home -> gotoZERO`
|
||||
@@ -60,6 +111,10 @@ python3 udp_loopback/udp_keyboard_sender.py
|
||||
- `sway_right`
|
||||
- `spin_left`
|
||||
- `spin_right`
|
||||
- `set_surge`
|
||||
- `set_sway`
|
||||
- `set_spin`
|
||||
- `set_lift`
|
||||
- `lift_up`
|
||||
- `lift_down`
|
||||
- `trim_reset`
|
||||
|
||||
@@ -5,6 +5,14 @@ sender:
|
||||
target_port: 31000
|
||||
source_tag: local_keys
|
||||
|
||||
xbox_sender:
|
||||
joy_topic: /xbox_data
|
||||
source_tag: xbox_udp
|
||||
deadzone: 0.10
|
||||
analog_epsilon: 0.01
|
||||
dpad_threshold: 0.50
|
||||
trigger_pressed_threshold: -0.50
|
||||
|
||||
receiver:
|
||||
listen_host: 127.0.0.1
|
||||
listen_port: 31000
|
||||
|
||||
@@ -10,7 +10,7 @@ from typing import Any, Dict
|
||||
|
||||
@dataclass
|
||||
class InputEnvelope:
|
||||
"""Small UDP payload carrying one encoded keyboard event."""
|
||||
"""Small UDP payload carrying one encoded input event."""
|
||||
|
||||
seq_id: int
|
||||
event_code: str
|
||||
|
||||
@@ -177,6 +177,22 @@ class UDPFSMController:
|
||||
self.motion_frame.spin_goal = min(
|
||||
self.max_spin, self.motion_frame.spin_goal + self.spin_step
|
||||
)
|
||||
elif event_code == "set_surge":
|
||||
self.motion_frame.surge_goal = max(
|
||||
-self.max_surge, min(self.max_surge, packet.drive_value)
|
||||
)
|
||||
elif event_code == "set_sway":
|
||||
self.motion_frame.sway_goal = max(
|
||||
-self.max_sway, min(self.max_sway, packet.drive_value)
|
||||
)
|
||||
elif event_code == "set_spin":
|
||||
self.motion_frame.spin_goal = max(
|
||||
-self.max_spin, min(self.max_spin, packet.drive_value)
|
||||
)
|
||||
elif event_code == "set_lift":
|
||||
self.motion_frame.lift_goal = max(
|
||||
self.min_lift, min(self.max_lift, packet.drive_value)
|
||||
)
|
||||
elif event_code == "lift_up":
|
||||
self.motion_frame.lift_goal = min(
|
||||
self.max_lift, self.motion_frame.lift_goal + self.lift_step
|
||||
|
||||
253
Deploy_Tienkung/udp_loopback/udp_xbox_sender.py
Normal file
253
Deploy_Tienkung/udp_loopback/udp_xbox_sender.py
Normal file
@@ -0,0 +1,253 @@
|
||||
"""ROS2 Joy -> localhost UDP bridge for Xbox control."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import socket
|
||||
from pathlib import Path
|
||||
from typing import Dict
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
|
||||
from sensor_msgs.msg import Joy
|
||||
import yaml
|
||||
|
||||
try:
|
||||
from .protocol import InputEnvelope
|
||||
except ImportError: # pragma: no cover - direct script execution fallback
|
||||
from protocol import InputEnvelope
|
||||
|
||||
|
||||
class UDPXboxSender(Node):
|
||||
"""Subscribe to Joy messages and forward them as UDP loopback events."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__("udp_xbox_sender")
|
||||
self.config: Dict[str, object] = {}
|
||||
self.seq_id = 0
|
||||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.last_buttons: Dict[str, int] = {}
|
||||
self.last_dpad_h = 0.0
|
||||
|
||||
self._load_config()
|
||||
|
||||
qos_profile = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=10,
|
||||
)
|
||||
self.subscription = self.create_subscription(
|
||||
Joy, self.joy_topic, self._joy_callback, qos_profile
|
||||
)
|
||||
|
||||
self.get_logger().info(
|
||||
f"Forwarding {self.joy_topic} -> udp://{self.target_host}:{self.target_port}"
|
||||
)
|
||||
self.get_logger().info(
|
||||
"Buttons: A=WALKAMP X=ZERO Y=STOP B=MYPOLICY START=reset"
|
||||
)
|
||||
|
||||
def destroy_node(self) -> bool:
|
||||
if self.socket is not None:
|
||||
self.socket.close()
|
||||
return super().destroy_node()
|
||||
|
||||
def _load_config(self) -> None:
|
||||
udp_config_path = Path(__file__).resolve().parent / "config" / "udp_loopback.yaml"
|
||||
main_config_path = Path(__file__).resolve().parents[1] / "config" / "dex_config.yaml"
|
||||
|
||||
with udp_config_path.open("r", encoding="utf-8") as file:
|
||||
udp_config = yaml.safe_load(file) or {}
|
||||
with main_config_path.open("r", encoding="utf-8") as file:
|
||||
main_config = yaml.safe_load(file) or {}
|
||||
|
||||
sender_cfg = udp_config.get("sender", {})
|
||||
xbox_sender_cfg = udp_config.get("xbox_sender", {})
|
||||
xbox_cfg = main_config.get("xbox", {})
|
||||
|
||||
self.target_host = sender_cfg.get("target_host", "127.0.0.1")
|
||||
self.target_port = int(sender_cfg.get("target_port", 31000))
|
||||
self.source_tag = xbox_sender_cfg.get("source_tag", "xbox_udp")
|
||||
self.joy_topic = xbox_sender_cfg.get("joy_topic", "/xbox_data")
|
||||
|
||||
self.deadzone = float(xbox_sender_cfg.get("deadzone", 0.10))
|
||||
self.analog_epsilon = float(xbox_sender_cfg.get("analog_epsilon", 0.01))
|
||||
self.dpad_threshold = float(xbox_sender_cfg.get("dpad_threshold", 0.50))
|
||||
self.trigger_pressed_threshold = float(
|
||||
xbox_sender_cfg.get("trigger_pressed_threshold", -0.50)
|
||||
)
|
||||
|
||||
self.forward_command_offset = float(
|
||||
xbox_cfg.get("forward_command_offset", 0.0)
|
||||
)
|
||||
self.lateral_command_offset = float(
|
||||
xbox_cfg.get("lateral_command_offset", 0.0)
|
||||
)
|
||||
self.rotation_command_offset = float(
|
||||
xbox_cfg.get("rotation_command_offset", 0.0)
|
||||
)
|
||||
|
||||
self.button_map = {
|
||||
"a": 0,
|
||||
"b": 1,
|
||||
"x": 2,
|
||||
"y": 3,
|
||||
"lb": 4,
|
||||
"rb": 5,
|
||||
"select": 6,
|
||||
"start": 7,
|
||||
"home": 8,
|
||||
}
|
||||
self.axis_map = {
|
||||
"lx": 0,
|
||||
"ly": 1,
|
||||
"l_trigger": 2,
|
||||
"rx": 3,
|
||||
"ry": 4,
|
||||
"r_trigger": 5,
|
||||
"dpad_h": 6,
|
||||
"dpad_v": 7,
|
||||
}
|
||||
|
||||
self._merge_mapping(self.button_map, xbox_cfg.get("button_map"))
|
||||
self._merge_mapping(self.axis_map, xbox_cfg.get("axis_map"))
|
||||
self._merge_mapping(self.button_map, xbox_sender_cfg.get("button_map"))
|
||||
self._merge_mapping(self.axis_map, xbox_sender_cfg.get("axis_map"))
|
||||
|
||||
def _merge_mapping(self, target: Dict[str, int], override: object) -> None:
|
||||
if not isinstance(override, dict):
|
||||
return
|
||||
for name, index in override.items():
|
||||
if name in target:
|
||||
try:
|
||||
target[name] = int(index)
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
|
||||
def _joy_callback(self, msg: Joy) -> None:
|
||||
axes = list(msg.axes) + [0.0] * 16
|
||||
buttons = list(msg.buttons) + [0] * 32
|
||||
|
||||
state = {
|
||||
"a": self._button_value(buttons, "a"),
|
||||
"b": self._button_value(buttons, "b"),
|
||||
"x": self._button_value(buttons, "x"),
|
||||
"y": self._button_value(buttons, "y"),
|
||||
"start": self._button_value(buttons, "start"),
|
||||
"home": self._button_value(buttons, "home"),
|
||||
"lx": self._axis_value(axes, "lx"),
|
||||
"ly": self._axis_value(axes, "ly"),
|
||||
"rx": self._axis_value(axes, "rx"),
|
||||
"l_trigger": self._axis_value(axes, "l_trigger"),
|
||||
"dpad_h": self._axis_value(axes, "dpad_h"),
|
||||
}
|
||||
|
||||
self._send_mode_events(state)
|
||||
self._send_trim_event(state)
|
||||
self._send_lift_events(state)
|
||||
self._send_analog_events(state)
|
||||
|
||||
self.last_buttons = {
|
||||
name: int(state[name]) for name in ("a", "b", "x", "y", "start", "home")
|
||||
}
|
||||
self.last_dpad_h = float(state["dpad_h"])
|
||||
|
||||
def _button_value(self, buttons: list[int], name: str) -> int:
|
||||
index = self.button_map[name]
|
||||
return int(buttons[index]) if index < len(buttons) else 0
|
||||
|
||||
def _axis_value(self, axes: list[float], name: str) -> float:
|
||||
index = self.axis_map[name]
|
||||
return float(axes[index]) if index < len(axes) else 0.0
|
||||
|
||||
def _send_mode_events(self, state: Dict[str, float]) -> None:
|
||||
if self._rising_edge(state, "y"):
|
||||
self._send_event("pose_hold", "y")
|
||||
elif self._rising_edge(state, "x"):
|
||||
self._send_event("pose_home", "x")
|
||||
elif self._rising_edge(state, "a"):
|
||||
self._send_event("mode_stride", "a")
|
||||
elif self._rising_edge(state, "b"):
|
||||
self._send_event("mode_dash", "b")
|
||||
elif (
|
||||
self._rising_edge(state, "home")
|
||||
and state["l_trigger"] < self.trigger_pressed_threshold
|
||||
):
|
||||
self._send_event("mode_xrun", "home")
|
||||
|
||||
def _send_trim_event(self, state: Dict[str, float]) -> None:
|
||||
if self._rising_edge(state, "start"):
|
||||
self._send_event("trim_reset", "start")
|
||||
|
||||
def _send_lift_events(self, state: Dict[str, float]) -> None:
|
||||
dpad_h = float(state["dpad_h"])
|
||||
if dpad_h <= -self.dpad_threshold and self.last_dpad_h > -self.dpad_threshold:
|
||||
self._send_event("lift_up", "dpad_left")
|
||||
elif dpad_h >= self.dpad_threshold and self.last_dpad_h < self.dpad_threshold:
|
||||
self._send_event("lift_down", "dpad_right")
|
||||
|
||||
def _send_analog_events(self, state: Dict[str, float]) -> None:
|
||||
surge = self._compute_surge(state["ly"])
|
||||
sway = self._cleanup_command(
|
||||
self._apply_deadzone(state["lx"]) * -0.4 + self.lateral_command_offset
|
||||
)
|
||||
spin = self._cleanup_command(
|
||||
self._apply_deadzone(state["rx"]) * -0.4 + self.rotation_command_offset
|
||||
)
|
||||
|
||||
self._send_event("set_surge", "left_stick_y", surge)
|
||||
self._send_event("set_sway", "left_stick_x", sway)
|
||||
self._send_event("set_spin", "right_stick_x", spin)
|
||||
|
||||
def _compute_surge(self, ly: float) -> float:
|
||||
ly = self._apply_deadzone(ly)
|
||||
if ly >= 0.0:
|
||||
value = ly * 0.8 + self.forward_command_offset
|
||||
else:
|
||||
value = ly * 0.5
|
||||
return self._cleanup_command(value)
|
||||
|
||||
def _apply_deadzone(self, value: float) -> float:
|
||||
if abs(value) < self.deadzone:
|
||||
return 0.0
|
||||
return float(value)
|
||||
|
||||
def _cleanup_command(self, value: float) -> float:
|
||||
if abs(value) < self.analog_epsilon:
|
||||
return 0.0
|
||||
return float(value)
|
||||
|
||||
def _rising_edge(self, state: Dict[str, float], name: str) -> bool:
|
||||
previous = int(self.last_buttons.get(name, 0))
|
||||
return int(state[name]) == 1 and previous == 0
|
||||
|
||||
def _send_event(
|
||||
self, event_code: str, key_name: str, drive_value: float = 1.0
|
||||
) -> None:
|
||||
envelope = InputEnvelope(
|
||||
seq_id=self.seq_id,
|
||||
event_code=event_code,
|
||||
key_name=key_name,
|
||||
drive_value=drive_value,
|
||||
source_tag=self.source_tag,
|
||||
)
|
||||
self.seq_id += 1
|
||||
self.socket.sendto(
|
||||
envelope.encode(), (self.target_host, self.target_port)
|
||||
)
|
||||
|
||||
|
||||
def main(args: list[str] | None = None) -> None:
|
||||
rclpy.init(args=args)
|
||||
node = UDPXboxSender()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user