Files
2026-03-27 21:57:52 +08:00

62 lines
1.7 KiB
Python

"""Protocol and state definitions for the standalone UDP loopback."""
from __future__ import annotations
from dataclasses import asdict, dataclass, field
import json
import time
from typing import Any, Dict
@dataclass
class InputEnvelope:
    """Small UDP payload carrying one encoded input event.

    An envelope round-trips through :meth:`encode` and :meth:`decode` as a
    compact UTF-8 JSON object keyed by field name.
    """

    seq_id: int
    event_code: str
    key_name: str
    # Stamped with the current wall-clock time unless the caller supplies one.
    sent_at: float = field(default_factory=time.time)
    drive_value: float = 1.0
    source_tag: str = "local_keys"

    def encode(self) -> bytes:
        """Return this envelope as compact (no-whitespace) UTF-8 JSON bytes."""
        return json.dumps(asdict(self), separators=(",", ":")).encode("utf-8")

    @classmethod
    def decode(cls, payload: bytes) -> "InputEnvelope":
        """Build an envelope from bytes produced by :meth:`encode`.

        ``seq_id``, ``event_code`` and ``key_name`` are required. Missing
        optional fields fall back to: ``sent_at`` = the current time,
        ``drive_value`` = 1.0, ``source_tag`` = ``"unknown"`` (deliberately
        not the constructor default, so absent tags stay distinguishable).
        Every field is coerced to its declared type.

        Args:
            payload: UTF-8 encoded JSON object.

        Returns:
            The decoded envelope.

        Raises:
            ValueError: If ``payload`` is not valid UTF-8 / JSON, or a numeric
                field holds a string that cannot be converted.
            KeyError: If a required field is missing.
            TypeError: If the JSON document is not an object, or a numeric
                field holds a value ``int``/``float`` cannot accept.
        """
        data = json.loads(payload.decode("utf-8"))
        # Subscripting a list/str/number below would also raise TypeError,
        # but with a message that says nothing about the real problem.
        if not isinstance(data, dict):
            raise TypeError(
                "InputEnvelope payload must be a JSON object, "
                f"got {type(data).__name__}"
            )
        return cls(
            seq_id=int(data["seq_id"]),
            event_code=str(data["event_code"]),
            key_name=str(data["key_name"]),
            sent_at=float(data.get("sent_at", time.time())),
            drive_value=float(data.get("drive_value", 1.0)),
            source_tag=str(data.get("source_tag", "unknown")),
        )
@dataclass
class MotionFrame:
    """Receiver-side motion targets computed from decoded events.

    Every field has a default, so ``MotionFrame()`` yields a complete frame.
    """

    mode_tag: str = "pose_home"
    relay_on: bool = True
    # Per-axis goal values; units and sign conventions are not defined in
    # this module -- confirm against the consumer before relying on them.
    surge_goal: float = 0.0
    sway_goal: float = 0.0
    spin_goal: float = 0.0
    lift_goal: float = 0.89
    # Code of the most recent event applied; "boot" until one is recorded.
    last_event_code: str = "boot"
    # Presumably a time.time() stamp of the last received packet -- TODO confirm.
    last_rx_time: float = 0.0

    def as_dict(self) -> Dict[str, Any]:
        """Return a new dict of all fields, in declaration order."""
        return asdict(self)
def format_motion_frame(frame: MotionFrame) -> str:
    """Render *frame* as a one-line ``key=value`` summary.

    The four goal values are printed with two decimal places. The frame's
    ``last_rx_time`` is not part of the output.

    Args:
        frame: The motion frame to describe.

    Returns:
        A single-line string with space-separated ``key=value`` pairs.
    """
    return (
        f"mode={frame.mode_tag} relay_on={frame.relay_on} "
        f"surge={frame.surge_goal:.2f} sway={frame.sway_goal:.2f} "
        f"spin={frame.spin_goal:.2f} lift={frame.lift_goal:.2f} "
        f"event={frame.last_event_code}"
    )