Files
tienkung-szu/Deploy_Tienkung/udp_loopback/omnisocket_control.py
2026-03-30 23:04:39 +08:00

107 lines
2.8 KiB
Python

"""Binary control packet codec and motion helpers for OmniSocket demos."""
from __future__ import annotations
from dataclasses import dataclass
import struct
import time
CONTROL_PACKET_VERSION = 1
CONTROL_PACKET_STRUCT = struct.Struct("!BBHIfQ")
EVENT_NAME_TO_ID = {
"pose_home": 1,
"pose_hold": 2,
"mode_stride": 3,
"surge_up": 6,
"surge_down": 7,
"sway_left": 8,
"sway_right": 9,
"spin_left": 10,
"spin_right": 11,
"set_surge": 12,
"set_sway": 13,
"set_spin": 14,
"set_lift": 15,
"lift_up": 16,
"lift_down": 17,
"trim_reset": 18,
"session_quit": 19,
}
EVENT_ID_TO_NAME = {value: key for key, value in EVENT_NAME_TO_ID.items()}
@dataclass(slots=True)
class ControlPacket:
seq_id: int
event_id: int
drive_value: float = 1.0
sent_at_ns: int = 0
@property
def event_name(self) -> str:
return EVENT_ID_TO_NAME.get(self.event_id, f"unknown_{self.event_id}")
def encode(self) -> bytes:
sent_at_ns = self.sent_at_ns or time.time_ns()
return CONTROL_PACKET_STRUCT.pack(
CONTROL_PACKET_VERSION,
self.event_id,
0,
int(self.seq_id),
float(self.drive_value),
int(sent_at_ns),
)
@classmethod
def decode(cls, payload: bytes) -> "ControlPacket":
if len(payload) != CONTROL_PACKET_STRUCT.size:
raise ValueError(
f"invalid control packet length {len(payload)}; "
f"want {CONTROL_PACKET_STRUCT.size}"
)
version, event_id, _reserved, seq_id, drive_value, sent_at_ns = (
CONTROL_PACKET_STRUCT.unpack(payload)
)
if version != CONTROL_PACKET_VERSION:
raise ValueError(f"unsupported control packet version {version}")
return cls(
seq_id=int(seq_id),
event_id=int(event_id),
drive_value=float(drive_value),
sent_at_ns=int(sent_at_ns),
)
def make_control_packet(
seq_id: int, event_name: str, drive_value: float = 1.0, sent_at_ns: int | None = None
) -> ControlPacket:
event_id = EVENT_NAME_TO_ID[event_name]
return ControlPacket(
seq_id=seq_id,
event_id=event_id,
drive_value=drive_value,
sent_at_ns=time.time_ns() if sent_at_ns is None else sent_at_ns,
)
@dataclass(slots=True)
class MotionFrame:
mode_tag: str = "pose_hold"
surge_goal: float = 0.0
sway_goal: float = 0.0
spin_goal: float = 0.0
lift_goal: float = 0.89
relay_on: bool = True
last_event_code: str = ""
last_rx_time: float = 0.0
def format_motion_frame(frame: MotionFrame) -> str:
return (
f"mode={frame.mode_tag} surge={frame.surge_goal:.3f} "
f"sway={frame.sway_goal:.3f} spin={frame.spin_goal:.3f} "
f"lift={frame.lift_goal:.3f}"
)