52 lines
1.3 KiB
Python
52 lines
1.3 KiB
Python
from __future__ import annotations
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
PROJECT_ROOT = Path(__file__).resolve().parents[2]
|
|
WORKSPACE_ROOT = PROJECT_ROOT.parent
|
|
|
|
|
|
def _load_client_api():
|
|
try:
|
|
from omnisocket_a_side.client import OmniDaemonClient, OmniDaemonError
|
|
except ImportError:
|
|
python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python"
|
|
if python_dir.exists():
|
|
sys.path.insert(0, str(python_dir))
|
|
from omnisocket_a_side.client import OmniDaemonClient, OmniDaemonError
|
|
return OmniDaemonClient, OmniDaemonError
|
|
|
|
|
|
_OmniDaemonClient, OmniDaemonError = _load_client_api()
|
|
_daemon_client = _OmniDaemonClient()
|
|
|
|
|
|
def get_daemon_client():
|
|
return _daemon_client
|
|
|
|
|
|
class ControlProxyService:
|
|
def get_status(self) -> dict[str, Any]:
|
|
return get_daemon_client().get_control_status()
|
|
|
|
def send_event(
|
|
self,
|
|
*,
|
|
event_code: str,
|
|
drive_value: float = 1.0,
|
|
source: str = "django-api",
|
|
client_time_ms: int | None = None,
|
|
) -> dict[str, Any]:
|
|
return get_daemon_client().send_control_event(
|
|
source=source,
|
|
event_code=event_code,
|
|
drive_value=drive_value,
|
|
client_time_ms=client_time_ms,
|
|
)
|
|
|
|
|
|
control_service = ControlProxyService()
|