feat: 对接KCP-C

This commit is contained in:
meiqi
2026-03-30 23:04:39 +08:00
parent cc1f766009
commit faebc5d44c
9 changed files with 1357 additions and 2 deletions

View File

@@ -6,7 +6,7 @@ sim: false
debug: false
control_tool: [keyboard] # joystick, xbox, keyboard, udp_loopback
control_tool: [omnisocket_loopback] # joystick, xbox, keyboard, udp_loopback, omnisocket_loopback
joystick:
initial_height: 0.957

View File

@@ -47,7 +47,13 @@ def timing_decorator(func):
class XMIGCSControlNode(Node):
"""xMIGCS控制节点Python版本"""
VALID_CONTROL_TOOLS = ("joystick", "xbox", "keyboard", "udp_loopback")
VALID_CONTROL_TOOLS = (
"joystick",
"xbox",
"keyboard",
"udp_loopback",
"omnisocket_loopback",
)
def __init__(self, debug=False):
super().__init__('xmigcs_control_node')
@@ -185,6 +191,13 @@ class XMIGCSControlNode(Node):
self.udp_fsm_controller.init()
self.udp_fsm_controller.start()
if "omnisocket_loopback" in self.control_tools:
from udp_loopback.omnisocket_fsm_controller import OmniSocketFSMController
self.omnisocket_fsm_controller = OmniSocketFSMController()
self.omnisocket_fsm_controller.init()
self.omnisocket_fsm_controller.start()
# Xbox控制器
if "xbox" in self.control_tools:
self.xbox_controller = XBOXController()
@@ -333,6 +346,15 @@ class XMIGCSControlNode(Node):
self.udp_fsm_controller.get_last_fsm_command_time(),
))
if "omnisocket_loopback" in self.control_tools:
self.omnisocket_fsm_controller.update_flag()
source_flags.append((
"omnisocket_loopback",
self.omnisocket_fsm_controller.get_udp_flag(),
self.omnisocket_fsm_controller.get_last_input_time(),
self.omnisocket_fsm_controller.get_last_fsm_command_time(),
))
source_name, active_flag = self._select_control_flag(source_flags)
flag = self._copy_control_flag(active_flag)
flag.fsm_state_command = self._select_fsm_command(source_flags)
@@ -421,6 +443,9 @@ class XMIGCSControlNode(Node):
if hasattr(self,
'udp_fsm_controller') and "udp_loopback" in self.control_tools:
self.udp_fsm_controller.stop()
if hasattr(self,
'omnisocket_fsm_controller') and "omnisocket_loopback" in self.control_tools:
self.omnisocket_fsm_controller.stop()
if self.control_thread and self.control_thread.is_alive():
self.control_thread.join(timeout=1.0)

View File

@@ -0,0 +1,43 @@
# OmniSocket Integration
This repo now supports an OmniSocket-based control path alongside the original UDP loopback.
## Files
- `omnisocket_control.py`: binary control packet codec shared by sender/receiver.
- `omnisocket_keyboard_sender.py`: keyboard -> OmniSocket sender.
- `omnisocket_xbox_sender.py`: ROS2 `/xbox_data` -> OmniSocket sender.
- `omnisocket_fsm_controller.py`: OmniSocket receiver that converts packets into `ControlFlag`.
- `config/omnisocket_demo.yaml`: OmniSocket transport config template.
## Main-node integration
Set `control_tool: omnisocket_loopback` in `config/dex_config.yaml`, then run `rl_control_node.py` or `rl_control_node_sim.py`.
The main node will instantiate `OmniSocketFSMController`, which receives binary control packets and maps them into the same FSM commands used by the UDP loopback path.
## Sender usage
Keyboard sender:
```bash
cd Deploy_Tienkung
python3 udp_loopback/omnisocket_keyboard_sender.py
```
Xbox sender:
```bash
cd Deploy_Tienkung
source /opt/ros/jazzy/setup.bash
ros2 run joy joy_node --ros-args -r joy:=/xbox_data
python3 udp_loopback/omnisocket_xbox_sender.py
```
## Notes
- Install the `omnisocket` Python package before using this path.
- `omnisocket_xbox_sender.py` still reuses `config/dex_config.yaml` for Xbox axis/button overrides so it stays aligned with the existing repo configuration.
- The original UDP files remain unchanged, so you can switch back by restoring `control_tool: udp_loopback`.
- OmniSocket keyboard/Xbox mappings are aligned with the cleaned walk-only FSM flow: `ZERO`, `STOP`, and `WALKAMP`.
- Keyboard sender supports `4/5/6` for clearing `x/y/yaw` speed independently, and `r` still clears all three axes.

View File

@@ -0,0 +1,29 @@
transport:
server_addr: 81.70.156.140:10909
relay_via: 106.55.173.235
bind_ip: ""
bind_device: ""
control_sender:
peer_id: peer-a-ctrl
target_peer: peer-b-ctrl
joy_topic: /xbox_data
deadzone: 0.10
analog_epsilon: 0.01
dpad_threshold: 0.50
trigger_pressed_threshold: -0.50
control_receiver:
peer_id: peer-b-ctrl
motion:
initial_lift: 0.89
min_lift: 0.65
max_lift: 0.90
lift_step: 0.05
surge_step: 0.10
sway_step: 0.10
spin_step: 0.10
max_surge: 1.00
max_sway: 0.50
max_spin: 0.50

View File

@@ -0,0 +1,106 @@
"""Binary control packet codec and motion helpers for OmniSocket demos."""
from __future__ import annotations
from dataclasses import dataclass
import struct
import time
CONTROL_PACKET_VERSION = 1
CONTROL_PACKET_STRUCT = struct.Struct("!BBHIfQ")
EVENT_NAME_TO_ID = {
"pose_home": 1,
"pose_hold": 2,
"mode_stride": 3,
"surge_up": 6,
"surge_down": 7,
"sway_left": 8,
"sway_right": 9,
"spin_left": 10,
"spin_right": 11,
"set_surge": 12,
"set_sway": 13,
"set_spin": 14,
"set_lift": 15,
"lift_up": 16,
"lift_down": 17,
"trim_reset": 18,
"session_quit": 19,
}
EVENT_ID_TO_NAME = {value: key for key, value in EVENT_NAME_TO_ID.items()}
@dataclass(slots=True)
class ControlPacket:
seq_id: int
event_id: int
drive_value: float = 1.0
sent_at_ns: int = 0
@property
def event_name(self) -> str:
return EVENT_ID_TO_NAME.get(self.event_id, f"unknown_{self.event_id}")
def encode(self) -> bytes:
sent_at_ns = self.sent_at_ns or time.time_ns()
return CONTROL_PACKET_STRUCT.pack(
CONTROL_PACKET_VERSION,
self.event_id,
0,
int(self.seq_id),
float(self.drive_value),
int(sent_at_ns),
)
@classmethod
def decode(cls, payload: bytes) -> "ControlPacket":
if len(payload) != CONTROL_PACKET_STRUCT.size:
raise ValueError(
f"invalid control packet length {len(payload)}; "
f"want {CONTROL_PACKET_STRUCT.size}"
)
version, event_id, _reserved, seq_id, drive_value, sent_at_ns = (
CONTROL_PACKET_STRUCT.unpack(payload)
)
if version != CONTROL_PACKET_VERSION:
raise ValueError(f"unsupported control packet version {version}")
return cls(
seq_id=int(seq_id),
event_id=int(event_id),
drive_value=float(drive_value),
sent_at_ns=int(sent_at_ns),
)
def make_control_packet(
seq_id: int, event_name: str, drive_value: float = 1.0, sent_at_ns: int | None = None
) -> ControlPacket:
event_id = EVENT_NAME_TO_ID[event_name]
return ControlPacket(
seq_id=seq_id,
event_id=event_id,
drive_value=drive_value,
sent_at_ns=time.time_ns() if sent_at_ns is None else sent_at_ns,
)
@dataclass(slots=True)
class MotionFrame:
mode_tag: str = "pose_hold"
surge_goal: float = 0.0
sway_goal: float = 0.0
spin_goal: float = 0.0
lift_goal: float = 0.89
relay_on: bool = True
last_event_code: str = ""
last_rx_time: float = 0.0
def format_motion_frame(frame: MotionFrame) -> str:
return (
f"mode={frame.mode_tag} surge={frame.surge_goal:.3f} "
f"sway={frame.sway_goal:.3f} spin={frame.spin_goal:.3f} "
f"lift={frame.lift_goal:.3f}"
)

View File

@@ -0,0 +1,275 @@
"""Adapter that maps OmniSocket control packets onto a ControlFlag interface."""
from __future__ import annotations
from pathlib import Path
import queue
import struct
import threading
import time
from typing import Dict, Optional
import yaml
from common.joystick import ControlFlag
try:
from .omnisocket_control import ControlPacket, MotionFrame, format_motion_frame
except ImportError: # pragma: no cover - direct script execution fallback
from omnisocket_control import ControlPacket, MotionFrame, format_motion_frame
def _load_omnisocket_api():
try:
from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session
except ImportError as exc: # pragma: no cover - environment dependent
raise RuntimeError(
"omnisocket is not installed. Install it before using "
"control_tool=omnisocket_loopback."
) from exc
return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session
class OmniSocketFSMFlag(ControlFlag):
"""FSM-facing flag produced from decoded OmniSocket control packets."""
def __init__(self) -> None:
super().__init__()
self.x_speed_command: float = 0.0
self.y_speed_command: float = 0.0
self.yaw_speed_command: float = 0.0
self.height_cmd: float = 0.89
class OmniSocketFSMController:
"""Receive OmniSocket control packets and expose them as ControlFlag."""
def __init__(self) -> None:
self.config: Dict[str, object] = {}
self.data_mutex = threading.Lock()
self._load_config()
self._init_data_structures()
self.recv_running = False
self.recv_thread: Optional[threading.Thread] = None
self.session = None
self._msg_type_binary = None
def _load_config(self) -> None:
config_path = Path(__file__).resolve().parent / "config" / "omnisocket_demo.yaml"
if config_path.exists():
with config_path.open("r", encoding="utf-8") as file:
self.config = yaml.safe_load(file) or {}
else:
self.config = {}
transport_cfg = self.config.get("transport", {})
receiver_cfg = self.config.get("control_receiver", {})
motion_cfg = self.config.get("motion", {})
self.server_addr = str(transport_cfg.get("server_addr", "127.0.0.1:10909"))
self.relay_via = str(transport_cfg.get("relay_via", ""))
self.bind_ip = str(transport_cfg.get("bind_ip", ""))
self.bind_device = str(transport_cfg.get("bind_device", ""))
self.peer_id = str(receiver_cfg.get("peer_id", "peer-b-ctrl"))
self.initial_lift = float(motion_cfg.get("initial_lift", 0.89))
self.lift_step = float(motion_cfg.get("lift_step", 0.05))
self.max_surge = float(motion_cfg.get("max_surge", 1.0))
self.max_sway = float(motion_cfg.get("max_sway", 0.5))
self.max_spin = float(motion_cfg.get("max_spin", 0.5))
self.max_lift = float(motion_cfg.get("max_lift", 0.90))
self.min_lift = float(motion_cfg.get("min_lift", 0.65))
self.surge_step = float(motion_cfg.get("surge_step", 0.1))
self.sway_step = float(motion_cfg.get("sway_step", 0.1))
self.spin_step = float(motion_cfg.get("spin_step", 0.1))
def _init_data_structures(self) -> None:
self.packet_queue: queue.Queue[ControlPacket] = queue.Queue(maxsize=128)
self.motion_frame = MotionFrame(lift_goal=self.initial_lift)
self.udp_flag = OmniSocketFSMFlag()
self.udp_flag.height_cmd = self.initial_lift
self.last_seq_id = -1
self.last_fsm_command_time = 0.0
def start(self) -> None:
control_defaults, msg_type_binary, session_cls = _load_omnisocket_api()
self._msg_type_binary = msg_type_binary
self.session = session_cls()
self.session.connect(
server_addr=self.server_addr,
peer_id=self.peer_id,
relay_via=self.relay_via,
bind_ip=self.bind_ip,
bind_device=self.bind_device,
**control_defaults,
)
self.recv_running = True
self.recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
self.recv_thread.start()
print(
f"OmniSocket FSM controller listening as {self.peer_id} "
f"via {self.server_addr}"
)
def stop(self) -> None:
self.recv_running = False
if self.recv_thread and self.recv_thread.is_alive():
self.recv_thread.join(timeout=1.0)
if self.session is not None:
self.session.close()
self.session = None
print("OmniSocket FSM controller stopped")
def _recv_loop(self) -> None:
while self.recv_running and self.session is not None:
item = self.session.recv(timeout_ms=200)
if item is None:
continue
from_peer, msg_type, payload = item
if msg_type != self._msg_type_binary:
print(
f"[omnisocket_fsm] ignore non-binary message "
f"from {from_peer}: {msg_type}"
)
continue
try:
packet = ControlPacket.decode(payload)
except (ValueError, struct.error) as exc:
print(f"[omnisocket_fsm] drop invalid payload from {from_peer}: {exc}")
continue
try:
self.packet_queue.put_nowait(packet)
except queue.Full:
try:
self.packet_queue.get_nowait()
self.packet_queue.put_nowait(packet)
except queue.Empty:
pass
def update_flag(self) -> None:
while not self.packet_queue.empty():
try:
packet = self.packet_queue.get_nowait()
except queue.Empty:
break
self._apply_packet(packet)
def get_udp_flag(self) -> OmniSocketFSMFlag:
with self.data_mutex:
flag_copy = OmniSocketFSMFlag()
flag_copy.__dict__.update(self.udp_flag.__dict__)
return flag_copy
def get_last_input_time(self) -> float:
with self.data_mutex:
return self.motion_frame.last_rx_time
def get_last_fsm_command_time(self) -> float:
with self.data_mutex:
return self.last_fsm_command_time
def init(self) -> int:
print("OmniSocket FSM controller initialized")
return 0
def _apply_packet(self, packet: ControlPacket) -> None:
event_code = packet.event_name
now = time.time()
with self.data_mutex:
self.last_seq_id = packet.seq_id
self.motion_frame.last_event_code = event_code
self.motion_frame.last_rx_time = now
if event_code == "pose_home":
self.motion_frame.mode_tag = "pose_home"
self.last_fsm_command_time = now
elif event_code == "pose_hold":
self.motion_frame.mode_tag = "pose_hold"
self.last_fsm_command_time = now
elif event_code == "mode_stride":
self.motion_frame.mode_tag = "mode_stride"
self.last_fsm_command_time = now
elif event_code == "surge_up":
self.motion_frame.surge_goal = min(
self.max_surge, self.motion_frame.surge_goal + self.surge_step
)
elif event_code == "surge_down":
self.motion_frame.surge_goal = max(
-self.max_surge, self.motion_frame.surge_goal - self.surge_step
)
elif event_code == "sway_left":
self.motion_frame.sway_goal = max(
-self.max_sway, self.motion_frame.sway_goal - self.sway_step
)
elif event_code == "sway_right":
self.motion_frame.sway_goal = min(
self.max_sway, self.motion_frame.sway_goal + self.sway_step
)
elif event_code == "spin_left":
self.motion_frame.spin_goal = max(
-self.max_spin, self.motion_frame.spin_goal - self.spin_step
)
elif event_code == "spin_right":
self.motion_frame.spin_goal = min(
self.max_spin, self.motion_frame.spin_goal + self.spin_step
)
elif event_code == "set_surge":
self.motion_frame.surge_goal = max(
-self.max_surge, min(self.max_surge, packet.drive_value)
)
elif event_code == "set_sway":
self.motion_frame.sway_goal = max(
-self.max_sway, min(self.max_sway, packet.drive_value)
)
elif event_code == "set_spin":
self.motion_frame.spin_goal = max(
-self.max_spin, min(self.max_spin, packet.drive_value)
)
elif event_code == "set_lift":
self.motion_frame.lift_goal = max(
self.min_lift, min(self.max_lift, packet.drive_value)
)
elif event_code == "lift_up":
self.motion_frame.lift_goal = min(
self.max_lift, self.motion_frame.lift_goal + self.lift_step
)
elif event_code == "lift_down":
self.motion_frame.lift_goal = max(
self.min_lift, self.motion_frame.lift_goal - self.lift_step
)
elif event_code == "trim_reset":
self.motion_frame.surge_goal = 0.0
self.motion_frame.sway_goal = 0.0
self.motion_frame.spin_goal = 0.0
elif event_code == "session_quit":
self.motion_frame.relay_on = False
self.motion_frame.mode_tag = "pose_hold"
self.recv_running = False
self._sync_motion_frame_to_flag()
print(
f"[omnisocket_fsm] seq={packet.seq_id} event={event_code} "
f"{format_motion_frame(self.motion_frame)} "
f"fsm={self.udp_flag.fsm_state_command}"
)
def _sync_motion_frame_to_flag(self) -> None:
mode_to_fsm_command = {
"pose_home": "gotoZERO",
"pose_hold": "gotoSTOP",
"mode_stride": "gotoWALKAMP",
}
self.udp_flag.enable = self.motion_frame.relay_on
self.udp_flag.fsm_state_command = mode_to_fsm_command.get(
self.motion_frame.mode_tag, self.udp_flag.fsm_state_command
)
self.udp_flag.x_speed_command = self.motion_frame.surge_goal
self.udp_flag.y_speed_command = self.motion_frame.sway_goal
self.udp_flag.yaw_speed_command = self.motion_frame.spin_goal
self.udp_flag.height_cmd = self.motion_frame.lift_goal

View File

@@ -0,0 +1,229 @@
"""Keyboard sender that emits binary control packets over OmniSocket."""
from __future__ import annotations
import os
from pathlib import Path
import select
import signal
import sys
import termios
import threading
import tty
from typing import Dict, Optional
import yaml
try:
from .omnisocket_control import make_control_packet
except ImportError: # pragma: no cover - direct script execution fallback
from omnisocket_control import make_control_packet
def _load_omnisocket_api():
try:
from omnisocket import CONTROL_DEFAULTS, Session
except ImportError as exc: # pragma: no cover - environment dependent
raise RuntimeError(
"omnisocket is not installed. Install it before using "
"omnisocket_keyboard_sender.py."
) from exc
return CONTROL_DEFAULTS, Session
class OmniSocketKeyboardSender:
"""Standalone keyboard sender for OmniSocket control-plane testing."""
def __init__(self) -> None:
self.config: Dict[str, object] = {}
self.seq_id = 0
self.running = False
self.session = None
self.input_thread: Optional[threading.Thread] = None
self.original_terminal_settings = None
self._load_config()
self._init_session()
self._print_help()
def _load_config(self) -> None:
config_path = Path(__file__).resolve().parent / "config" / "omnisocket_demo.yaml"
if config_path.exists():
with config_path.open("r", encoding="utf-8") as file:
self.config = yaml.safe_load(file) or {}
else:
self.config = {}
transport_cfg = self.config.get("transport", {})
sender_cfg = self.config.get("control_sender", {})
self.server_addr = str(transport_cfg.get("server_addr", "127.0.0.1:10909"))
self.relay_via = str(transport_cfg.get("relay_via", ""))
self.bind_ip = str(transport_cfg.get("bind_ip", ""))
self.bind_device = str(transport_cfg.get("bind_device", ""))
self.peer_id = str(sender_cfg.get("peer_id", "peer-a-ctrl"))
self.target_peer = str(sender_cfg.get("target_peer", "peer-b-ctrl"))
def _init_session(self) -> None:
control_defaults, session_cls = _load_omnisocket_api()
self.session = session_cls()
self.session.connect(
server_addr=self.server_addr,
peer_id=self.peer_id,
relay_via=self.relay_via,
bind_ip=self.bind_ip,
bind_device=self.bind_device,
**control_defaults,
)
def _print_help(self) -> None:
print("OmniSocket keyboard sender ready")
print(f"Peer: {self.peer_id} -> {self.target_peer} via {self.server_addr}")
print("Keys:")
print(" z -> pose_home")
print(" c -> pose_hold")
print(" m -> mode_stride")
print(" w/s -> surge +/-")
print(" a/d -> sway +/-")
print(" q/e -> spin +/-")
print(" Left/Right -> lift +/-")
print(" Up/Down -> surge +/-")
print(" r -> trim_reset")
print(" 4 -> clear x speed")
print(" 5 -> clear y speed")
print(" 6 -> clear yaw speed")
print(" x -> session_quit")
def start(self) -> None:
self.running = True
self.input_thread = threading.Thread(target=self._input_loop, daemon=True)
self.input_thread.start()
print("OmniSocket keyboard sender thread started")
def stop(self) -> None:
self.running = False
if self.input_thread and self.input_thread.is_alive():
self.input_thread.join(timeout=1.0)
if self.original_terminal_settings is not None:
try:
termios.tcsetattr(
sys.stdin, termios.TCSADRAIN, self.original_terminal_settings
)
except termios.error:
pass
self.original_terminal_settings = None
if self.session is not None:
self.session.close()
self.session = None
print("OmniSocket keyboard sender stopped")
def _input_loop(self) -> None:
self.original_terminal_settings = termios.tcgetattr(sys.stdin)
try:
tty.setraw(sys.stdin.fileno())
while self.running:
if select.select([sys.stdin], [], [], 0.1)[0]:
key = sys.stdin.read(1)
self._process_key(key)
except KeyboardInterrupt:
self._handle_ctrl_c()
finally:
if self.original_terminal_settings is not None:
termios.tcsetattr(
sys.stdin, termios.TCSADRAIN, self.original_terminal_settings
)
self.original_terminal_settings = None
def _process_key(self, key: str) -> None:
event_map = {
"w": ("surge_up", "w", 1.0),
"s": ("surge_down", "s", 1.0),
"a": ("sway_left", "a", 1.0),
"d": ("sway_right", "d", 1.0),
"q": ("spin_left", "q", 1.0),
"e": ("spin_right", "e", 1.0),
"z": ("pose_home", "z", 1.0),
"c": ("pose_hold", "c", 1.0),
"m": ("mode_stride", "m", 1.0),
"r": ("trim_reset", "r", 1.0),
"4": ("set_surge", "4", 0.0),
"5": ("set_sway", "5", 0.0),
"6": ("set_spin", "6", 0.0),
"x": ("session_quit", "x", 1.0),
}
if key == "\x03":
self._handle_ctrl_c()
return
if key == "\x1b":
self._handle_arrow_key()
return
if key in event_map:
event_code, key_name, drive_value = event_map[key]
self._send_event(event_code, key_name, drive_value)
def _handle_arrow_key(self) -> None:
if not select.select([sys.stdin], [], [], 0.1)[0]:
return
key2 = sys.stdin.read(1)
if key2 != "[":
return
if not select.select([sys.stdin], [], [], 0.1)[0]:
return
key3 = sys.stdin.read(1)
arrow_map = {
"A": ("surge_up", "arrow_up"),
"B": ("surge_down", "arrow_down"),
"C": ("lift_down", "arrow_right"),
"D": ("lift_up", "arrow_left"),
}
if key3 in arrow_map:
event_code, key_name = arrow_map[key3]
self._send_event(event_code, key_name)
def _send_event(
self, event_code: str, key_name: str, drive_value: float = 1.0
) -> None:
if self.session is None:
return
packet = make_control_packet(self.seq_id, event_code, drive_value)
payload = packet.encode()
self.seq_id += 1
self.session.send(to=self.target_peer, data=payload)
print(
f"sent seq={packet.seq_id} event={event_code} key={key_name} "
f"bytes={len(payload)}"
)
if event_code == "session_quit":
self.running = False
def _handle_ctrl_c(self) -> None:
self.running = False
if self.original_terminal_settings is not None:
try:
termios.tcsetattr(
sys.stdin, termios.TCSADRAIN, self.original_terminal_settings
)
except termios.error:
pass
self.original_terminal_settings = None
os.kill(os.getpid(), signal.SIGINT)
def main() -> None:
sender = OmniSocketKeyboardSender()
sender.start()
try:
while sender.running:
if sender.input_thread is not None:
sender.input_thread.join(timeout=0.2)
except KeyboardInterrupt:
pass
finally:
sender.stop()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,283 @@
"""ROS2 Joy -> OmniSocket bridge for Xbox control."""
from __future__ import annotations
from pathlib import Path
from typing import Dict
import rclpy
from rclpy.node import Node
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
from sensor_msgs.msg import Joy
import yaml
try:
from .omnisocket_control import make_control_packet
except ImportError: # pragma: no cover - direct script execution fallback
from omnisocket_control import make_control_packet
def _load_omnisocket_api():
try:
from omnisocket import CONTROL_DEFAULTS, Session
except ImportError as exc: # pragma: no cover - environment dependent
raise RuntimeError(
"omnisocket is not installed. Install it before using "
"omnisocket_xbox_sender.py."
) from exc
return CONTROL_DEFAULTS, Session
class OmniSocketXboxSender(Node):
"""Subscribe to Joy messages and forward them through OmniSocket."""
def __init__(self) -> None:
super().__init__("omnisocket_xbox_sender")
self.config: Dict[str, object] = {}
self.seq_id = 0
self.last_buttons: Dict[str, int] = {}
self.last_dpad_h = 0.0
self.session = None
self._load_config()
self._init_session()
qos_profile = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
self.subscription = self.create_subscription(
Joy, self.joy_topic, self._joy_callback, qos_profile
)
self.get_logger().info(
f"Forwarding {self.joy_topic} -> OmniSocket "
f"{self.peer_id} -> {self.target_peer}"
)
self.get_logger().info(
"Buttons: A=WALKAMP X=ZERO Y=STOP START=reset"
)
def destroy_node(self) -> bool:
if self.session is not None:
self.session.close()
self.session = None
return super().destroy_node()
def _load_config(self) -> None:
omni_config_path = (
Path(__file__).resolve().parent / "config" / "omnisocket_demo.yaml"
)
main_config_path = Path(__file__).resolve().parents[1] / "config" / "dex_config.yaml"
if omni_config_path.exists():
with omni_config_path.open("r", encoding="utf-8") as file:
self.config = yaml.safe_load(file) or {}
else:
self.config = {}
with main_config_path.open("r", encoding="utf-8") as file:
main_config = yaml.safe_load(file) or {}
transport_cfg = self.config.get("transport", {})
sender_cfg = self.config.get("control_sender", {})
xbox_cfg = main_config.get("xbox", {})
self.server_addr = str(transport_cfg.get("server_addr", "127.0.0.1:10909"))
self.relay_via = str(transport_cfg.get("relay_via", ""))
self.bind_ip = str(transport_cfg.get("bind_ip", ""))
self.bind_device = str(transport_cfg.get("bind_device", ""))
self.peer_id = str(sender_cfg.get("peer_id", "peer-a-ctrl"))
self.target_peer = str(sender_cfg.get("target_peer", "peer-b-ctrl"))
self.joy_topic = str(sender_cfg.get("joy_topic", "/xbox_data"))
self.deadzone = float(sender_cfg.get("deadzone", 0.10))
self.analog_epsilon = float(sender_cfg.get("analog_epsilon", 0.01))
self.dpad_threshold = float(sender_cfg.get("dpad_threshold", 0.50))
self.trigger_pressed_threshold = float(
sender_cfg.get("trigger_pressed_threshold", -0.50)
)
self.forward_command_offset = float(
sender_cfg.get(
"forward_command_offset",
xbox_cfg.get("forward_command_offset", 0.0),
)
)
self.lateral_command_offset = float(
sender_cfg.get(
"lateral_command_offset",
xbox_cfg.get("lateral_command_offset", 0.0),
)
)
self.rotation_command_offset = float(
sender_cfg.get(
"rotation_command_offset",
xbox_cfg.get("rotation_command_offset", 0.0),
)
)
self.button_map = {
"a": 0,
"b": 1,
"x": 2,
"y": 3,
"lb": 4,
"rb": 5,
"select": 6,
"start": 7,
"home": 8,
}
self.axis_map = {
"lx": 0,
"ly": 1,
"l_trigger": 2,
"rx": 3,
"ry": 4,
"r_trigger": 5,
"dpad_h": 6,
"dpad_v": 7,
}
self._merge_mapping(self.button_map, xbox_cfg.get("button_map"))
self._merge_mapping(self.axis_map, xbox_cfg.get("axis_map"))
self._merge_mapping(self.button_map, sender_cfg.get("button_map"))
self._merge_mapping(self.axis_map, sender_cfg.get("axis_map"))
def _merge_mapping(self, target: Dict[str, int], override: object) -> None:
if not isinstance(override, dict):
return
for name, index in override.items():
if name in target:
try:
target[name] = int(index)
except (TypeError, ValueError):
pass
def _init_session(self) -> None:
control_defaults, session_cls = _load_omnisocket_api()
self.session = session_cls()
self.session.connect(
server_addr=self.server_addr,
peer_id=self.peer_id,
relay_via=self.relay_via,
bind_ip=self.bind_ip,
bind_device=self.bind_device,
**control_defaults,
)
def _joy_callback(self, msg: Joy) -> None:
axes = list(msg.axes) + [0.0] * 16
buttons = list(msg.buttons) + [0] * 32
state = {
"a": self._button_value(buttons, "a"),
"x": self._button_value(buttons, "x"),
"y": self._button_value(buttons, "y"),
"start": self._button_value(buttons, "start"),
"lx": self._axis_value(axes, "lx"),
"ly": self._axis_value(axes, "ly"),
"rx": self._axis_value(axes, "rx"),
"dpad_h": self._axis_value(axes, "dpad_h"),
}
self._send_mode_events(state)
self._send_trim_event(state)
self._send_lift_events(state)
self._send_analog_events(state)
self.last_buttons = {
name: int(state[name]) for name in ("a", "x", "y", "start")
}
self.last_dpad_h = float(state["dpad_h"])
def _button_value(self, buttons: list[int], name: str) -> int:
index = self.button_map[name]
return int(buttons[index]) if index < len(buttons) else 0
def _axis_value(self, axes: list[float], name: str) -> float:
index = self.axis_map[name]
return float(axes[index]) if index < len(axes) else 0.0
def _send_mode_events(self, state: Dict[str, float]) -> None:
if self._rising_edge(state, "y"):
self._send_event("pose_hold", "y")
elif self._rising_edge(state, "x"):
self._send_event("pose_home", "x")
elif self._rising_edge(state, "a"):
self._send_event("mode_stride", "a")
def _send_trim_event(self, state: Dict[str, float]) -> None:
if self._rising_edge(state, "start"):
self._send_event("trim_reset", "start")
def _send_lift_events(self, state: Dict[str, float]) -> None:
dpad_h = float(state["dpad_h"])
if dpad_h <= -self.dpad_threshold and self.last_dpad_h > -self.dpad_threshold:
self._send_event("lift_up", "dpad_left")
elif dpad_h >= self.dpad_threshold and self.last_dpad_h < self.dpad_threshold:
self._send_event("lift_down", "dpad_right")
def _send_analog_events(self, state: Dict[str, float]) -> None:
surge = self._compute_surge(state["ly"])
sway = self._cleanup_command(
self._apply_deadzone(state["lx"]) * -0.4 + self.lateral_command_offset
)
spin = self._cleanup_command(
self._apply_deadzone(state["rx"]) * -0.4 + self.rotation_command_offset
)
self._send_event("set_surge", "left_stick_y", surge)
self._send_event("set_sway", "left_stick_x", sway)
self._send_event("set_spin", "right_stick_x", spin)
def _compute_surge(self, ly: float) -> float:
ly = self._apply_deadzone(ly)
if ly >= 0.0:
value = ly * 0.8 + self.forward_command_offset
else:
value = ly * 0.5
return self._cleanup_command(value)
def _apply_deadzone(self, value: float) -> float:
if abs(value) < self.deadzone:
return 0.0
return float(value)
def _cleanup_command(self, value: float) -> float:
if abs(value) < self.analog_epsilon:
return 0.0
return float(value)
def _rising_edge(self, state: Dict[str, float], name: str) -> bool:
previous = int(self.last_buttons.get(name, 0))
return int(state[name]) == 1 and previous == 0
def _send_event(
self, event_code: str, key_name: str, drive_value: float = 1.0
) -> None:
if self.session is None:
return
packet = make_control_packet(self.seq_id, event_code, drive_value)
self.seq_id += 1
self.session.send(to=self.target_peer, data=packet.encode())
self.get_logger().debug(
f"sent seq={packet.seq_id} event={event_code} key={key_name}"
)
def main(args: list[str] | None = None) -> None:
rclpy.init(args=args)
node = OmniSocketXboxSender()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,365 @@
"""
RL Control Plugin (Python Version)
Main ROS2 node for humanoid robot RL control system
"""
import math
import os
import queue
import threading
import time
import sys
import rclpy
import yaml
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
# ROS messages
from sensor_msgs.msg import Joy
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from common.joystick import JoystickHumanoid, ControlFlag
from common.xbox_control import XBOXController
# Local imports
from common.robot_data import RobotData
from FSM.robot_fsm import get_robot_fsm
from FSM.fsm_base import FSMStateName
from common.robot_interface import get_robot_interface
from common.stdin_keyboard_control import KeyboardController
from udp_loopback.udp_fsm_controller import UDPFSMController
import functools
def timing_decorator(func):
"""
装饰器:记录函数执行时间
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
execution_time = end_time - start_time
print(f"[TIMING] {func.__name__} executed in {execution_time:.6f} seconds")
return result
return wrapper
class XMIGCSControlNode(Node):
"""xMIGCS控制节点Python版本"""
def __init__(self, debug=False):
super().__init__('xmigcs_control_node')
# 配置和参数
self.debug = debug
self.whole_joint_num = 35
self.pi = math.pi
self.rpm2rps = math.pi / 30.0
self.config = {}
# 加载配置
self._load_config()
# 初始化数据结构
self._init_data_structures()
# 机器人接口
self.robot_interface = get_robot_interface(self.robot_data,
self.config_file)
self.robot_interface.init(self) # 传入node实例
# 机器人FSM
self.robot_fsm = get_robot_fsm(
self.robot_data,
self.config,
)
# 初始化ROS接口
self._init_ros_interfaces()
# 初始化控制系统
self._init_control_system()
# 启动控制线程
self._start_control_thread()
def _load_config(self):
"""加载配置文件"""
# 获取包路径
self.config_file = os.path.join('.', 'config', 'dex_config.yaml')
with open(self.config_file, 'r') as f:
self.config = yaml.safe_load(f)
print(self.config)
# 获取控制器类型
self.control_tool = self.config.get('control_tool', 'keyboard')
# 提取关键配置参数
self.motor_num = self.config.get('motor_num')
self.dt = self.config.get('dt')
self.sim = self.config.get('sim')
# 检查当前用户名如果是ubuntu则抛出异常
import getpass
user_name = getpass.getuser().lower()
if self.sim and user_name == 'ubuntu':
raise RuntimeError("On ubuntu user, sim must be set to false")
def _init_data_structures(self):
"""初始化数据结构"""
# 机器人数据
self.robot_data = RobotData(self.motor_num, self.whole_joint_num)
self.robot_data.config_file_ = getattr(self, 'config_file', '')
# joysticks 消息队列
self.queue_joy_cmd = queue.Queue(maxsize=1)
self.queue_xbox_cmd = queue.Queue(maxsize=1)
self.control_flag = ControlFlag()
def _init_ros_interfaces(self):
"""初始化ROS接口仅非电机相关"""
qos_profile = QoSProfile(reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=5)
# 订阅者(非电机相关)
if self.control_tool == "joystick":
self.sub_joy_cmd = self.create_subscription(
Joy, '/sbus_data', self._joy_callback, qos_profile)
if self.control_tool == "xbox":
self.sub_xbox_cmd = self.create_subscription(
Joy, '/xbox_data', self._xbox_callback, qos_profile)
def _init_control_system(self):
"""初始化控制系统"""
# 手柄控制器
if self.control_tool == "joystick":
self.joystick_humanoid = JoystickHumanoid()
self.joystick_humanoid.init()
# 键盘控制器
if self.control_tool == "keyboard":
self.keyboard_controller = KeyboardController()
self.keyboard_controller.init()
# 如果使用键盘控制,启动键盘监听
self.keyboard_controller.start()
# UDP控制器
if self.control_tool == "udp_loopback":
self.udp_fsm_controller = UDPFSMController()
self.udp_fsm_controller.init()
self.udp_fsm_controller.start()
if self.control_tool == "omnisocket_loopback":
from udp_loopback.omnisocket_fsm_controller import OmniSocketFSMController
self.omnisocket_fsm_controller = OmniSocketFSMController()
self.omnisocket_fsm_controller.init()
self.omnisocket_fsm_controller.start()
# Xbox控制器
if self.control_tool == "xbox":
self.xbox_controller = XBOXController()
self.xbox_controller.init()
# 控制标志
self.control_running = False
self.control_thread = None
def _start_control_thread(self):
"""启动控制线程"""
self.control_running = True
self.control_thread = threading.Thread(target=self._rl_control_loop,
daemon=True)
self.control_thread.start()
self.get_logger().info("Control thread started")
def _rl_control_loop(self):
"""主控制循环"""
self.get_logger().info("RL control loop starting...")
# 初始化时间戳
time_passed = 0.0
# 处理手柄数据
self._process_controller_data()
# 更新机器人数据
self._update_robot_data(self.control_flag, time_passed)
while self.control_running and rclpy.ok():
# 运行FSM
loop_start = time.perf_counter()
self.robot_fsm.run_fsm(self.robot_data.control_flag)
# 发布控制命令
self.robot_interface.update_param(current_state=self.robot_fsm.get_current_state())
self._send_control_commands(self.robot_data.control_flag)
# 更新时间戳
time_passed += self.dt
# 处理手柄数据
self._process_controller_data()
# 更新机器人数据
self._update_robot_data(self.control_flag, time_passed)
# 控制频率
self._precise_sleep_until(loop_start + self.dt)
# print(
# f"current control freq: {1/(time.perf_counter() - loop_start):.2f} Hz"
# )
self.get_logger().info("RL control loop ended")
def _precise_sleep_until(self, target_time):
"""精确睡眠到目标时间"""
current_time = time.perf_counter()
sleep_time = target_time - current_time
if sleep_time <= 0:
return # 已经超时,立即返回
# 分级睡眠策略
if sleep_time > 0.003: # 3ms以上使用混合睡眠
# 先睡眠大部分时间
time.sleep(sleep_time * 0.9)
# 剩余时间忙等待
while time.perf_counter() < target_time:
pass
else: # 3ms以内纯忙等待
while time.perf_counter() < target_time:
pass
# def _wait_for_start_signal(self):
# """等待启动信号"""
# start_file = "/tmp/rl_start_signal"
# self.get_logger().info("Waiting for start signal...")
# self.get_logger().info("Run: touch /tmp/rl_start_signal")
# # 删除可能存在的旧文件
# if os.path.exists(start_file):
# os.remove(start_file)
# # 等待启动文件出现
# while not os.path.exists(start_file) and rclpy.ok():
# time.sleep(0.5)
# self.get_logger().info("Start signal received, beginning RL control!")
# @timing_decorator
def _process_controller_data(self):
# 处理控制器输入
if self.control_tool == "joystick":
# 处理手柄输入
while not self.queue_joy_cmd.empty():
try:
msg = self.queue_joy_cmd.get_nowait()
self.joystick_humanoid.joy_map_read(msg)
self.joystick_humanoid.joy_flag_update()
break
except queue.Empty:
break
if self.control_tool == "xbox":
while not self.queue_xbox_cmd.empty():
try:
msg = self.queue_xbox_cmd.get_nowait()
self.xbox_controller.xbox_map_read(msg)
self.xbox_controller.xbox_flag_update()
break
except queue.Empty:
break
if self.control_tool == "keyboard":
self.keyboard_controller.update_flag()
flag = self.keyboard_controller.get_keyboard_flag()
elif self.control_tool == "udp_loopback":
self.udp_fsm_controller.update_flag()
flag = self.udp_fsm_controller.get_udp_flag()
elif self.control_tool == "omnisocket_loopback":
self.omnisocket_fsm_controller.update_flag()
flag = self.omnisocket_fsm_controller.get_udp_flag()
elif self.control_tool == "joystick":
flag = self.joystick_humanoid.get_joy_flag()
elif self.control_tool == "xbox":
flag = self.xbox_controller.get_xbox_flag()
else:
print("[ERROR] No control tool specified")
print('*' * 30 + f"current flag: {flag}" + '*' * 30)
self.control_flag = flag
# @timing_decorator
def _update_robot_data(self, flag: ControlFlag, time_passed: float):
"""更新机器人数据"""
self.robot_interface.update_robot_data(flag, time_passed)
# @timing_decorator
def _send_control_commands(self, flag: ControlFlag):
"""发布控制命令"""
# 通过robot_interface发布控制命令
self.robot_interface.send_motor_commands(flag)
def _joy_callback(self, msg):
"""云卓手柄输入回调"""
try:
self.queue_joy_cmd.put_nowait(msg)
except queue.Full:
try:
self.queue_joy_cmd.get_nowait() # 移除旧数据
self.queue_joy_cmd.put_nowait(msg) # 加入新数据
except:
pass # 如果仍然无法加入,忽略
def _xbox_callback(self, msg):
"""xbox手柄输入回调"""
try:
self.queue_xbox_cmd.put_nowait(msg)
except queue.Full:
try:
self.queue_xbox_cmd.get_nowait() # 移除旧数据
self.queue_xbox_cmd.put_nowait(msg) # 加入新数据
except:
pass # 如果仍然无法加入,忽略
def destroy_node(self):
"""节点销毁"""
self.control_running = False
# # 先停止键盘控制器(重要!)
if hasattr(self,
'keyboard_controller') and self.control_tool == "keyboard":
self.keyboard_controller.stop()
if hasattr(self,
'udp_fsm_controller') and self.control_tool == "udp_loopback":
self.udp_fsm_controller.stop()
if hasattr(self,
'omnisocket_fsm_controller') and self.control_tool == "omnisocket_loopback":
self.omnisocket_fsm_controller.stop()
if self.control_thread and self.control_thread.is_alive():
self.control_thread.join(timeout=1.0)
super().destroy_node()
def main(args=None):
"""主函数"""
rclpy.init(args=args)
node = None
try:
node = XMIGCSControlNode(debug=False)
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
if 'node' in locals() and node is not None:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()