"""Protocol and state definitions for the standalone UDP loopback.""" from __future__ import annotations from dataclasses import asdict, dataclass, field import json import time from typing import Any, Dict @dataclass class InputEnvelope: """Small UDP payload carrying one encoded input event.""" seq_id: int event_code: str key_name: str sent_at: float = field(default_factory=time.time) drive_value: float = 1.0 source_tag: str = "local_keys" def encode(self) -> bytes: return json.dumps(asdict(self), separators=(",", ":")).encode("utf-8") @classmethod def decode(cls, payload: bytes) -> "InputEnvelope": data = json.loads(payload.decode("utf-8")) return cls( seq_id=int(data["seq_id"]), event_code=str(data["event_code"]), key_name=str(data["key_name"]), sent_at=float(data.get("sent_at", time.time())), drive_value=float(data.get("drive_value", 1.0)), source_tag=str(data.get("source_tag", "unknown")), ) @dataclass class MotionFrame: """Receiver-side motion targets computed from decoded events.""" mode_tag: str = "pose_home" relay_on: bool = True surge_goal: float = 0.0 sway_goal: float = 0.0 spin_goal: float = 0.0 lift_goal: float = 0.89 last_event_code: str = "boot" last_rx_time: float = 0.0 def as_dict(self) -> Dict[str, Any]: return asdict(self) def format_motion_frame(frame: MotionFrame) -> str: return ( f"mode={frame.mode_tag} relay_on={frame.relay_on} " f"surge={frame.surge_goal:.2f} sway={frame.sway_goal:.2f} " f"spin={frame.spin_goal:.2f} lift={frame.lift_goal:.2f} " f"event={frame.last_event_code}" )