feat: 把 A 端的 Session/KCP/视频/控制 都收口到一个本地 daemon 进程里,Django 和输入发送端都改成通过本机 UDS HTTP 去访问它,同时补齐了观测、性能和可用性上的几个关键问题。
This commit is contained in:
9
python/omnisocket_a_side/__init__.py
Normal file
9
python/omnisocket_a_side/__init__.py
Normal file
@@ -0,0 +1,9 @@
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
PACKAGE_ROOT = Path(__file__).resolve().parent
|
||||
PYTHON_ROOT = PACKAGE_ROOT.parent
|
||||
REPO_ROOT = PYTHON_ROOT.parent
|
||||
DEFAULT_SOCKET_PATH = "/tmp/omnisocket-a-side.sock"
|
||||
DEFAULT_CONFIG_PATH = REPO_ROOT / "config" / "a_side_omnidaemon.yaml"
|
||||
VERSION = "0.1.0"
|
||||
5
python/omnisocket_a_side/__main__.py
Normal file
5
python/omnisocket_a_side/__main__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .daemon import main
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
149
python/omnisocket_a_side/client.py
Normal file
149
python/omnisocket_a_side/client.py
Normal file
@@ -0,0 +1,149 @@
|
||||
"""Local Unix-domain HTTP client for the A-side OmniDaemon."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import http.client
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import threading
|
||||
from typing import Any
|
||||
|
||||
from . import DEFAULT_SOCKET_PATH
|
||||
|
||||
|
||||
class OmniDaemonError(RuntimeError):
|
||||
def __init__(self, message: str, status_code: int | None = None) -> None:
|
||||
super().__init__(message)
|
||||
self.status_code = status_code
|
||||
|
||||
|
||||
class UnixHTTPConnection(http.client.HTTPConnection):
|
||||
def __init__(self, socket_path: str, timeout: float = 2.0) -> None:
|
||||
super().__init__("localhost", timeout=timeout)
|
||||
self.socket_path = socket_path
|
||||
|
||||
def connect(self) -> None: # pragma: no cover - runtime depends on Linux socket support
|
||||
if not hasattr(socket, "AF_UNIX"):
|
||||
raise OSError("AF_UNIX sockets are not available on this platform")
|
||||
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
self.sock.settimeout(self.timeout)
|
||||
self.sock.connect(self.socket_path)
|
||||
|
||||
|
||||
class OmniDaemonClient:
|
||||
def __init__(self, socket_path: str | None = None, timeout: float = 2.0) -> None:
|
||||
self.socket_path = socket_path or os.getenv("OMNIDAEMON_SOCKET", DEFAULT_SOCKET_PATH)
|
||||
self.timeout = timeout
|
||||
self._local = threading.local()
|
||||
|
||||
def get_health(self) -> dict[str, Any]:
|
||||
return self._request_json("GET", "/v1/health")
|
||||
|
||||
def get_state(self) -> dict[str, Any]:
|
||||
return self._request_json("GET", "/v1/state")
|
||||
|
||||
def get_video_frame(self) -> bytes:
|
||||
return self._request_bytes("GET", "/v1/video/frame")
|
||||
|
||||
def get_control_status(self) -> dict[str, Any]:
|
||||
return self._request_json("GET", "/v1/control/status")
|
||||
|
||||
def send_control_event(
|
||||
self,
|
||||
*,
|
||||
source: str,
|
||||
event_code: str,
|
||||
drive_value: float = 1.0,
|
||||
client_time_ms: int | None = None,
|
||||
) -> dict[str, Any]:
|
||||
payload = {
|
||||
"source": source,
|
||||
"event_code": event_code,
|
||||
"drive_value": float(drive_value),
|
||||
"client_time_ms": client_time_ms,
|
||||
}
|
||||
return self._request_json("POST", "/v1/control/event", payload)
|
||||
|
||||
def close(self) -> None:
|
||||
self._reset_connection()
|
||||
|
||||
def _request_json(
|
||||
self,
|
||||
method: str,
|
||||
path: str,
|
||||
payload: dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
raw = self._request_bytes(method, path, payload)
|
||||
if not raw:
|
||||
return {}
|
||||
try:
|
||||
return json.loads(raw.decode("utf-8"))
|
||||
except json.JSONDecodeError as error:
|
||||
raise OmniDaemonError(f"invalid daemon JSON response: {error}") from error
|
||||
|
||||
def _request_bytes(
|
||||
self,
|
||||
method: str,
|
||||
path: str,
|
||||
payload: dict[str, Any] | None = None,
|
||||
) -> bytes:
|
||||
body = b""
|
||||
headers: dict[str, str] = {}
|
||||
if payload is not None:
|
||||
body = json.dumps(payload).encode("utf-8")
|
||||
headers["Content-Type"] = "application/json"
|
||||
headers["Content-Length"] = str(len(body))
|
||||
headers.setdefault("Connection", "keep-alive")
|
||||
|
||||
for attempt in range(2):
|
||||
connection = self._get_connection()
|
||||
try:
|
||||
connection.request(method, path, body=body, headers=headers)
|
||||
response = connection.getresponse()
|
||||
raw = response.read()
|
||||
except FileNotFoundError as error:
|
||||
self._reset_connection()
|
||||
raise OmniDaemonError(
|
||||
f"daemon socket not found: {self.socket_path}"
|
||||
) from error
|
||||
except (OSError, http.client.HTTPException) as error:
|
||||
self._reset_connection()
|
||||
if attempt == 0:
|
||||
continue
|
||||
raise OmniDaemonError(
|
||||
f"daemon request failed via {self.socket_path}: {error}"
|
||||
) from error
|
||||
|
||||
if getattr(response, "will_close", False):
|
||||
self._reset_connection()
|
||||
|
||||
if response.status >= 400:
|
||||
message = raw.decode("utf-8", errors="replace").strip() or response.reason
|
||||
try:
|
||||
parsed = json.loads(message)
|
||||
if isinstance(parsed, dict) and "error" in parsed:
|
||||
message = str(parsed["error"])
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
raise OmniDaemonError(message, status_code=response.status)
|
||||
return raw
|
||||
|
||||
raise OmniDaemonError(f"daemon request failed via {self.socket_path}: retry exhausted")
|
||||
|
||||
def _get_connection(self) -> UnixHTTPConnection:
|
||||
connection = getattr(self._local, "connection", None)
|
||||
if connection is None:
|
||||
connection = UnixHTTPConnection(self.socket_path, timeout=self.timeout)
|
||||
self._local.connection = connection
|
||||
return connection
|
||||
|
||||
def _reset_connection(self) -> None:
|
||||
connection = getattr(self._local, "connection", None)
|
||||
if connection is None:
|
||||
return
|
||||
try:
|
||||
connection.close()
|
||||
except OSError:
|
||||
pass
|
||||
self._local.connection = None
|
||||
90
python/omnisocket_a_side/control_codec.py
Normal file
90
python/omnisocket_a_side/control_codec.py
Normal file
@@ -0,0 +1,90 @@
|
||||
"""Binary control packet codec shared by the daemon and local clients."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
import struct
|
||||
import time
|
||||
|
||||
|
||||
CONTROL_PACKET_VERSION = 1
|
||||
CONTROL_PACKET_STRUCT = struct.Struct("!BBHIfQ")
|
||||
|
||||
EVENT_NAME_TO_ID = {
|
||||
"pose_home": 1,
|
||||
"pose_hold": 2,
|
||||
"mode_stride": 3,
|
||||
"surge_up": 6,
|
||||
"surge_down": 7,
|
||||
"sway_left": 8,
|
||||
"sway_right": 9,
|
||||
"spin_left": 10,
|
||||
"spin_right": 11,
|
||||
"set_surge": 12,
|
||||
"set_sway": 13,
|
||||
"set_spin": 14,
|
||||
"set_lift": 15,
|
||||
"lift_up": 16,
|
||||
"lift_down": 17,
|
||||
"trim_reset": 18,
|
||||
"session_quit": 19,
|
||||
}
|
||||
EVENT_ID_TO_NAME = {value: key for key, value in EVENT_NAME_TO_ID.items()}
|
||||
ANALOG_EVENT_CODES = {"set_surge", "set_sway", "set_spin"}
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class ControlPacket:
|
||||
seq_id: int
|
||||
event_id: int
|
||||
drive_value: float = 1.0
|
||||
sent_at_ns: int = 0
|
||||
|
||||
@property
|
||||
def event_name(self) -> str:
|
||||
return EVENT_ID_TO_NAME.get(self.event_id, f"unknown_{self.event_id}")
|
||||
|
||||
def encode(self) -> bytes:
|
||||
sent_at_ns = self.sent_at_ns or time.time_ns()
|
||||
return CONTROL_PACKET_STRUCT.pack(
|
||||
CONTROL_PACKET_VERSION,
|
||||
self.event_id,
|
||||
0,
|
||||
int(self.seq_id),
|
||||
float(self.drive_value),
|
||||
int(sent_at_ns),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def decode(cls, payload: bytes) -> "ControlPacket":
|
||||
if len(payload) != CONTROL_PACKET_STRUCT.size:
|
||||
raise ValueError(
|
||||
f"invalid control packet length {len(payload)}; "
|
||||
f"want {CONTROL_PACKET_STRUCT.size}"
|
||||
)
|
||||
version, event_id, _reserved, seq_id, drive_value, sent_at_ns = (
|
||||
CONTROL_PACKET_STRUCT.unpack(payload)
|
||||
)
|
||||
if version != CONTROL_PACKET_VERSION:
|
||||
raise ValueError(f"unsupported control packet version {version}")
|
||||
return cls(
|
||||
seq_id=int(seq_id),
|
||||
event_id=int(event_id),
|
||||
drive_value=float(drive_value),
|
||||
sent_at_ns=int(sent_at_ns),
|
||||
)
|
||||
|
||||
|
||||
def make_control_packet(
|
||||
seq_id: int,
|
||||
event_name: str,
|
||||
drive_value: float = 1.0,
|
||||
sent_at_ns: int | None = None,
|
||||
) -> ControlPacket:
|
||||
event_id = EVENT_NAME_TO_ID[event_name]
|
||||
return ControlPacket(
|
||||
seq_id=seq_id,
|
||||
event_id=event_id,
|
||||
drive_value=drive_value,
|
||||
sent_at_ns=time.time_ns() if sent_at_ns is None else sent_at_ns,
|
||||
)
|
||||
1063
python/omnisocket_a_side/daemon.py
Normal file
1063
python/omnisocket_a_side/daemon.py
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user