Revert "feat: 把 B 端的 视频/控制 都收口到一个本地 daemon 进程里"

This reverts commit d02acdfce2.
This commit is contained in:
2026-04-01 22:17:05 +08:00
parent d02acdfce2
commit 64edabbb87
2 changed files with 24 additions and 135 deletions

View File

@@ -77,34 +77,3 @@ python3 udp_loopback/omnisocket_xbox_sender.py
- The original UDP files remain unchanged, so you can switch back by restoring `control_tool: udp_loopback`. - The original UDP files remain unchanged, so you can switch back by restoring `control_tool: udp_loopback`.
- OmniSocket keyboard/Xbox mappings are aligned with the cleaned walk-only FSM flow: `ZERO`, `STOP`, and `WALKAMP`. - OmniSocket keyboard/Xbox mappings are aligned with the cleaned walk-only FSM flow: `ZERO`, `STOP`, and `WALKAMP`.
- Keyboard sender supports `4/5/6` for clearing `x/y/yaw` speed independently, and `r` still clears all three axes. - Keyboard sender supports `4/5/6` for clearing `x/y/yaw` speed independently, and `r` still clears all three axes.
## B-side OmniDaemon
The B-side stack now supports a local daemon that owns OmniSocket control receive and manages the video sender worker.
Start the daemon:
```bash
cd OmniSocketGo
make b_side_video_sender
python3 -m pip install -e ./python
python3 -m omnisocket_b_side.daemon --config config/b_side_omnidaemon.yaml
```
Or with the helper script:
```bash
cd OmniSocketGo
./scripts/start_b_side.sh
```
With `OMNI_TRANSPORT_BACKEND=daemon` (the default), `OmniSocketFSMController` connects to `/tmp/omnisocket-b-ctrl.sock` and receives raw `ControlPacket` bytes from the daemon instead of creating its own OmniSocket session.
Fallback to the previous direct mode when needed:
```bash
export OMNI_TRANSPORT_BACKEND=direct
python3 rl_control_node_sim.py
```
In simulation / MuJoCo mode, keep `video_sender.enabled: false` in `OmniSocketGo/config/b_side_omnidaemon.yaml` so the daemon only provides the control path and does not require `/dev/video0`.

View File

@@ -2,11 +2,9 @@
from __future__ import annotations from __future__ import annotations
import os
from pathlib import Path from pathlib import Path
import queue import queue
import struct import struct
import sys
import threading import threading
import time import time
from typing import Dict, Optional from typing import Dict, Optional
@@ -32,24 +30,6 @@ def _load_omnisocket_api():
return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session
def _load_b_side_control_client():
try:
from omnisocket_b_side.client import BSideControlClient
except ImportError:
workspace_root = Path(__file__).resolve().parents[3]
python_root = workspace_root / "OmniSocketGo" / "python"
if str(python_root) not in sys.path:
sys.path.insert(0, str(python_root))
try:
from omnisocket_b_side.client import BSideControlClient
except ImportError as exc: # pragma: no cover - environment dependent
raise RuntimeError(
"omnisocket_b_side is not installed. Install it before using "
"OMNI_TRANSPORT_BACKEND=daemon."
) from exc
return BSideControlClient
class OmniSocketFSMFlag(ControlFlag): class OmniSocketFSMFlag(ControlFlag):
"""FSM-facing flag produced from decoded OmniSocket control packets.""" """FSM-facing flag produced from decoded OmniSocket control packets."""
@@ -74,11 +54,7 @@ class OmniSocketFSMController:
self.recv_running = False self.recv_running = False
self.recv_thread: Optional[threading.Thread] = None self.recv_thread: Optional[threading.Thread] = None
self.session = None self.session = None
self.daemon_client = None
self._msg_type_binary = None self._msg_type_binary = None
self.transport_backend = str(
os.getenv("OMNI_TRANSPORT_BACKEND", "daemon")
).strip().lower() or "daemon"
def _load_config(self) -> None: def _load_config(self) -> None:
config_path = Path(__file__).resolve().parent / "config" / "omnisocket_demo.yaml" config_path = Path(__file__).resolve().parent / "config" / "omnisocket_demo.yaml"
@@ -97,9 +73,6 @@ class OmniSocketFSMController:
self.bind_ip = str(transport_cfg.get("bind_ip", "")) self.bind_ip = str(transport_cfg.get("bind_ip", ""))
self.bind_device = str(transport_cfg.get("bind_device", "")) self.bind_device = str(transport_cfg.get("bind_device", ""))
self.peer_id = str(receiver_cfg.get("peer_id", "peer-b-ctrl")) self.peer_id = str(receiver_cfg.get("peer_id", "peer-b-ctrl"))
self.ctrl_socket_path = str(
os.getenv("OMNIBDAEMON_CTRL_SOCKET", "/tmp/omnisocket-b-ctrl.sock")
)
self.initial_lift = float(motion_cfg.get("initial_lift", 0.89)) self.initial_lift = float(motion_cfg.get("initial_lift", 0.89))
self.lift_step = float(motion_cfg.get("lift_step", 0.05)) self.lift_step = float(motion_cfg.get("lift_step", 0.05))
@@ -121,36 +94,25 @@ class OmniSocketFSMController:
self.last_fsm_command_time = 0.0 self.last_fsm_command_time = 0.0
def start(self) -> None: def start(self) -> None:
if self.transport_backend == "daemon": control_defaults, msg_type_binary, session_cls = _load_omnisocket_api()
daemon_client_cls = _load_b_side_control_client() self._msg_type_binary = msg_type_binary
self.daemon_client = daemon_client_cls(socket_path=self.ctrl_socket_path) self.session = session_cls()
self.daemon_client.connect() self.session.connect(
else: server_addr=self.server_addr,
control_defaults, msg_type_binary, session_cls = _load_omnisocket_api() peer_id=self.peer_id,
self._msg_type_binary = msg_type_binary relay_via=self.relay_via,
self.session = session_cls() bind_ip=self.bind_ip,
self.session.connect( bind_device=self.bind_device,
server_addr=self.server_addr, **control_defaults,
peer_id=self.peer_id, )
relay_via=self.relay_via,
bind_ip=self.bind_ip,
bind_device=self.bind_device,
**control_defaults,
)
self.recv_running = True self.recv_running = True
self.recv_thread = threading.Thread(target=self._recv_loop, daemon=True) self.recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
self.recv_thread.start() self.recv_thread.start()
if self.transport_backend == "daemon": print(
print( f"OmniSocket FSM controller listening as {self.peer_id} "
"OmniSocket FSM controller connected to B-side daemon " f"via {self.server_addr}"
f"via {self.ctrl_socket_path}" )
)
else:
print(
f"OmniSocket FSM controller listening as {self.peer_id} "
f"via {self.server_addr}"
)
def stop(self) -> None: def stop(self) -> None:
self.recv_running = False self.recv_running = False
@@ -159,18 +121,9 @@ class OmniSocketFSMController:
if self.session is not None: if self.session is not None:
self.session.close() self.session.close()
self.session = None self.session = None
if self.daemon_client is not None:
self.daemon_client.close()
self.daemon_client = None
print("OmniSocket FSM controller stopped") print("OmniSocket FSM controller stopped")
def _recv_loop(self) -> None: def _recv_loop(self) -> None:
if self.transport_backend == "daemon":
self._recv_loop_daemon()
else:
self._recv_loop_direct()
def _recv_loop_direct(self) -> None:
while self.recv_running and self.session is not None: while self.recv_running and self.session is not None:
item = self.session.recv(timeout_ms=200) item = self.session.recv(timeout_ms=200)
if item is None: if item is None:
@@ -184,53 +137,20 @@ class OmniSocketFSMController:
) )
continue continue
self._enqueue_payload(payload, from_peer=from_peer)
def _recv_loop_daemon(self) -> None:
while self.recv_running:
client = self.daemon_client
if client is None:
return
try: try:
payload = client.recv_control_packet(timeout_ms=200) packet = ControlPacket.decode(payload)
except OSError as exc: except (ValueError, struct.error) as exc:
print(f"[omnisocket_fsm] daemon control socket error: {exc}") print(f"[omnisocket_fsm] drop invalid payload from {from_peer}: {exc}")
try:
client.close()
except OSError:
pass
self.daemon_client = None
if not self.recv_running:
return
time.sleep(0.5)
try:
daemon_client_cls = _load_b_side_control_client()
self.daemon_client = daemon_client_cls(socket_path=self.ctrl_socket_path)
self.daemon_client.connect()
except OSError as reconnect_error:
print(f"[omnisocket_fsm] reconnect daemon socket failed: {reconnect_error}")
time.sleep(0.5)
continue continue
if payload is None:
continue
self._enqueue_payload(payload, from_peer="daemon")
def _enqueue_payload(self, payload: bytes, *, from_peer: str) -> None:
try:
packet = ControlPacket.decode(payload)
except (ValueError, struct.error) as exc:
print(f"[omnisocket_fsm] drop invalid payload from {from_peer}: {exc}")
return
try:
self.packet_queue.put_nowait(packet)
except queue.Full:
try: try:
self.packet_queue.get_nowait()
self.packet_queue.put_nowait(packet) self.packet_queue.put_nowait(packet)
except queue.Empty: except queue.Full:
pass try:
self.packet_queue.get_nowait()
self.packet_queue.put_nowait(packet)
except queue.Empty:
pass
def update_flag(self) -> None: def update_flag(self) -> None:
while not self.packet_queue.empty(): while not self.packet_queue.empty():