Files
robot-command-center/backend/monitoring/consumers.py

43 lines
1.3 KiB
Python

from __future__ import annotations
import json
from channels.generic.websocket import WebsocketConsumer
from .common import CONTROL_PACKET_SIZE, CONTROL_SOURCE_WEB
from .services import control_arbiter, native_control_ingress
class ControlConsumer(WebsocketConsumer):
    """WebSocket endpoint that accepts fixed-size binary control packets.

    On connect the client is told the required packet size; every
    subsequent binary frame of exactly that size is handed to
    ``control_arbiter`` tagged with ``CONTROL_SOURCE_WEB``. Anything else
    is answered with a JSON ``error`` message and otherwise ignored.
    """

    def _send_payload(self, payload: dict[str, object]) -> None:
        """Serialize *payload* as JSON and send it as a text frame."""
        self.send(text_data=json.dumps(payload))

    def connect(self) -> None:
        """Start the backing services, accept the socket, and announce readiness.

        The ``ready`` message carries ``packet_bytes`` so the client knows
        the exact frame length that ``receive`` will accept.
        """
        # Both services are started before the handshake is accepted.
        # NOTE(review): presumably ensure_started() is safe to call on every
        # connection — confirm against the services module.
        control_arbiter.ensure_started()
        native_control_ingress.ensure_started()

        self.accept()
        self._send_payload(
            {
                "type": "ready",
                "packet_bytes": CONTROL_PACKET_SIZE,
            }
        )

    def receive(self, text_data: str | None = None, bytes_data: bytes | None = None) -> None:
        """Validate an incoming frame and forward it to the control arbiter.

        Args:
            text_data: Text frame content, if any; never forwarded.
            bytes_data: Binary frame content; must be exactly
                ``CONTROL_PACKET_SIZE`` bytes long to be forwarded.
        """
        if bytes_data is None:
            # No binary payload was supplied (e.g. a text frame): reject it.
            self._send_payload({"type": "error", "detail": "binary control payload required"})
        elif (received := len(bytes_data)) != CONTROL_PACKET_SIZE:
            # Wrong-sized packets are reported back and dropped.
            self._send_payload(
                {
                    "type": "error",
                    "detail": f"expected {CONTROL_PACKET_SIZE} bytes, got {received}",
                }
            )
        else:
            control_arbiter.ingest_command(CONTROL_SOURCE_WEB, bytes_data)