diff --git a/Deploy_Tienkung/udp_loopback/config/omnisocket_demo.yaml b/Deploy_Tienkung/udp_loopback/config/omnisocket_demo.yaml index 8b614a1..e612d79 100644 --- a/Deploy_Tienkung/udp_loopback/config/omnisocket_demo.yaml +++ b/Deploy_Tienkung/udp_loopback/config/omnisocket_demo.yaml @@ -1,5 +1,5 @@ transport: - server_addr: 81.70.156.140:10909 + server_addr: "" relay_via: 106.55.173.235:10909 bind_ip: "" bind_device: "" diff --git a/Deploy_Tienkung/udp_loopback/omnisocket_keyboard_sender.py b/Deploy_Tienkung/udp_loopback/omnisocket_keyboard_sender.py index 89b3e5d..c37f7db 100644 --- a/Deploy_Tienkung/udp_loopback/omnisocket_keyboard_sender.py +++ b/Deploy_Tienkung/udp_loopback/omnisocket_keyboard_sender.py @@ -1,4 +1,4 @@ -"""Keyboard sender that emits binary control packets over OmniSocket.""" +"""Keyboard sender that emits control events through OmniDaemon or OmniSocket.""" from __future__ import annotations @@ -9,8 +9,9 @@ import signal import sys import termios import threading +import time import tty -from typing import Dict, Optional +from typing import Dict import yaml @@ -20,32 +21,55 @@ except ImportError: # pragma: no cover - direct script execution fallback from omnisocket_control import make_control_packet +WORKSPACE_ROOT = Path(__file__).resolve().parents[3] +DEFAULT_BACKEND = "daemon" + + +def _load_daemon_client_api(): + try: + from omnisocket_a_side.client import OmniDaemonClient + except ImportError: + python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python" + if python_dir.exists(): + sys.path.insert(0, str(python_dir)) + from omnisocket_a_side.client import OmniDaemonClient + return OmniDaemonClient + + def _load_omnisocket_api(): try: from omnisocket import CONTROL_DEFAULTS, Session except ImportError as exc: # pragma: no cover - environment dependent raise RuntimeError( - "omnisocket is not installed. Install it before using " - "omnisocket_keyboard_sender.py." + "omnisocket is not installed. Install it before using direct transport mode." ) from exc return CONTROL_DEFAULTS, Session class OmniSocketKeyboardSender: - """Standalone keyboard sender for OmniSocket control-plane testing.""" + """Standalone keyboard sender for A-side control-plane testing.""" def __init__(self) -> None: self.config: Dict[str, object] = {} + self.backend = self._resolve_backend() self.seq_id = 0 self.running = False self.session = None - self.input_thread: Optional[threading.Thread] = None + self.daemon_client = None + self.input_thread: threading.Thread | None = None self.original_terminal_settings = None self._load_config() - self._init_session() + self._init_transport() self._print_help() + @staticmethod + def _resolve_backend() -> str: + backend = os.getenv("OMNI_TRANSPORT_BACKEND", DEFAULT_BACKEND).strip().lower() + if backend not in {"daemon", "direct"}: + return DEFAULT_BACKEND + return backend + def _load_config(self) -> None: config_path = Path(__file__).resolve().parent / "config" / "omnisocket_demo.yaml" if config_path.exists(): @@ -64,7 +88,12 @@ class OmniSocketKeyboardSender: self.peer_id = str(sender_cfg.get("peer_id", "peer-a-ctrl")) self.target_peer = str(sender_cfg.get("target_peer", "peer-b-ctrl")) - def _init_session(self) -> None: + def _init_transport(self) -> None: + if self.backend == "daemon": + daemon_client_cls = _load_daemon_client_api() + self.daemon_client = daemon_client_cls() + return + control_defaults, session_cls = _load_omnisocket_api() self.session = session_cls() self.session.connect( @@ -78,7 +107,11 @@ class OmniSocketKeyboardSender: def _print_help(self) -> None: print("OmniSocket keyboard sender ready") - print(f"Peer: {self.peer_id} -> {self.target_peer} via {self.server_addr}") + print(f"Backend: {self.backend}") + if self.backend == "daemon": + print(f"Daemon socket: {os.getenv('OMNIDAEMON_SOCKET', '/tmp/omnisocket-a-side.sock')}") + else: + print(f"Peer: {self.peer_id} -> {self.target_peer} via {self.server_addr}") print("Keys:") print(" z -> pose_home") print(" c -> pose_hold") @@ -128,6 +161,11 @@ class OmniSocketKeyboardSender: except KeyboardInterrupt: self._handle_ctrl_c() finally: + if self.daemon_client is not None: + try: + self.daemon_client.close() + except Exception: + pass if self.original_terminal_settings is not None: termios.tcsetattr( sys.stdin, termios.TCSADRAIN, self.original_terminal_settings @@ -186,16 +224,34 @@ class OmniSocketKeyboardSender: def _send_event( self, event_code: str, key_name: str, drive_value: float = 1.0 ) -> None: - if self.session is None: + try: + if self.backend == "daemon": + assert self.daemon_client is not None + result = self.daemon_client.send_control_event( + source="keyboard-sender", + event_code=event_code, + drive_value=drive_value, + client_time_ms=int(time.time() * 1000), + ) + print( + f"sent seq={result.get('assigned_seq_id')} event={event_code} " + f"key={key_name} backend=daemon" + ) + else: + if self.session is None: + return + packet = make_control_packet(self.seq_id, event_code, drive_value) + payload = packet.encode() + self.seq_id += 1 + self.session.send(to=self.target_peer, data=payload) + print( + f"sent seq={packet.seq_id} event={event_code} key={key_name} " + f"bytes={len(payload)} backend=direct" + ) + except Exception as error: + print(f"send failed for {event_code}: {error}") return - packet = make_control_packet(self.seq_id, event_code, drive_value) - payload = packet.encode() - self.seq_id += 1 - self.session.send(to=self.target_peer, data=payload) - print( - f"sent seq={packet.seq_id} event={event_code} key={key_name} " - f"bytes={len(payload)}" - ) + if event_code == "session_quit": self.running = False diff --git a/Deploy_Tienkung/udp_loopback/omnisocket_xbox_sender.py b/Deploy_Tienkung/udp_loopback/omnisocket_xbox_sender.py index 328ffac..ed324eb 100644 --- a/Deploy_Tienkung/udp_loopback/omnisocket_xbox_sender.py +++ b/Deploy_Tienkung/udp_loopback/omnisocket_xbox_sender.py @@ -1,8 +1,11 @@ -"""ROS2 Joy -> OmniSocket bridge for Xbox control.""" +"""ROS2 Joy bridge for OmniDaemon or direct OmniSocket control.""" from __future__ import annotations +import os from pathlib import Path +import sys +import time from typing import Dict import rclpy @@ -17,30 +20,48 @@ except ImportError: # pragma: no cover - direct script execution fallback from omnisocket_control import make_control_packet +WORKSPACE_ROOT = Path(__file__).resolve().parents[3] +DEFAULT_BACKEND = "daemon" + + +def _load_daemon_client_api(): + try: + from omnisocket_a_side.client import OmniDaemonClient + except ImportError: + python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python" + if python_dir.exists(): + sys.path.insert(0, str(python_dir)) + from omnisocket_a_side.client import OmniDaemonClient + return OmniDaemonClient + + def _load_omnisocket_api(): try: from omnisocket import CONTROL_DEFAULTS, Session except ImportError as exc: # pragma: no cover - environment dependent raise RuntimeError( - "omnisocket is not installed. Install it before using " - "omnisocket_xbox_sender.py." + "omnisocket is not installed. Install it before using direct transport mode." ) from exc return CONTROL_DEFAULTS, Session class OmniSocketXboxSender(Node): - """Subscribe to Joy messages and forward them through OmniSocket.""" + """Subscribe to Joy messages and forward them through the selected backend.""" def __init__(self) -> None: super().__init__("omnisocket_xbox_sender") self.config: Dict[str, object] = {} + self.backend = self._resolve_backend() self.seq_id = 0 self.last_buttons: Dict[str, int] = {} self.last_dpad_h = 0.0 self.session = None + self.daemon_client = None + self._last_transport_error = "" + self._last_transport_error_at = 0.0 self._load_config() - self._init_session() + self._init_transport() qos_profile = QoSProfile( reliability=ReliabilityPolicy.RELIABLE, @@ -52,17 +73,25 @@ class OmniSocketXboxSender(Node): ) self.get_logger().info( - f"Forwarding {self.joy_topic} -> OmniSocket " - f"{self.peer_id} -> {self.target_peer}" - ) - self.get_logger().info( - "Buttons: A=WALKAMP X=ZERO Y=STOP START=reset" + f"Forwarding {self.joy_topic} via {self.backend} " + f"from {self.peer_id} to {self.target_peer}" ) + self.get_logger().info("Buttons: A=WALKAMP X=ZERO Y=STOP START=reset") + + @staticmethod + def _resolve_backend() -> str: + backend = os.getenv("OMNI_TRANSPORT_BACKEND", DEFAULT_BACKEND).strip().lower() + if backend not in {"daemon", "direct"}: + return DEFAULT_BACKEND + return backend def destroy_node(self) -> bool: if self.session is not None: self.session.close() self.session = None + if self.daemon_client is not None: + self.daemon_client.close() + self.daemon_client = None return super().destroy_node() def _load_config(self) -> None: @@ -155,7 +184,12 @@ class OmniSocketXboxSender(Node): except (TypeError, ValueError): pass - def _init_session(self) -> None: + def _init_transport(self) -> None: + if self.backend == "daemon": + daemon_client_cls = _load_daemon_client_api() + self.daemon_client = daemon_client_cls() + return + control_defaults, session_cls = _load_omnisocket_api() self.session = session_cls() self.session.connect( @@ -257,14 +291,40 @@ class OmniSocketXboxSender(Node): def _send_event( self, event_code: str, key_name: str, drive_value: float = 1.0 ) -> None: - if self.session is None: - return - packet = make_control_packet(self.seq_id, event_code, drive_value) - self.seq_id += 1 - self.session.send(to=self.target_peer, data=packet.encode()) - self.get_logger().debug( - f"sent seq={packet.seq_id} event={event_code} key={key_name}" - ) + try: + if self.backend == "daemon": + assert self.daemon_client is not None + result = self.daemon_client.send_control_event( + source="xbox-sender", + event_code=event_code, + drive_value=drive_value, + client_time_ms=int(time.time() * 1000), + ) + self.get_logger().debug( + f"sent seq={result.get('assigned_seq_id')} event={event_code} key={key_name}" + ) + else: + if self.session is None: + return + packet = make_control_packet(self.seq_id, event_code, drive_value) + self.seq_id += 1 + self.session.send(to=self.target_peer, data=packet.encode()) + self.get_logger().debug( + f"sent seq={packet.seq_id} event={event_code} key={key_name}" + ) + self._last_transport_error = "" + except Exception as error: + self._report_transport_error(str(error)) + + def _report_transport_error(self, message: str) -> None: + now = time.monotonic() + if ( + message != self._last_transport_error + or now - self._last_transport_error_at >= 2.0 + ): + self.get_logger().warning(f"control send failed via {self.backend}: {message}") + self._last_transport_error = message + self._last_transport_error_at = now def main(args: list[str] | None = None) -> None: