91 lines
2.4 KiB
Python
91 lines
2.4 KiB
Python
"""Binary control packet codec shared by the daemon and local clients."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
import struct
|
|
import time
|
|
|
|
|
|
# Wire-format version; written as the first byte of every packet and
# checked on decode.
CONTROL_PACKET_VERSION = 1

# Fixed-size packet layout, network byte order, no padding (20 bytes):
#   B  version
#   B  event id
#   H  reserved (encoded as 0, ignored on decode)
#   I  sequence id
#   f  drive value (32-bit float)
#   Q  send timestamp in nanoseconds
CONTROL_PACKET_STRUCT = struct.Struct("!BBHIfQ")

# Event name -> numeric id carried in the packet's one-byte event field.
# Ids 4 and 5 are not assigned in this table.
EVENT_NAME_TO_ID = dict(
    pose_home=1,
    pose_hold=2,
    mode_stride=3,
    surge_up=6,
    surge_down=7,
    sway_left=8,
    sway_right=9,
    spin_left=10,
    spin_right=11,
    set_surge=12,
    set_sway=13,
    set_spin=14,
    set_lift=15,
    lift_up=16,
    lift_down=17,
    trim_reset=18,
    session_quit=19,
)

# Reverse lookup: numeric id -> event name.
EVENT_ID_TO_NAME = {
    event_id: name for name, event_id in EVENT_NAME_TO_ID.items()
}

# Despite the "CODES" suffix this set holds event *names* (keys of
# EVENT_NAME_TO_ID), not numeric ids.
# NOTE(review): "set_lift" is not listed here although the other "set_*"
# events are -- confirm whether that omission is intentional.
ANALOG_EVENT_CODES = {
    "set_surge",
    "set_sway",
    "set_spin",
}
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class ControlPacket:
|
|
seq_id: int
|
|
event_id: int
|
|
drive_value: float = 1.0
|
|
sent_at_ns: int = 0
|
|
|
|
@property
|
|
def event_name(self) -> str:
|
|
return EVENT_ID_TO_NAME.get(self.event_id, f"unknown_{self.event_id}")
|
|
|
|
def encode(self) -> bytes:
|
|
sent_at_ns = self.sent_at_ns or time.time_ns()
|
|
return CONTROL_PACKET_STRUCT.pack(
|
|
CONTROL_PACKET_VERSION,
|
|
self.event_id,
|
|
0,
|
|
int(self.seq_id),
|
|
float(self.drive_value),
|
|
int(sent_at_ns),
|
|
)
|
|
|
|
@classmethod
|
|
def decode(cls, payload: bytes) -> "ControlPacket":
|
|
if len(payload) != CONTROL_PACKET_STRUCT.size:
|
|
raise ValueError(
|
|
f"invalid control packet length {len(payload)}; "
|
|
f"want {CONTROL_PACKET_STRUCT.size}"
|
|
)
|
|
version, event_id, _reserved, seq_id, drive_value, sent_at_ns = (
|
|
CONTROL_PACKET_STRUCT.unpack(payload)
|
|
)
|
|
if version != CONTROL_PACKET_VERSION:
|
|
raise ValueError(f"unsupported control packet version {version}")
|
|
return cls(
|
|
seq_id=int(seq_id),
|
|
event_id=int(event_id),
|
|
drive_value=float(drive_value),
|
|
sent_at_ns=int(sent_at_ns),
|
|
)
|
|
|
|
|
|
def make_control_packet(
    seq_id: int,
    event_name: str,
    drive_value: float = 1.0,
    sent_at_ns: int | None = None,
) -> ControlPacket:
    """Build a ``ControlPacket`` from an event name instead of a numeric id.

    Args:
        seq_id: Sequence number stored on the packet.
        event_name: A key of ``EVENT_NAME_TO_ID``.
        drive_value: Value stored on the packet; defaults to 1.0.
        sent_at_ns: Timestamp in nanoseconds.  ``None`` means "use
            ``time.time_ns()`` now".  An explicit 0 is stored as given,
            but ``ControlPacket.encode`` treats 0 as unset and stamps the
            encoded bytes with the clock.

    Returns:
        The newly constructed packet.

    Raises:
        KeyError: If ``event_name`` is not in ``EVENT_NAME_TO_ID``.
    """
    # Resolve the name first so an unknown event fails before the clock
    # is read.
    resolved_id = EVENT_NAME_TO_ID[event_name]

    if sent_at_ns is None:
        stamp = time.time_ns()
    else:
        stamp = sent_at_ns

    return ControlPacket(
        seq_id=seq_id,
        event_id=resolved_id,
        drive_value=drive_value,
        sent_at_ns=stamp,
    )
|