diff --git a/Deploy_Tienkung/udp_loopback/README_omnisocket.md b/Deploy_Tienkung/udp_loopback/README_omnisocket.md index 6b9e8e7..e58a66d 100644 --- a/Deploy_Tienkung/udp_loopback/README_omnisocket.md +++ b/Deploy_Tienkung/udp_loopback/README_omnisocket.md @@ -77,3 +77,34 @@ python3 udp_loopback/omnisocket_xbox_sender.py - The original UDP files remain unchanged, so you can switch back by restoring `control_tool: udp_loopback`. - OmniSocket keyboard/Xbox mappings are aligned with the cleaned walk-only FSM flow: `ZERO`, `STOP`, and `WALKAMP`. - Keyboard sender supports `4/5/6` for clearing `x/y/yaw` speed independently, and `r` still clears all three axes. + +## B-side OmniDaemon + +The B-side stack now supports a local daemon that owns OmniSocket control receive and manages the video sender worker. + +Start the daemon: + +```bash +cd OmniSocketGo +make b_side_video_sender +python3 -m pip install -e ./python +python3 -m omnisocket_b_side.daemon --config config/b_side_omnidaemon.yaml +``` + +Or with the helper script: + +```bash +cd OmniSocketGo +./scripts/start_b_side.sh +``` + +With `OMNI_TRANSPORT_BACKEND=daemon` (the default), `OmniSocketFSMController` connects to `/tmp/omnisocket-b-ctrl.sock` and receives raw `ControlPacket` bytes from the daemon instead of creating its own OmniSocket session. + +Fallback to the previous direct mode when needed: + +```bash +export OMNI_TRANSPORT_BACKEND=direct +python3 rl_control_node_sim.py +``` + +In simulation / MuJoCo mode, keep `video_sender.enabled: false` in `OmniSocketGo/config/b_side_omnidaemon.yaml` so the daemon only provides the control path and does not require `/dev/video0`. diff --git a/Deploy_Tienkung/udp_loopback/omnisocket_fsm_controller.py b/Deploy_Tienkung/udp_loopback/omnisocket_fsm_controller.py index 7d2eec2..54230c6 100644 --- a/Deploy_Tienkung/udp_loopback/omnisocket_fsm_controller.py +++ b/Deploy_Tienkung/udp_loopback/omnisocket_fsm_controller.py @@ -2,9 +2,11 @@ from __future__ import annotations +import os from pathlib import Path import queue import struct +import sys import threading import time from typing import Dict, Optional @@ -30,6 +32,24 @@ def _load_omnisocket_api(): return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session +def _load_b_side_control_client(): + try: + from omnisocket_b_side.client import BSideControlClient + except ImportError: + workspace_root = Path(__file__).resolve().parents[3] + python_root = workspace_root / "OmniSocketGo" / "python" + if str(python_root) not in sys.path: + sys.path.insert(0, str(python_root)) + try: + from omnisocket_b_side.client import BSideControlClient + except ImportError as exc: # pragma: no cover - environment dependent + raise RuntimeError( + "omnisocket_b_side is not installed. Install it before using " + "OMNI_TRANSPORT_BACKEND=daemon." + ) from exc + return BSideControlClient + + class OmniSocketFSMFlag(ControlFlag): """FSM-facing flag produced from decoded OmniSocket control packets.""" @@ -54,7 +74,11 @@ class OmniSocketFSMController: self.recv_running = False self.recv_thread: Optional[threading.Thread] = None self.session = None + self.daemon_client = None self._msg_type_binary = None + self.transport_backend = str( + os.getenv("OMNI_TRANSPORT_BACKEND", "daemon") + ).strip().lower() or "daemon" def _load_config(self) -> None: config_path = Path(__file__).resolve().parent / "config" / "omnisocket_demo.yaml" @@ -73,6 +97,9 @@ class OmniSocketFSMController: self.bind_ip = str(transport_cfg.get("bind_ip", "")) self.bind_device = str(transport_cfg.get("bind_device", "")) self.peer_id = str(receiver_cfg.get("peer_id", "peer-b-ctrl")) + self.ctrl_socket_path = str( + os.getenv("OMNIBDAEMON_CTRL_SOCKET", "/tmp/omnisocket-b-ctrl.sock") + ) self.initial_lift = float(motion_cfg.get("initial_lift", 0.89)) self.lift_step = float(motion_cfg.get("lift_step", 0.05)) @@ -94,25 +121,36 @@ class OmniSocketFSMController: self.last_fsm_command_time = 0.0 def start(self) -> None: - control_defaults, msg_type_binary, session_cls = _load_omnisocket_api() - self._msg_type_binary = msg_type_binary - self.session = session_cls() - self.session.connect( - server_addr=self.server_addr, - peer_id=self.peer_id, - relay_via=self.relay_via, - bind_ip=self.bind_ip, - bind_device=self.bind_device, - **control_defaults, - ) + if self.transport_backend == "daemon": + daemon_client_cls = _load_b_side_control_client() + self.daemon_client = daemon_client_cls(socket_path=self.ctrl_socket_path) + self.daemon_client.connect() + else: + control_defaults, msg_type_binary, session_cls = _load_omnisocket_api() + self._msg_type_binary = msg_type_binary + self.session = session_cls() + self.session.connect( + server_addr=self.server_addr, + peer_id=self.peer_id, + relay_via=self.relay_via, + bind_ip=self.bind_ip, + bind_device=self.bind_device, + **control_defaults, + ) self.recv_running = True self.recv_thread = threading.Thread(target=self._recv_loop, daemon=True) self.recv_thread.start() - print( - f"OmniSocket FSM controller listening as {self.peer_id} " - f"via {self.server_addr}" - ) + if self.transport_backend == "daemon": + print( + "OmniSocket FSM controller connected to B-side daemon " + f"via {self.ctrl_socket_path}" + ) + else: + print( + f"OmniSocket FSM controller listening as {self.peer_id} " + f"via {self.server_addr}" + ) def stop(self) -> None: self.recv_running = False @@ -121,9 +159,18 @@ class OmniSocketFSMController: if self.session is not None: self.session.close() self.session = None + if self.daemon_client is not None: + self.daemon_client.close() + self.daemon_client = None print("OmniSocket FSM controller stopped") def _recv_loop(self) -> None: + if self.transport_backend == "daemon": + self._recv_loop_daemon() + else: + self._recv_loop_direct() + + def _recv_loop_direct(self) -> None: while self.recv_running and self.session is not None: item = self.session.recv(timeout_ms=200) if item is None: @@ -137,20 +184,53 @@ class OmniSocketFSMController: ) continue + self._enqueue_payload(payload, from_peer=from_peer) + + def _recv_loop_daemon(self) -> None: + while self.recv_running: + client = self.daemon_client + if client is None: + return try: - packet = ControlPacket.decode(payload) - except (ValueError, struct.error) as exc: - print(f"[omnisocket_fsm] drop invalid payload from {from_peer}: {exc}") + payload = client.recv_control_packet(timeout_ms=200) + except OSError as exc: + print(f"[omnisocket_fsm] daemon control socket error: {exc}") + try: + client.close() + except OSError: + pass + self.daemon_client = None + if not self.recv_running: + return + time.sleep(0.5) + try: + daemon_client_cls = _load_b_side_control_client() + self.daemon_client = daemon_client_cls(socket_path=self.ctrl_socket_path) + self.daemon_client.connect() + except OSError as reconnect_error: + print(f"[omnisocket_fsm] reconnect daemon socket failed: {reconnect_error}") + time.sleep(0.5) continue + if payload is None: + continue + self._enqueue_payload(payload, from_peer="daemon") + + def _enqueue_payload(self, payload: bytes, *, from_peer: str) -> None: + try: + packet = ControlPacket.decode(payload) + except (ValueError, struct.error) as exc: + print(f"[omnisocket_fsm] drop invalid payload from {from_peer}: {exc}") + return + + try: + self.packet_queue.put_nowait(packet) + except queue.Full: try: + self.packet_queue.get_nowait() self.packet_queue.put_nowait(packet) - except queue.Full: - try: - self.packet_queue.get_nowait() - self.packet_queue.put_nowait(packet) - except queue.Empty: - pass + except queue.Empty: + pass def update_flag(self) -> None: while not self.packet_queue.empty():