150 lines
5.1 KiB
Python
150 lines
5.1 KiB
Python
"""Local Unix-domain HTTP client for the A-side OmniDaemon."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import http.client
|
|
import json
|
|
import os
|
|
import socket
|
|
import threading
|
|
from typing import Any
|
|
|
|
from . import DEFAULT_SOCKET_PATH
|
|
|
|
|
|
class OmniDaemonError(RuntimeError):
|
|
def __init__(self, message: str, status_code: int | None = None) -> None:
|
|
super().__init__(message)
|
|
self.status_code = status_code
|
|
|
|
|
|
class UnixHTTPConnection(http.client.HTTPConnection):
|
|
def __init__(self, socket_path: str, timeout: float = 2.0) -> None:
|
|
super().__init__("localhost", timeout=timeout)
|
|
self.socket_path = socket_path
|
|
|
|
def connect(self) -> None: # pragma: no cover - runtime depends on Linux socket support
|
|
if not hasattr(socket, "AF_UNIX"):
|
|
raise OSError("AF_UNIX sockets are not available on this platform")
|
|
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
self.sock.settimeout(self.timeout)
|
|
self.sock.connect(self.socket_path)
|
|
|
|
|
|
class OmniDaemonClient:
|
|
def __init__(self, socket_path: str | None = None, timeout: float = 2.0) -> None:
|
|
self.socket_path = socket_path or os.getenv("OMNIDAEMON_SOCKET", DEFAULT_SOCKET_PATH)
|
|
self.timeout = timeout
|
|
self._local = threading.local()
|
|
|
|
def get_health(self) -> dict[str, Any]:
|
|
return self._request_json("GET", "/v1/health")
|
|
|
|
def get_state(self) -> dict[str, Any]:
|
|
return self._request_json("GET", "/v1/state")
|
|
|
|
def get_video_frame(self) -> bytes:
|
|
return self._request_bytes("GET", "/v1/video/frame")
|
|
|
|
def get_control_status(self) -> dict[str, Any]:
|
|
return self._request_json("GET", "/v1/control/status")
|
|
|
|
def send_control_event(
|
|
self,
|
|
*,
|
|
source: str,
|
|
event_code: str,
|
|
drive_value: float = 1.0,
|
|
client_time_ms: int | None = None,
|
|
) -> dict[str, Any]:
|
|
payload = {
|
|
"source": source,
|
|
"event_code": event_code,
|
|
"drive_value": float(drive_value),
|
|
"client_time_ms": client_time_ms,
|
|
}
|
|
return self._request_json("POST", "/v1/control/event", payload)
|
|
|
|
def close(self) -> None:
|
|
self._reset_connection()
|
|
|
|
def _request_json(
|
|
self,
|
|
method: str,
|
|
path: str,
|
|
payload: dict[str, Any] | None = None,
|
|
) -> dict[str, Any]:
|
|
raw = self._request_bytes(method, path, payload)
|
|
if not raw:
|
|
return {}
|
|
try:
|
|
return json.loads(raw.decode("utf-8"))
|
|
except json.JSONDecodeError as error:
|
|
raise OmniDaemonError(f"invalid daemon JSON response: {error}") from error
|
|
|
|
def _request_bytes(
|
|
self,
|
|
method: str,
|
|
path: str,
|
|
payload: dict[str, Any] | None = None,
|
|
) -> bytes:
|
|
body = b""
|
|
headers: dict[str, str] = {}
|
|
if payload is not None:
|
|
body = json.dumps(payload).encode("utf-8")
|
|
headers["Content-Type"] = "application/json"
|
|
headers["Content-Length"] = str(len(body))
|
|
headers.setdefault("Connection", "keep-alive")
|
|
|
|
for attempt in range(2):
|
|
connection = self._get_connection()
|
|
try:
|
|
connection.request(method, path, body=body, headers=headers)
|
|
response = connection.getresponse()
|
|
raw = response.read()
|
|
except FileNotFoundError as error:
|
|
self._reset_connection()
|
|
raise OmniDaemonError(
|
|
f"daemon socket not found: {self.socket_path}"
|
|
) from error
|
|
except (OSError, http.client.HTTPException) as error:
|
|
self._reset_connection()
|
|
if attempt == 0:
|
|
continue
|
|
raise OmniDaemonError(
|
|
f"daemon request failed via {self.socket_path}: {error}"
|
|
) from error
|
|
|
|
if getattr(response, "will_close", False):
|
|
self._reset_connection()
|
|
|
|
if response.status >= 400:
|
|
message = raw.decode("utf-8", errors="replace").strip() or response.reason
|
|
try:
|
|
parsed = json.loads(message)
|
|
if isinstance(parsed, dict) and "error" in parsed:
|
|
message = str(parsed["error"])
|
|
except json.JSONDecodeError:
|
|
pass
|
|
raise OmniDaemonError(message, status_code=response.status)
|
|
return raw
|
|
|
|
raise OmniDaemonError(f"daemon request failed via {self.socket_path}: retry exhausted")
|
|
|
|
def _get_connection(self) -> UnixHTTPConnection:
|
|
connection = getattr(self._local, "connection", None)
|
|
if connection is None:
|
|
connection = UnixHTTPConnection(self.socket_path, timeout=self.timeout)
|
|
self._local.connection = connection
|
|
return connection
|
|
|
|
def _reset_connection(self) -> None:
|
|
connection = getattr(self._local, "connection", None)
|
|
if connection is None:
|
|
return
|
|
try:
|
|
connection.close()
|
|
except OSError:
|
|
pass
|
|
self._local.connection = None
|