Compare commits

..

2 Commits

5 changed files with 62 additions and 289 deletions

View File

@@ -77,34 +77,3 @@ python3 udp_loopback/omnisocket_xbox_sender.py
- The original UDP files remain unchanged, so you can switch back by restoring `control_tool: udp_loopback`. - The original UDP files remain unchanged, so you can switch back by restoring `control_tool: udp_loopback`.
- OmniSocket keyboard/Xbox mappings are aligned with the cleaned walk-only FSM flow: `ZERO`, `STOP`, and `WALKAMP`. - OmniSocket keyboard/Xbox mappings are aligned with the cleaned walk-only FSM flow: `ZERO`, `STOP`, and `WALKAMP`.
- Keyboard sender supports `4/5/6` for clearing `x/y/yaw` speed independently, and `r` still clears all three axes. - Keyboard sender supports `4/5/6` for clearing `x/y/yaw` speed independently, and `r` still clears all three axes.
## B-side OmniDaemon
The B-side stack now supports a local daemon that owns OmniSocket control receive and manages the video sender worker.
Start the daemon:
```bash
cd OmniSocketGo
make b_side_video_sender
python3 -m pip install -e ./python
python3 -m omnisocket_b_side.daemon --config config/b_side_omnidaemon.yaml
```
Or with the helper script:
```bash
cd OmniSocketGo
./scripts/start_b_side.sh
```
With `OMNI_TRANSPORT_BACKEND=daemon` (the default), `OmniSocketFSMController` connects to `/tmp/omnisocket-b-ctrl.sock` and receives raw `ControlPacket` bytes from the daemon instead of creating its own OmniSocket session.
Fallback to the previous direct mode when needed:
```bash
export OMNI_TRANSPORT_BACKEND=direct
python3 rl_control_node_sim.py
```
In simulation / MuJoCo mode, keep `video_sender.enabled: false` in `OmniSocketGo/config/b_side_omnidaemon.yaml` so the daemon only provides the control path and does not require `/dev/video0`.

View File

@@ -1,5 +1,5 @@
transport: transport:
server_addr: "" server_addr: 81.70.156.140:10909
relay_via: 106.55.173.235:10909 relay_via: 106.55.173.235:10909
bind_ip: "" bind_ip: ""
bind_device: "" bind_device: ""

View File

@@ -2,11 +2,9 @@
from __future__ import annotations from __future__ import annotations
import os
from pathlib import Path from pathlib import Path
import queue import queue
import struct import struct
import sys
import threading import threading
import time import time
from typing import Dict, Optional from typing import Dict, Optional
@@ -32,24 +30,6 @@ def _load_omnisocket_api():
return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session
def _load_b_side_control_client():
try:
from omnisocket_b_side.client import BSideControlClient
except ImportError:
workspace_root = Path(__file__).resolve().parents[3]
python_root = workspace_root / "OmniSocketGo" / "python"
if str(python_root) not in sys.path:
sys.path.insert(0, str(python_root))
try:
from omnisocket_b_side.client import BSideControlClient
except ImportError as exc: # pragma: no cover - environment dependent
raise RuntimeError(
"omnisocket_b_side is not installed. Install it before using "
"OMNI_TRANSPORT_BACKEND=daemon."
) from exc
return BSideControlClient
class OmniSocketFSMFlag(ControlFlag): class OmniSocketFSMFlag(ControlFlag):
"""FSM-facing flag produced from decoded OmniSocket control packets.""" """FSM-facing flag produced from decoded OmniSocket control packets."""
@@ -74,11 +54,7 @@ class OmniSocketFSMController:
self.recv_running = False self.recv_running = False
self.recv_thread: Optional[threading.Thread] = None self.recv_thread: Optional[threading.Thread] = None
self.session = None self.session = None
self.daemon_client = None
self._msg_type_binary = None self._msg_type_binary = None
self.transport_backend = str(
os.getenv("OMNI_TRANSPORT_BACKEND", "daemon")
).strip().lower() or "daemon"
def _load_config(self) -> None: def _load_config(self) -> None:
config_path = Path(__file__).resolve().parent / "config" / "omnisocket_demo.yaml" config_path = Path(__file__).resolve().parent / "config" / "omnisocket_demo.yaml"
@@ -97,9 +73,6 @@ class OmniSocketFSMController:
self.bind_ip = str(transport_cfg.get("bind_ip", "")) self.bind_ip = str(transport_cfg.get("bind_ip", ""))
self.bind_device = str(transport_cfg.get("bind_device", "")) self.bind_device = str(transport_cfg.get("bind_device", ""))
self.peer_id = str(receiver_cfg.get("peer_id", "peer-b-ctrl")) self.peer_id = str(receiver_cfg.get("peer_id", "peer-b-ctrl"))
self.ctrl_socket_path = str(
os.getenv("OMNIBDAEMON_CTRL_SOCKET", "/tmp/omnisocket-b-ctrl.sock")
)
self.initial_lift = float(motion_cfg.get("initial_lift", 0.89)) self.initial_lift = float(motion_cfg.get("initial_lift", 0.89))
self.lift_step = float(motion_cfg.get("lift_step", 0.05)) self.lift_step = float(motion_cfg.get("lift_step", 0.05))
@@ -121,36 +94,25 @@ class OmniSocketFSMController:
self.last_fsm_command_time = 0.0 self.last_fsm_command_time = 0.0
def start(self) -> None: def start(self) -> None:
if self.transport_backend == "daemon": control_defaults, msg_type_binary, session_cls = _load_omnisocket_api()
daemon_client_cls = _load_b_side_control_client() self._msg_type_binary = msg_type_binary
self.daemon_client = daemon_client_cls(socket_path=self.ctrl_socket_path) self.session = session_cls()
self.daemon_client.connect() self.session.connect(
else: server_addr=self.server_addr,
control_defaults, msg_type_binary, session_cls = _load_omnisocket_api() peer_id=self.peer_id,
self._msg_type_binary = msg_type_binary relay_via=self.relay_via,
self.session = session_cls() bind_ip=self.bind_ip,
self.session.connect( bind_device=self.bind_device,
server_addr=self.server_addr, **control_defaults,
peer_id=self.peer_id, )
relay_via=self.relay_via,
bind_ip=self.bind_ip,
bind_device=self.bind_device,
**control_defaults,
)
self.recv_running = True self.recv_running = True
self.recv_thread = threading.Thread(target=self._recv_loop, daemon=True) self.recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
self.recv_thread.start() self.recv_thread.start()
if self.transport_backend == "daemon": print(
print( f"OmniSocket FSM controller listening as {self.peer_id} "
"OmniSocket FSM controller connected to B-side daemon " f"via {self.server_addr}"
f"via {self.ctrl_socket_path}" )
)
else:
print(
f"OmniSocket FSM controller listening as {self.peer_id} "
f"via {self.server_addr}"
)
def stop(self) -> None: def stop(self) -> None:
self.recv_running = False self.recv_running = False
@@ -159,18 +121,9 @@ class OmniSocketFSMController:
if self.session is not None: if self.session is not None:
self.session.close() self.session.close()
self.session = None self.session = None
if self.daemon_client is not None:
self.daemon_client.close()
self.daemon_client = None
print("OmniSocket FSM controller stopped") print("OmniSocket FSM controller stopped")
def _recv_loop(self) -> None: def _recv_loop(self) -> None:
if self.transport_backend == "daemon":
self._recv_loop_daemon()
else:
self._recv_loop_direct()
def _recv_loop_direct(self) -> None:
while self.recv_running and self.session is not None: while self.recv_running and self.session is not None:
item = self.session.recv(timeout_ms=200) item = self.session.recv(timeout_ms=200)
if item is None: if item is None:
@@ -184,53 +137,20 @@ class OmniSocketFSMController:
) )
continue continue
self._enqueue_payload(payload, from_peer=from_peer)
def _recv_loop_daemon(self) -> None:
while self.recv_running:
client = self.daemon_client
if client is None:
return
try: try:
payload = client.recv_control_packet(timeout_ms=200) packet = ControlPacket.decode(payload)
except OSError as exc: except (ValueError, struct.error) as exc:
print(f"[omnisocket_fsm] daemon control socket error: {exc}") print(f"[omnisocket_fsm] drop invalid payload from {from_peer}: {exc}")
try:
client.close()
except OSError:
pass
self.daemon_client = None
if not self.recv_running:
return
time.sleep(0.5)
try:
daemon_client_cls = _load_b_side_control_client()
self.daemon_client = daemon_client_cls(socket_path=self.ctrl_socket_path)
self.daemon_client.connect()
except OSError as reconnect_error:
print(f"[omnisocket_fsm] reconnect daemon socket failed: {reconnect_error}")
time.sleep(0.5)
continue continue
if payload is None:
continue
self._enqueue_payload(payload, from_peer="daemon")
def _enqueue_payload(self, payload: bytes, *, from_peer: str) -> None:
try:
packet = ControlPacket.decode(payload)
except (ValueError, struct.error) as exc:
print(f"[omnisocket_fsm] drop invalid payload from {from_peer}: {exc}")
return
try:
self.packet_queue.put_nowait(packet)
except queue.Full:
try: try:
self.packet_queue.get_nowait()
self.packet_queue.put_nowait(packet) self.packet_queue.put_nowait(packet)
except queue.Empty: except queue.Full:
pass try:
self.packet_queue.get_nowait()
self.packet_queue.put_nowait(packet)
except queue.Empty:
pass
def update_flag(self) -> None: def update_flag(self) -> None:
while not self.packet_queue.empty(): while not self.packet_queue.empty():

View File

@@ -1,4 +1,4 @@
"""Keyboard sender that emits control events through OmniDaemon or OmniSocket.""" """Keyboard sender that emits binary control packets over OmniSocket."""
from __future__ import annotations from __future__ import annotations
@@ -9,9 +9,8 @@ import signal
import sys import sys
import termios import termios
import threading import threading
import time
import tty import tty
from typing import Dict from typing import Dict, Optional
import yaml import yaml
@@ -21,55 +20,32 @@ except ImportError: # pragma: no cover - direct script execution fallback
from omnisocket_control import make_control_packet from omnisocket_control import make_control_packet
WORKSPACE_ROOT = Path(__file__).resolve().parents[3]
DEFAULT_BACKEND = "daemon"
def _load_daemon_client_api():
try:
from omnisocket_a_side.client import OmniDaemonClient
except ImportError:
python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python"
if python_dir.exists():
sys.path.insert(0, str(python_dir))
from omnisocket_a_side.client import OmniDaemonClient
return OmniDaemonClient
def _load_omnisocket_api(): def _load_omnisocket_api():
try: try:
from omnisocket import CONTROL_DEFAULTS, Session from omnisocket import CONTROL_DEFAULTS, Session
except ImportError as exc: # pragma: no cover - environment dependent except ImportError as exc: # pragma: no cover - environment dependent
raise RuntimeError( raise RuntimeError(
"omnisocket is not installed. Install it before using direct transport mode." "omnisocket is not installed. Install it before using "
"omnisocket_keyboard_sender.py."
) from exc ) from exc
return CONTROL_DEFAULTS, Session return CONTROL_DEFAULTS, Session
class OmniSocketKeyboardSender: class OmniSocketKeyboardSender:
"""Standalone keyboard sender for A-side control-plane testing.""" """Standalone keyboard sender for OmniSocket control-plane testing."""
def __init__(self) -> None: def __init__(self) -> None:
self.config: Dict[str, object] = {} self.config: Dict[str, object] = {}
self.backend = self._resolve_backend()
self.seq_id = 0 self.seq_id = 0
self.running = False self.running = False
self.session = None self.session = None
self.daemon_client = None self.input_thread: Optional[threading.Thread] = None
self.input_thread: threading.Thread | None = None
self.original_terminal_settings = None self.original_terminal_settings = None
self._load_config() self._load_config()
self._init_transport() self._init_session()
self._print_help() self._print_help()
@staticmethod
def _resolve_backend() -> str:
backend = os.getenv("OMNI_TRANSPORT_BACKEND", DEFAULT_BACKEND).strip().lower()
if backend not in {"daemon", "direct"}:
return DEFAULT_BACKEND
return backend
def _load_config(self) -> None: def _load_config(self) -> None:
config_path = Path(__file__).resolve().parent / "config" / "omnisocket_demo.yaml" config_path = Path(__file__).resolve().parent / "config" / "omnisocket_demo.yaml"
if config_path.exists(): if config_path.exists():
@@ -88,12 +64,7 @@ class OmniSocketKeyboardSender:
self.peer_id = str(sender_cfg.get("peer_id", "peer-a-ctrl")) self.peer_id = str(sender_cfg.get("peer_id", "peer-a-ctrl"))
self.target_peer = str(sender_cfg.get("target_peer", "peer-b-ctrl")) self.target_peer = str(sender_cfg.get("target_peer", "peer-b-ctrl"))
def _init_transport(self) -> None: def _init_session(self) -> None:
if self.backend == "daemon":
daemon_client_cls = _load_daemon_client_api()
self.daemon_client = daemon_client_cls()
return
control_defaults, session_cls = _load_omnisocket_api() control_defaults, session_cls = _load_omnisocket_api()
self.session = session_cls() self.session = session_cls()
self.session.connect( self.session.connect(
@@ -107,11 +78,7 @@ class OmniSocketKeyboardSender:
def _print_help(self) -> None: def _print_help(self) -> None:
print("OmniSocket keyboard sender ready") print("OmniSocket keyboard sender ready")
print(f"Backend: {self.backend}") print(f"Peer: {self.peer_id} -> {self.target_peer} via {self.server_addr}")
if self.backend == "daemon":
print(f"Daemon socket: {os.getenv('OMNIDAEMON_SOCKET', '/tmp/omnisocket-a-side.sock')}")
else:
print(f"Peer: {self.peer_id} -> {self.target_peer} via {self.server_addr}")
print("Keys:") print("Keys:")
print(" z -> pose_home") print(" z -> pose_home")
print(" c -> pose_hold") print(" c -> pose_hold")
@@ -161,11 +128,6 @@ class OmniSocketKeyboardSender:
except KeyboardInterrupt: except KeyboardInterrupt:
self._handle_ctrl_c() self._handle_ctrl_c()
finally: finally:
if self.daemon_client is not None:
try:
self.daemon_client.close()
except Exception:
pass
if self.original_terminal_settings is not None: if self.original_terminal_settings is not None:
termios.tcsetattr( termios.tcsetattr(
sys.stdin, termios.TCSADRAIN, self.original_terminal_settings sys.stdin, termios.TCSADRAIN, self.original_terminal_settings
@@ -224,34 +186,16 @@ class OmniSocketKeyboardSender:
def _send_event( def _send_event(
self, event_code: str, key_name: str, drive_value: float = 1.0 self, event_code: str, key_name: str, drive_value: float = 1.0
) -> None: ) -> None:
try: if self.session is None:
if self.backend == "daemon":
assert self.daemon_client is not None
result = self.daemon_client.send_control_event(
source="keyboard-sender",
event_code=event_code,
drive_value=drive_value,
client_time_ms=int(time.time() * 1000),
)
print(
f"sent seq={result.get('assigned_seq_id')} event={event_code} "
f"key={key_name} backend=daemon"
)
else:
if self.session is None:
return
packet = make_control_packet(self.seq_id, event_code, drive_value)
payload = packet.encode()
self.seq_id += 1
self.session.send(to=self.target_peer, data=payload)
print(
f"sent seq={packet.seq_id} event={event_code} key={key_name} "
f"bytes={len(payload)} backend=direct"
)
except Exception as error:
print(f"send failed for {event_code}: {error}")
return return
packet = make_control_packet(self.seq_id, event_code, drive_value)
payload = packet.encode()
self.seq_id += 1
self.session.send(to=self.target_peer, data=payload)
print(
f"sent seq={packet.seq_id} event={event_code} key={key_name} "
f"bytes={len(payload)}"
)
if event_code == "session_quit": if event_code == "session_quit":
self.running = False self.running = False

View File

@@ -1,11 +1,8 @@
"""ROS2 Joy bridge for OmniDaemon or direct OmniSocket control.""" """ROS2 Joy -> OmniSocket bridge for Xbox control."""
from __future__ import annotations from __future__ import annotations
import os
from pathlib import Path from pathlib import Path
import sys
import time
from typing import Dict from typing import Dict
import rclpy import rclpy
@@ -20,48 +17,30 @@ except ImportError: # pragma: no cover - direct script execution fallback
from omnisocket_control import make_control_packet from omnisocket_control import make_control_packet
WORKSPACE_ROOT = Path(__file__).resolve().parents[3]
DEFAULT_BACKEND = "daemon"
def _load_daemon_client_api():
try:
from omnisocket_a_side.client import OmniDaemonClient
except ImportError:
python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python"
if python_dir.exists():
sys.path.insert(0, str(python_dir))
from omnisocket_a_side.client import OmniDaemonClient
return OmniDaemonClient
def _load_omnisocket_api(): def _load_omnisocket_api():
try: try:
from omnisocket import CONTROL_DEFAULTS, Session from omnisocket import CONTROL_DEFAULTS, Session
except ImportError as exc: # pragma: no cover - environment dependent except ImportError as exc: # pragma: no cover - environment dependent
raise RuntimeError( raise RuntimeError(
"omnisocket is not installed. Install it before using direct transport mode." "omnisocket is not installed. Install it before using "
"omnisocket_xbox_sender.py."
) from exc ) from exc
return CONTROL_DEFAULTS, Session return CONTROL_DEFAULTS, Session
class OmniSocketXboxSender(Node): class OmniSocketXboxSender(Node):
"""Subscribe to Joy messages and forward them through the selected backend.""" """Subscribe to Joy messages and forward them through OmniSocket."""
def __init__(self) -> None: def __init__(self) -> None:
super().__init__("omnisocket_xbox_sender") super().__init__("omnisocket_xbox_sender")
self.config: Dict[str, object] = {} self.config: Dict[str, object] = {}
self.backend = self._resolve_backend()
self.seq_id = 0 self.seq_id = 0
self.last_buttons: Dict[str, int] = {} self.last_buttons: Dict[str, int] = {}
self.last_dpad_h = 0.0 self.last_dpad_h = 0.0
self.session = None self.session = None
self.daemon_client = None
self._last_transport_error = ""
self._last_transport_error_at = 0.0
self._load_config() self._load_config()
self._init_transport() self._init_session()
qos_profile = QoSProfile( qos_profile = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE, reliability=ReliabilityPolicy.RELIABLE,
@@ -73,25 +52,17 @@ class OmniSocketXboxSender(Node):
) )
self.get_logger().info( self.get_logger().info(
f"Forwarding {self.joy_topic} via {self.backend} " f"Forwarding {self.joy_topic} -> OmniSocket "
f"from {self.peer_id} to {self.target_peer}" f"{self.peer_id} -> {self.target_peer}"
)
self.get_logger().info(
"Buttons: A=WALKAMP X=ZERO Y=STOP START=reset"
) )
self.get_logger().info("Buttons: A=WALKAMP X=ZERO Y=STOP START=reset")
@staticmethod
def _resolve_backend() -> str:
backend = os.getenv("OMNI_TRANSPORT_BACKEND", DEFAULT_BACKEND).strip().lower()
if backend not in {"daemon", "direct"}:
return DEFAULT_BACKEND
return backend
def destroy_node(self) -> bool: def destroy_node(self) -> bool:
if self.session is not None: if self.session is not None:
self.session.close() self.session.close()
self.session = None self.session = None
if self.daemon_client is not None:
self.daemon_client.close()
self.daemon_client = None
return super().destroy_node() return super().destroy_node()
def _load_config(self) -> None: def _load_config(self) -> None:
@@ -184,12 +155,7 @@ class OmniSocketXboxSender(Node):
except (TypeError, ValueError): except (TypeError, ValueError):
pass pass
def _init_transport(self) -> None: def _init_session(self) -> None:
if self.backend == "daemon":
daemon_client_cls = _load_daemon_client_api()
self.daemon_client = daemon_client_cls()
return
control_defaults, session_cls = _load_omnisocket_api() control_defaults, session_cls = _load_omnisocket_api()
self.session = session_cls() self.session = session_cls()
self.session.connect( self.session.connect(
@@ -291,40 +257,14 @@ class OmniSocketXboxSender(Node):
def _send_event( def _send_event(
self, event_code: str, key_name: str, drive_value: float = 1.0 self, event_code: str, key_name: str, drive_value: float = 1.0
) -> None: ) -> None:
try: if self.session is None:
if self.backend == "daemon": return
assert self.daemon_client is not None packet = make_control_packet(self.seq_id, event_code, drive_value)
result = self.daemon_client.send_control_event( self.seq_id += 1
source="xbox-sender", self.session.send(to=self.target_peer, data=packet.encode())
event_code=event_code, self.get_logger().debug(
drive_value=drive_value, f"sent seq={packet.seq_id} event={event_code} key={key_name}"
client_time_ms=int(time.time() * 1000), )
)
self.get_logger().debug(
f"sent seq={result.get('assigned_seq_id')} event={event_code} key={key_name}"
)
else:
if self.session is None:
return
packet = make_control_packet(self.seq_id, event_code, drive_value)
self.seq_id += 1
self.session.send(to=self.target_peer, data=packet.encode())
self.get_logger().debug(
f"sent seq={packet.seq_id} event={event_code} key={key_name}"
)
self._last_transport_error = ""
except Exception as error:
self._report_transport_error(str(error))
def _report_transport_error(self, message: str) -> None:
now = time.monotonic()
if (
message != self._last_transport_error
or now - self._last_transport_error_at >= 2.0
):
self.get_logger().warning(f"control send failed via {self.backend}: {message}")
self._last_transport_error = message
self._last_transport_error_at = now
def main(args: list[str] | None = None) -> None: def main(args: list[str] | None = None) -> None: