Files
OmniSocketGo/ros-control-py/udp_teleop_bridge/udp_teleop_bridge/protocol.py

75 lines
2.5 KiB
Python

"""Shared teleop protocol helpers and transport defaults."""
from __future__ import annotations
import math
import struct
from typing import Iterable, Tuple
# Wire format of one command packet: six little-endian 32-bit floats ('<6f').
COMMAND_STRUCT = struct.Struct('<6f')
# Exact byte length of one packed command, derived from the struct format.
PACKET_SIZE = COMMAND_STRUCT.size
# Transport names accepted by normalize_transport().
SUPPORTED_TRANSPORTS = ('udp', 'kcp')
DEFAULT_TRANSPORT = 'udp'
# Default OmniSocket server endpoints ('host:port'), one per transport; see
# default_server_addr_for_transport().
DEFAULT_OMNI_UDP_SERVER_ADDR = '127.0.0.1:9001'
DEFAULT_OMNI_KCP_SERVER_ADDR = '127.0.0.1:9002'
# Default peer identifiers for the teleop participants.
DEFAULT_KEYBOARD_PEER_ID = 'ros-keyboard-ctrl'
DEFAULT_GAMEPAD_PEER_ID = 'ros-gamepad-ctrl'
DEFAULT_BRIDGE_PEER_ID = 'ros-bridge-ctrl'
# By default, controllers address the bridge peer.
DEFAULT_TARGET_PEER = DEFAULT_BRIDGE_PEER_ID
# NOTE(review): presumably the ROS frame_id stamped on published commands — confirm.
DEFAULT_FRAME_ID = 'pelvis'
# NOTE(review): presumably the ROS topics read by the sender and written by the
# bridge, respectively — confirm against the nodes that use them.
DEFAULT_INPUT_TOPIC = '/teleop/cmd_vel'
DEFAULT_OUTPUT_TOPIC = '/hric/robot/cmd_vel'
DEFAULT_SEND_RATE_HZ = 20.0
# NOTE(review): timeouts are presumably in seconds — confirm against the callers.
DEFAULT_INPUT_TIMEOUT = 0.75
DEFAULT_WATCHDOG_TIMEOUT = 0.5
DEFAULT_PUBLISH_RATE_HZ = 100.0
DEFAULT_QUEUE_DEPTH = 10
# NOTE(review): presumably the number of zero commands sent on shutdown — confirm.
DEFAULT_EXIT_ZERO_PACKETS = 3
DEFAULT_RECV_BUFFER_BYTES = 2048
# All-zero command with the same six-float shape as a packed command.
ZERO_COMMAND = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
def normalize_transport(value: object) -> str:
    """Canonicalize a transport name and verify that it is supported.

    The value is converted with ``str()``, stripped of surrounding whitespace
    and lower-cased before being checked against ``SUPPORTED_TRANSPORTS``.

    Raises:
        ValueError: If the canonical name is not a supported transport.
    """
    name = str(value).strip().lower()
    if name in SUPPORTED_TRANSPORTS:
        return name
    choices = ', '.join(SUPPORTED_TRANSPORTS)
    raise ValueError(f"Unsupported transport '{name}', expected one of: {choices}")
def default_server_addr_for_transport(transport: str) -> str:
    """Look up the default OmniSocket server address for a transport.

    The name is first validated by ``normalize_transport``, so an unsupported
    transport raises ``ValueError``. 'udp' maps to the UDP default; any other
    supported transport maps to the KCP default.
    """
    selected = normalize_transport(transport)
    return (
        DEFAULT_OMNI_UDP_SERVER_ADDR
        if selected == 'udp'
        else DEFAULT_OMNI_KCP_SERVER_ADDR
    )
def normalize_command(values: Iterable[float]) -> Tuple[float, float, float, float, float, float]:
    """Coerce an iterable into a validated tuple of six finite floats.

    Every element is converted with ``float()``; conversion errors propagate
    unchanged.

    Raises:
        ValueError: If the iterable does not yield exactly six values, or if
            any converted value is NaN or infinite.
    """
    floats = tuple(map(float, values))
    count = len(floats)
    if count != 6:
        raise ValueError(f'Expected 6 command values, got {count}')
    if not all(map(math.isfinite, floats)):
        raise ValueError('Command contains a non-finite value')
    return floats
def pack_command(values: Iterable[float]) -> bytes:
    """Pack six floats into the wire format.

    The values are validated with ``normalize_command`` and then encoded with
    ``COMMAND_STRUCT``.

    Raises:
        ValueError: If there are not exactly six values, if any value is
            non-finite, or if a finite value is too large in magnitude to be
            encoded by ``COMMAND_STRUCT``.
    """
    command = normalize_command(values)
    try:
        return COMMAND_STRUCT.pack(*command)
    except OverflowError as exc:
        # normalize_command only checks finiteness as a Python (64-bit) float;
        # struct raises OverflowError when such a value does not fit the
        # 32-bit 'f' format. Report it like every other invalid command so
        # callers need to handle only ValueError.
        raise ValueError('Command value is out of range for a 32-bit float') from exc
def unpack_command(payload: bytes) -> Tuple[float, float, float, float, float, float]:
    """Decode one wire packet into a validated six-float command tuple.

    Raises:
        ValueError: If the payload is not exactly ``PACKET_SIZE`` bytes long,
            or if ``normalize_command`` rejects the decoded values (for
            example because one of them is NaN or infinite).
    """
    received = len(payload)
    if received == PACKET_SIZE:
        decoded = COMMAND_STRUCT.unpack(payload)
        return normalize_command(decoded)
    raise ValueError(f'Expected {PACKET_SIZE} bytes, got {received}')