"""Binary control packet codec shared by the daemon and local clients.""" from __future__ import annotations from dataclasses import dataclass import struct import time CONTROL_PACKET_VERSION = 1 CONTROL_PACKET_STRUCT = struct.Struct("!BBHIfQ") EVENT_NAME_TO_ID = { "pose_home": 1, "pose_hold": 2, "mode_stride": 3, "surge_up": 6, "surge_down": 7, "sway_left": 8, "sway_right": 9, "spin_left": 10, "spin_right": 11, "set_surge": 12, "set_sway": 13, "set_spin": 14, "set_lift": 15, "lift_up": 16, "lift_down": 17, "trim_reset": 18, "session_quit": 19, } EVENT_ID_TO_NAME = {value: key for key, value in EVENT_NAME_TO_ID.items()} ANALOG_EVENT_CODES = {"set_surge", "set_sway", "set_spin"} @dataclass(slots=True) class ControlPacket: seq_id: int event_id: int drive_value: float = 1.0 sent_at_ns: int = 0 @property def event_name(self) -> str: return EVENT_ID_TO_NAME.get(self.event_id, f"unknown_{self.event_id}") def encode(self) -> bytes: sent_at_ns = self.sent_at_ns or time.time_ns() return CONTROL_PACKET_STRUCT.pack( CONTROL_PACKET_VERSION, self.event_id, 0, int(self.seq_id), float(self.drive_value), int(sent_at_ns), ) @classmethod def decode(cls, payload: bytes) -> "ControlPacket": if len(payload) != CONTROL_PACKET_STRUCT.size: raise ValueError( f"invalid control packet length {len(payload)}; " f"want {CONTROL_PACKET_STRUCT.size}" ) version, event_id, _reserved, seq_id, drive_value, sent_at_ns = ( CONTROL_PACKET_STRUCT.unpack(payload) ) if version != CONTROL_PACKET_VERSION: raise ValueError(f"unsupported control packet version {version}") return cls( seq_id=int(seq_id), event_id=int(event_id), drive_value=float(drive_value), sent_at_ns=int(sent_at_ns), ) def make_control_packet( seq_id: int, event_name: str, drive_value: float = 1.0, sent_at_ns: int | None = None, ) -> ControlPacket: event_id = EVENT_NAME_TO_ID[event_name] return ControlPacket( seq_id=seq_id, event_id=event_id, drive_value=drive_value, sent_at_ns=time.time_ns() if sent_at_ns is None else sent_at_ns, )