Compare commits
2 Commits
dev
...
eb18f29c02
| Author | SHA1 | Date | |
|---|---|---|---|
| eb18f29c02 | |||
| 64edabbb87 |
@@ -77,34 +77,3 @@ python3 udp_loopback/omnisocket_xbox_sender.py
|
||||
- The original UDP files remain unchanged, so you can switch back by restoring `control_tool: udp_loopback`.
|
||||
- OmniSocket keyboard/Xbox mappings are aligned with the cleaned walk-only FSM flow: `ZERO`, `STOP`, and `WALKAMP`.
|
||||
- Keyboard sender supports `4/5/6` for clearing `x/y/yaw` speed independently, and `r` still clears all three axes.
|
||||
|
||||
## B-side OmniDaemon
|
||||
|
||||
The B-side stack now supports a local daemon that owns OmniSocket control receive and manages the video sender worker.
|
||||
|
||||
Start the daemon:
|
||||
|
||||
```bash
|
||||
cd OmniSocketGo
|
||||
make b_side_video_sender
|
||||
python3 -m pip install -e ./python
|
||||
python3 -m omnisocket_b_side.daemon --config config/b_side_omnidaemon.yaml
|
||||
```
|
||||
|
||||
Or with the helper script:
|
||||
|
||||
```bash
|
||||
cd OmniSocketGo
|
||||
./scripts/start_b_side.sh
|
||||
```
|
||||
|
||||
With `OMNI_TRANSPORT_BACKEND=daemon` (the default), `OmniSocketFSMController` connects to `/tmp/omnisocket-b-ctrl.sock` and receives raw `ControlPacket` bytes from the daemon instead of creating its own OmniSocket session.
|
||||
|
||||
Fallback to the previous direct mode when needed:
|
||||
|
||||
```bash
|
||||
export OMNI_TRANSPORT_BACKEND=direct
|
||||
python3 rl_control_node_sim.py
|
||||
```
|
||||
|
||||
In simulation / MuJoCo mode, keep `video_sender.enabled: false` in `OmniSocketGo/config/b_side_omnidaemon.yaml` so the daemon only provides the control path and does not require `/dev/video0`.
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
transport:
|
||||
server_addr: ""
|
||||
server_addr: 81.70.156.140:10909
|
||||
relay_via: 106.55.173.235:10909
|
||||
bind_ip: ""
|
||||
bind_device: ""
|
||||
|
||||
@@ -2,11 +2,9 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
import queue
|
||||
import struct
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from typing import Dict, Optional
|
||||
@@ -32,24 +30,6 @@ def _load_omnisocket_api():
|
||||
return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session
|
||||
|
||||
|
||||
def _load_b_side_control_client():
|
||||
try:
|
||||
from omnisocket_b_side.client import BSideControlClient
|
||||
except ImportError:
|
||||
workspace_root = Path(__file__).resolve().parents[3]
|
||||
python_root = workspace_root / "OmniSocketGo" / "python"
|
||||
if str(python_root) not in sys.path:
|
||||
sys.path.insert(0, str(python_root))
|
||||
try:
|
||||
from omnisocket_b_side.client import BSideControlClient
|
||||
except ImportError as exc: # pragma: no cover - environment dependent
|
||||
raise RuntimeError(
|
||||
"omnisocket_b_side is not installed. Install it before using "
|
||||
"OMNI_TRANSPORT_BACKEND=daemon."
|
||||
) from exc
|
||||
return BSideControlClient
|
||||
|
||||
|
||||
class OmniSocketFSMFlag(ControlFlag):
|
||||
"""FSM-facing flag produced from decoded OmniSocket control packets."""
|
||||
|
||||
@@ -74,11 +54,7 @@ class OmniSocketFSMController:
|
||||
self.recv_running = False
|
||||
self.recv_thread: Optional[threading.Thread] = None
|
||||
self.session = None
|
||||
self.daemon_client = None
|
||||
self._msg_type_binary = None
|
||||
self.transport_backend = str(
|
||||
os.getenv("OMNI_TRANSPORT_BACKEND", "daemon")
|
||||
).strip().lower() or "daemon"
|
||||
|
||||
def _load_config(self) -> None:
|
||||
config_path = Path(__file__).resolve().parent / "config" / "omnisocket_demo.yaml"
|
||||
@@ -97,9 +73,6 @@ class OmniSocketFSMController:
|
||||
self.bind_ip = str(transport_cfg.get("bind_ip", ""))
|
||||
self.bind_device = str(transport_cfg.get("bind_device", ""))
|
||||
self.peer_id = str(receiver_cfg.get("peer_id", "peer-b-ctrl"))
|
||||
self.ctrl_socket_path = str(
|
||||
os.getenv("OMNIBDAEMON_CTRL_SOCKET", "/tmp/omnisocket-b-ctrl.sock")
|
||||
)
|
||||
|
||||
self.initial_lift = float(motion_cfg.get("initial_lift", 0.89))
|
||||
self.lift_step = float(motion_cfg.get("lift_step", 0.05))
|
||||
@@ -121,36 +94,25 @@ class OmniSocketFSMController:
|
||||
self.last_fsm_command_time = 0.0
|
||||
|
||||
def start(self) -> None:
|
||||
if self.transport_backend == "daemon":
|
||||
daemon_client_cls = _load_b_side_control_client()
|
||||
self.daemon_client = daemon_client_cls(socket_path=self.ctrl_socket_path)
|
||||
self.daemon_client.connect()
|
||||
else:
|
||||
control_defaults, msg_type_binary, session_cls = _load_omnisocket_api()
|
||||
self._msg_type_binary = msg_type_binary
|
||||
self.session = session_cls()
|
||||
self.session.connect(
|
||||
server_addr=self.server_addr,
|
||||
peer_id=self.peer_id,
|
||||
relay_via=self.relay_via,
|
||||
bind_ip=self.bind_ip,
|
||||
bind_device=self.bind_device,
|
||||
**control_defaults,
|
||||
)
|
||||
control_defaults, msg_type_binary, session_cls = _load_omnisocket_api()
|
||||
self._msg_type_binary = msg_type_binary
|
||||
self.session = session_cls()
|
||||
self.session.connect(
|
||||
server_addr=self.server_addr,
|
||||
peer_id=self.peer_id,
|
||||
relay_via=self.relay_via,
|
||||
bind_ip=self.bind_ip,
|
||||
bind_device=self.bind_device,
|
||||
**control_defaults,
|
||||
)
|
||||
|
||||
self.recv_running = True
|
||||
self.recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
|
||||
self.recv_thread.start()
|
||||
if self.transport_backend == "daemon":
|
||||
print(
|
||||
"OmniSocket FSM controller connected to B-side daemon "
|
||||
f"via {self.ctrl_socket_path}"
|
||||
)
|
||||
else:
|
||||
print(
|
||||
f"OmniSocket FSM controller listening as {self.peer_id} "
|
||||
f"via {self.server_addr}"
|
||||
)
|
||||
print(
|
||||
f"OmniSocket FSM controller listening as {self.peer_id} "
|
||||
f"via {self.server_addr}"
|
||||
)
|
||||
|
||||
def stop(self) -> None:
|
||||
self.recv_running = False
|
||||
@@ -159,18 +121,9 @@ class OmniSocketFSMController:
|
||||
if self.session is not None:
|
||||
self.session.close()
|
||||
self.session = None
|
||||
if self.daemon_client is not None:
|
||||
self.daemon_client.close()
|
||||
self.daemon_client = None
|
||||
print("OmniSocket FSM controller stopped")
|
||||
|
||||
def _recv_loop(self) -> None:
|
||||
if self.transport_backend == "daemon":
|
||||
self._recv_loop_daemon()
|
||||
else:
|
||||
self._recv_loop_direct()
|
||||
|
||||
def _recv_loop_direct(self) -> None:
|
||||
while self.recv_running and self.session is not None:
|
||||
item = self.session.recv(timeout_ms=200)
|
||||
if item is None:
|
||||
@@ -184,53 +137,20 @@ class OmniSocketFSMController:
|
||||
)
|
||||
continue
|
||||
|
||||
self._enqueue_payload(payload, from_peer=from_peer)
|
||||
|
||||
def _recv_loop_daemon(self) -> None:
|
||||
while self.recv_running:
|
||||
client = self.daemon_client
|
||||
if client is None:
|
||||
return
|
||||
try:
|
||||
payload = client.recv_control_packet(timeout_ms=200)
|
||||
except OSError as exc:
|
||||
print(f"[omnisocket_fsm] daemon control socket error: {exc}")
|
||||
try:
|
||||
client.close()
|
||||
except OSError:
|
||||
pass
|
||||
self.daemon_client = None
|
||||
if not self.recv_running:
|
||||
return
|
||||
time.sleep(0.5)
|
||||
try:
|
||||
daemon_client_cls = _load_b_side_control_client()
|
||||
self.daemon_client = daemon_client_cls(socket_path=self.ctrl_socket_path)
|
||||
self.daemon_client.connect()
|
||||
except OSError as reconnect_error:
|
||||
print(f"[omnisocket_fsm] reconnect daemon socket failed: {reconnect_error}")
|
||||
time.sleep(0.5)
|
||||
packet = ControlPacket.decode(payload)
|
||||
except (ValueError, struct.error) as exc:
|
||||
print(f"[omnisocket_fsm] drop invalid payload from {from_peer}: {exc}")
|
||||
continue
|
||||
|
||||
if payload is None:
|
||||
continue
|
||||
self._enqueue_payload(payload, from_peer="daemon")
|
||||
|
||||
def _enqueue_payload(self, payload: bytes, *, from_peer: str) -> None:
|
||||
try:
|
||||
packet = ControlPacket.decode(payload)
|
||||
except (ValueError, struct.error) as exc:
|
||||
print(f"[omnisocket_fsm] drop invalid payload from {from_peer}: {exc}")
|
||||
return
|
||||
|
||||
try:
|
||||
self.packet_queue.put_nowait(packet)
|
||||
except queue.Full:
|
||||
try:
|
||||
self.packet_queue.get_nowait()
|
||||
self.packet_queue.put_nowait(packet)
|
||||
except queue.Empty:
|
||||
pass
|
||||
except queue.Full:
|
||||
try:
|
||||
self.packet_queue.get_nowait()
|
||||
self.packet_queue.put_nowait(packet)
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
def update_flag(self) -> None:
|
||||
while not self.packet_queue.empty():
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
"""Keyboard sender that emits control events through OmniDaemon or OmniSocket."""
|
||||
"""Keyboard sender that emits binary control packets over OmniSocket."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
@@ -9,9 +9,8 @@ import signal
|
||||
import sys
|
||||
import termios
|
||||
import threading
|
||||
import time
|
||||
import tty
|
||||
from typing import Dict
|
||||
from typing import Dict, Optional
|
||||
|
||||
import yaml
|
||||
|
||||
@@ -21,55 +20,32 @@ except ImportError: # pragma: no cover - direct script execution fallback
|
||||
from omnisocket_control import make_control_packet
|
||||
|
||||
|
||||
WORKSPACE_ROOT = Path(__file__).resolve().parents[3]
|
||||
DEFAULT_BACKEND = "daemon"
|
||||
|
||||
|
||||
def _load_daemon_client_api():
|
||||
try:
|
||||
from omnisocket_a_side.client import OmniDaemonClient
|
||||
except ImportError:
|
||||
python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python"
|
||||
if python_dir.exists():
|
||||
sys.path.insert(0, str(python_dir))
|
||||
from omnisocket_a_side.client import OmniDaemonClient
|
||||
return OmniDaemonClient
|
||||
|
||||
|
||||
def _load_omnisocket_api():
|
||||
try:
|
||||
from omnisocket import CONTROL_DEFAULTS, Session
|
||||
except ImportError as exc: # pragma: no cover - environment dependent
|
||||
raise RuntimeError(
|
||||
"omnisocket is not installed. Install it before using direct transport mode."
|
||||
"omnisocket is not installed. Install it before using "
|
||||
"omnisocket_keyboard_sender.py."
|
||||
) from exc
|
||||
return CONTROL_DEFAULTS, Session
|
||||
|
||||
|
||||
class OmniSocketKeyboardSender:
|
||||
"""Standalone keyboard sender for A-side control-plane testing."""
|
||||
"""Standalone keyboard sender for OmniSocket control-plane testing."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.config: Dict[str, object] = {}
|
||||
self.backend = self._resolve_backend()
|
||||
self.seq_id = 0
|
||||
self.running = False
|
||||
self.session = None
|
||||
self.daemon_client = None
|
||||
self.input_thread: threading.Thread | None = None
|
||||
self.input_thread: Optional[threading.Thread] = None
|
||||
self.original_terminal_settings = None
|
||||
|
||||
self._load_config()
|
||||
self._init_transport()
|
||||
self._init_session()
|
||||
self._print_help()
|
||||
|
||||
@staticmethod
|
||||
def _resolve_backend() -> str:
|
||||
backend = os.getenv("OMNI_TRANSPORT_BACKEND", DEFAULT_BACKEND).strip().lower()
|
||||
if backend not in {"daemon", "direct"}:
|
||||
return DEFAULT_BACKEND
|
||||
return backend
|
||||
|
||||
def _load_config(self) -> None:
|
||||
config_path = Path(__file__).resolve().parent / "config" / "omnisocket_demo.yaml"
|
||||
if config_path.exists():
|
||||
@@ -88,12 +64,7 @@ class OmniSocketKeyboardSender:
|
||||
self.peer_id = str(sender_cfg.get("peer_id", "peer-a-ctrl"))
|
||||
self.target_peer = str(sender_cfg.get("target_peer", "peer-b-ctrl"))
|
||||
|
||||
def _init_transport(self) -> None:
|
||||
if self.backend == "daemon":
|
||||
daemon_client_cls = _load_daemon_client_api()
|
||||
self.daemon_client = daemon_client_cls()
|
||||
return
|
||||
|
||||
def _init_session(self) -> None:
|
||||
control_defaults, session_cls = _load_omnisocket_api()
|
||||
self.session = session_cls()
|
||||
self.session.connect(
|
||||
@@ -107,11 +78,7 @@ class OmniSocketKeyboardSender:
|
||||
|
||||
def _print_help(self) -> None:
|
||||
print("OmniSocket keyboard sender ready")
|
||||
print(f"Backend: {self.backend}")
|
||||
if self.backend == "daemon":
|
||||
print(f"Daemon socket: {os.getenv('OMNIDAEMON_SOCKET', '/tmp/omnisocket-a-side.sock')}")
|
||||
else:
|
||||
print(f"Peer: {self.peer_id} -> {self.target_peer} via {self.server_addr}")
|
||||
print(f"Peer: {self.peer_id} -> {self.target_peer} via {self.server_addr}")
|
||||
print("Keys:")
|
||||
print(" z -> pose_home")
|
||||
print(" c -> pose_hold")
|
||||
@@ -161,11 +128,6 @@ class OmniSocketKeyboardSender:
|
||||
except KeyboardInterrupt:
|
||||
self._handle_ctrl_c()
|
||||
finally:
|
||||
if self.daemon_client is not None:
|
||||
try:
|
||||
self.daemon_client.close()
|
||||
except Exception:
|
||||
pass
|
||||
if self.original_terminal_settings is not None:
|
||||
termios.tcsetattr(
|
||||
sys.stdin, termios.TCSADRAIN, self.original_terminal_settings
|
||||
@@ -224,34 +186,16 @@ class OmniSocketKeyboardSender:
|
||||
def _send_event(
|
||||
self, event_code: str, key_name: str, drive_value: float = 1.0
|
||||
) -> None:
|
||||
try:
|
||||
if self.backend == "daemon":
|
||||
assert self.daemon_client is not None
|
||||
result = self.daemon_client.send_control_event(
|
||||
source="keyboard-sender",
|
||||
event_code=event_code,
|
||||
drive_value=drive_value,
|
||||
client_time_ms=int(time.time() * 1000),
|
||||
)
|
||||
print(
|
||||
f"sent seq={result.get('assigned_seq_id')} event={event_code} "
|
||||
f"key={key_name} backend=daemon"
|
||||
)
|
||||
else:
|
||||
if self.session is None:
|
||||
return
|
||||
packet = make_control_packet(self.seq_id, event_code, drive_value)
|
||||
payload = packet.encode()
|
||||
self.seq_id += 1
|
||||
self.session.send(to=self.target_peer, data=payload)
|
||||
print(
|
||||
f"sent seq={packet.seq_id} event={event_code} key={key_name} "
|
||||
f"bytes={len(payload)} backend=direct"
|
||||
)
|
||||
except Exception as error:
|
||||
print(f"send failed for {event_code}: {error}")
|
||||
if self.session is None:
|
||||
return
|
||||
|
||||
packet = make_control_packet(self.seq_id, event_code, drive_value)
|
||||
payload = packet.encode()
|
||||
self.seq_id += 1
|
||||
self.session.send(to=self.target_peer, data=payload)
|
||||
print(
|
||||
f"sent seq={packet.seq_id} event={event_code} key={key_name} "
|
||||
f"bytes={len(payload)}"
|
||||
)
|
||||
if event_code == "session_quit":
|
||||
self.running = False
|
||||
|
||||
|
||||
@@ -1,11 +1,8 @@
|
||||
"""ROS2 Joy bridge for OmniDaemon or direct OmniSocket control."""
|
||||
"""ROS2 Joy -> OmniSocket bridge for Xbox control."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
import sys
|
||||
import time
|
||||
from typing import Dict
|
||||
|
||||
import rclpy
|
||||
@@ -20,48 +17,30 @@ except ImportError: # pragma: no cover - direct script execution fallback
|
||||
from omnisocket_control import make_control_packet
|
||||
|
||||
|
||||
WORKSPACE_ROOT = Path(__file__).resolve().parents[3]
|
||||
DEFAULT_BACKEND = "daemon"
|
||||
|
||||
|
||||
def _load_daemon_client_api():
|
||||
try:
|
||||
from omnisocket_a_side.client import OmniDaemonClient
|
||||
except ImportError:
|
||||
python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python"
|
||||
if python_dir.exists():
|
||||
sys.path.insert(0, str(python_dir))
|
||||
from omnisocket_a_side.client import OmniDaemonClient
|
||||
return OmniDaemonClient
|
||||
|
||||
|
||||
def _load_omnisocket_api():
|
||||
try:
|
||||
from omnisocket import CONTROL_DEFAULTS, Session
|
||||
except ImportError as exc: # pragma: no cover - environment dependent
|
||||
raise RuntimeError(
|
||||
"omnisocket is not installed. Install it before using direct transport mode."
|
||||
"omnisocket is not installed. Install it before using "
|
||||
"omnisocket_xbox_sender.py."
|
||||
) from exc
|
||||
return CONTROL_DEFAULTS, Session
|
||||
|
||||
|
||||
class OmniSocketXboxSender(Node):
|
||||
"""Subscribe to Joy messages and forward them through the selected backend."""
|
||||
"""Subscribe to Joy messages and forward them through OmniSocket."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__("omnisocket_xbox_sender")
|
||||
self.config: Dict[str, object] = {}
|
||||
self.backend = self._resolve_backend()
|
||||
self.seq_id = 0
|
||||
self.last_buttons: Dict[str, int] = {}
|
||||
self.last_dpad_h = 0.0
|
||||
self.session = None
|
||||
self.daemon_client = None
|
||||
self._last_transport_error = ""
|
||||
self._last_transport_error_at = 0.0
|
||||
|
||||
self._load_config()
|
||||
self._init_transport()
|
||||
self._init_session()
|
||||
|
||||
qos_profile = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
@@ -73,25 +52,17 @@ class OmniSocketXboxSender(Node):
|
||||
)
|
||||
|
||||
self.get_logger().info(
|
||||
f"Forwarding {self.joy_topic} via {self.backend} "
|
||||
f"from {self.peer_id} to {self.target_peer}"
|
||||
f"Forwarding {self.joy_topic} -> OmniSocket "
|
||||
f"{self.peer_id} -> {self.target_peer}"
|
||||
)
|
||||
self.get_logger().info(
|
||||
"Buttons: A=WALKAMP X=ZERO Y=STOP START=reset"
|
||||
)
|
||||
self.get_logger().info("Buttons: A=WALKAMP X=ZERO Y=STOP START=reset")
|
||||
|
||||
@staticmethod
|
||||
def _resolve_backend() -> str:
|
||||
backend = os.getenv("OMNI_TRANSPORT_BACKEND", DEFAULT_BACKEND).strip().lower()
|
||||
if backend not in {"daemon", "direct"}:
|
||||
return DEFAULT_BACKEND
|
||||
return backend
|
||||
|
||||
def destroy_node(self) -> bool:
|
||||
if self.session is not None:
|
||||
self.session.close()
|
||||
self.session = None
|
||||
if self.daemon_client is not None:
|
||||
self.daemon_client.close()
|
||||
self.daemon_client = None
|
||||
return super().destroy_node()
|
||||
|
||||
def _load_config(self) -> None:
|
||||
@@ -184,12 +155,7 @@ class OmniSocketXboxSender(Node):
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
|
||||
def _init_transport(self) -> None:
|
||||
if self.backend == "daemon":
|
||||
daemon_client_cls = _load_daemon_client_api()
|
||||
self.daemon_client = daemon_client_cls()
|
||||
return
|
||||
|
||||
def _init_session(self) -> None:
|
||||
control_defaults, session_cls = _load_omnisocket_api()
|
||||
self.session = session_cls()
|
||||
self.session.connect(
|
||||
@@ -291,40 +257,14 @@ class OmniSocketXboxSender(Node):
|
||||
def _send_event(
|
||||
self, event_code: str, key_name: str, drive_value: float = 1.0
|
||||
) -> None:
|
||||
try:
|
||||
if self.backend == "daemon":
|
||||
assert self.daemon_client is not None
|
||||
result = self.daemon_client.send_control_event(
|
||||
source="xbox-sender",
|
||||
event_code=event_code,
|
||||
drive_value=drive_value,
|
||||
client_time_ms=int(time.time() * 1000),
|
||||
)
|
||||
self.get_logger().debug(
|
||||
f"sent seq={result.get('assigned_seq_id')} event={event_code} key={key_name}"
|
||||
)
|
||||
else:
|
||||
if self.session is None:
|
||||
return
|
||||
packet = make_control_packet(self.seq_id, event_code, drive_value)
|
||||
self.seq_id += 1
|
||||
self.session.send(to=self.target_peer, data=packet.encode())
|
||||
self.get_logger().debug(
|
||||
f"sent seq={packet.seq_id} event={event_code} key={key_name}"
|
||||
)
|
||||
self._last_transport_error = ""
|
||||
except Exception as error:
|
||||
self._report_transport_error(str(error))
|
||||
|
||||
def _report_transport_error(self, message: str) -> None:
|
||||
now = time.monotonic()
|
||||
if (
|
||||
message != self._last_transport_error
|
||||
or now - self._last_transport_error_at >= 2.0
|
||||
):
|
||||
self.get_logger().warning(f"control send failed via {self.backend}: {message}")
|
||||
self._last_transport_error = message
|
||||
self._last_transport_error_at = now
|
||||
if self.session is None:
|
||||
return
|
||||
packet = make_control_packet(self.seq_id, event_code, drive_value)
|
||||
self.seq_id += 1
|
||||
self.session.send(to=self.target_peer, data=packet.encode())
|
||||
self.get_logger().debug(
|
||||
f"sent seq={packet.seq_id} event={event_code} key={key_name}"
|
||||
)
|
||||
|
||||
|
||||
def main(args: list[str] | None = None) -> None:
|
||||
|
||||
Reference in New Issue
Block a user