Files
tienkung-szu/Deploy_Tienkung/udp_loopback/udp_keyboard_sender.py
2026-03-30 15:30:30 +08:00

206 lines
6.6 KiB
Python

"""Keyboard sender that encodes key events and emits them over localhost UDP."""
from __future__ import annotations
import os
import select
import signal
import socket
import sys
import termios
import threading
import tty
from pathlib import Path
from typing import Dict, Optional
import yaml
try:
from .protocol import InputEnvelope
except ImportError: # pragma: no cover - direct script execution fallback
from protocol import InputEnvelope
class UDPKeyboardSender:
    """Standalone keyboard sender for localhost UDP testing.

    Puts the controlling terminal into raw mode on a background thread,
    translates single key presses (and ``ESC [ A..D`` arrow sequences) into
    ``InputEnvelope`` datagrams and sends them to the host/port configured in
    ``config/udp_loopback.yaml`` next to this module.
    """

    # Single-character keys -> (event_code, key_name, drive_value).
    _KEY_EVENTS: Dict[str, tuple] = {
        "w": ("surge_up", "w", 1.0),
        "s": ("surge_down", "s", 1.0),
        "a": ("sway_left", "a", 1.0),
        "d": ("sway_right", "d", 1.0),
        "q": ("spin_left", "q", 1.0),
        "e": ("spin_right", "e", 1.0),
        "z": ("pose_home", "z", 1.0),
        "c": ("pose_hold", "c", 1.0),
        "m": ("mode_stride", "m", 1.0),
        "r": ("trim_reset", "r", 1.0),
        "4": ("set_surge", "4", 0.0),
        "5": ("set_sway", "5", 0.0),
        "6": ("set_spin", "6", 0.0),
        "x": ("session_quit", "x", 1.0),
    }

    # Final byte of an ``ESC [ <byte>`` sequence -> (event_code, key_name).
    _ARROW_EVENTS: Dict[str, tuple] = {
        "A": ("surge_up", "arrow_up"),
        "B": ("surge_down", "arrow_down"),
        "C": ("lift_down", "arrow_right"),
        "D": ("lift_up", "arrow_left"),
    }

    # Seconds to wait for a key before re-checking ``self.running``.
    _POLL_INTERVAL = 0.1

    def __init__(self) -> None:
        """Load the configuration, open the UDP socket and print the key help."""
        self.config: Dict[str, object] = {}
        self.seq_id = 0
        self.running = False
        self.socket: Optional[socket.socket] = None
        self.input_thread: Optional[threading.Thread] = None
        self.original_terminal_settings = None
        self._load_config()
        self._init_socket()
        self._print_help()

    def _load_config(self) -> None:
        """Read ``config/udp_loopback.yaml`` and derive the sender settings.

        Missing keys fall back to ``127.0.0.1:31000`` and the ``local_keys``
        source tag. An empty file or an empty ``sender:`` section is treated
        as "use the defaults".

        Raises:
            ValueError: if the document or its ``sender`` section is present
                but is not a mapping.
        """
        config_path = Path(__file__).resolve().parent / "config" / "udp_loopback.yaml"
        with config_path.open("r", encoding="utf-8") as file:
            loaded = yaml.safe_load(file)
        if loaded is None:
            loaded = {}
        if not isinstance(loaded, dict):
            raise ValueError(f"{config_path}: top-level YAML value must be a mapping")
        self.config = loaded

        # ``sender:`` with no value parses as None; treat it like an absent key
        # instead of failing on ``None.get``.
        sender_cfg = loaded.get("sender")
        if sender_cfg is None:
            sender_cfg = {}
        if not isinstance(sender_cfg, dict):
            raise ValueError(f"{config_path}: 'sender' must be a mapping")

        self.target_host = sender_cfg.get("target_host", "127.0.0.1")
        self.target_port = int(sender_cfg.get("target_port", 31000))
        self.source_tag = sender_cfg.get("source_tag", "local_keys")

    def _init_socket(self) -> None:
        """Create the IPv4 datagram socket used for every outgoing event."""
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def _print_help(self) -> None:
        """Print the target address and the key bindings."""
        print("UDP keyboard sender ready")
        print(f"Target: {self.target_host}:{self.target_port}")
        print("Keys:")
        print(" z -> pose_home")
        print(" c -> pose_hold")
        print(" m -> mode_stride")
        print(" w/s -> surge +/-")
        print(" a/d -> sway +/-")
        print(" q/e -> spin +/-")
        print(" Left/Right -> lift +/-")
        print(" Up/Down -> surge +/-")
        print(" r -> trim_reset")
        print(" 4 -> clear x speed")
        print(" 5 -> clear y speed")
        print(" 6 -> clear yaw speed")
        print(" x -> session_quit")

    def start(self) -> None:
        """Start the daemon thread that reads keys and sends events."""
        self.running = True
        self.input_thread = threading.Thread(target=self._input_loop, daemon=True)
        self.input_thread.start()
        print("UDP keyboard sender thread started")

    def stop(self) -> None:
        """Stop the input thread, restore the terminal and close the socket."""
        self.running = False
        worker = self.input_thread
        # Joining the current thread raises RuntimeError, so skip the join when
        # stop() is invoked from the input thread itself.
        if (
            worker is not None
            and worker is not threading.current_thread()
            and worker.is_alive()
        ):
            worker.join(timeout=1.0)
        self._restore_terminal()
        if self.socket is not None:
            self.socket.close()
            self.socket = None
        print("UDP keyboard sender stopped")

    def _restore_terminal(self) -> None:
        """Put stdin back into the mode saved by ``_input_loop``, if any.

        Safe to call repeatedly and from either thread: the saved settings are
        cleared before the restore is attempted.
        """
        saved = self.original_terminal_settings
        if saved is None:
            return
        self.original_terminal_settings = None
        try:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, saved)
        except termios.error:
            # Best effort: the terminal may already be gone during shutdown.
            pass

    def _read_key(self, timeout: float) -> Optional[str]:
        """Wait up to ``timeout`` seconds for one byte from stdin.

        The byte is read with ``os.read`` on the file descriptor rather than
        ``sys.stdin.read(1)``: the buffered text layer may swallow a whole
        escape sequence in one go, after which ``select`` on the descriptor
        reports nothing pending and the remaining bytes are missed.

        Returns:
            The byte as a one-character string, ``""`` on end of file, or
            ``None`` when nothing arrived within ``timeout``.
        """
        fd = sys.stdin.fileno()
        if not select.select([fd], [], [], timeout)[0]:
            return None
        # latin-1 maps every byte to exactly one character and never fails.
        return os.read(fd, 1).decode("latin-1")

    def _input_loop(self) -> None:
        """Thread body: read keys in raw mode until ``running`` is cleared.

        Always restores the terminal and clears ``running`` on exit so that a
        caller polling ``running`` notices when the loop has died.
        """
        try:
            self.original_terminal_settings = termios.tcgetattr(sys.stdin)
            tty.setraw(sys.stdin.fileno())
            while self.running:
                key = self._read_key(self._POLL_INTERVAL)
                if key is None:
                    continue
                if key == "":
                    # End of file on stdin: nothing more will ever arrive.
                    break
                self._process_key(key)
        except KeyboardInterrupt:
            self._handle_ctrl_c()
        except termios.error as exc:
            print(f"UDP keyboard sender: stdin is not a usable terminal: {exc}")
        finally:
            self._restore_terminal()
            self.running = False

    def _process_key(self, key: str) -> None:
        """Dispatch one character read from stdin.

        Ctrl-C (``\\x03``) triggers shutdown, ESC starts arrow-key decoding,
        mapped keys are sent as events and everything else is ignored.
        """
        if key == "\x03":
            self._handle_ctrl_c()
            return
        if key == "\x1b":
            self._handle_arrow_key()
            return
        event = self._KEY_EVENTS.get(key)
        if event is not None:
            event_code, key_name, drive_value = event
            self._send_event(event_code, key_name, drive_value=drive_value)

    def _handle_arrow_key(self) -> None:
        """Decode the rest of an ``ESC [ A..D`` sequence and send its event.

        Gives up silently if the next bytes do not arrive in time or do not
        form a known arrow sequence.
        """
        if self._read_key(self._POLL_INTERVAL) != "[":
            return
        final_byte = self._read_key(self._POLL_INTERVAL)
        event = self._ARROW_EVENTS.get(final_byte)
        if event is not None:
            event_code, key_name = event
            self._send_event(event_code, key_name)

    def _send_event(
        self, event_code: str, key_name: str, drive_value: float = 1.0
    ) -> None:
        """Encode one event as an ``InputEnvelope`` and send it to the target.

        Does nothing when the socket has already been closed. A send failure is
        reported and does not stop the input loop. ``session_quit`` clears
        ``running`` whether or not the datagram went out.
        """
        # Capture once: stop() may set self.socket to None from another thread.
        sock = self.socket
        if sock is None:
            return
        envelope = InputEnvelope(
            seq_id=self.seq_id,
            event_code=event_code,
            key_name=key_name,
            drive_value=drive_value,
            source_tag=self.source_tag,
        )
        self.seq_id += 1
        # NOTE(review): these messages are printed while the terminal is in raw
        # mode, where a bare newline may not return the cursor to column 0.
        try:
            sock.sendto(envelope.encode(), (self.target_host, self.target_port))
        except OSError as exc:
            print(
                f"send failed seq={envelope.seq_id} event={envelope.event_code} "
                f"key={envelope.key_name}: {exc}"
            )
        else:
            print(
                f"sent seq={envelope.seq_id} event={envelope.event_code} "
                f"key={envelope.key_name}"
            )
        if event_code == "session_quit":
            self.running = False

    def _handle_ctrl_c(self) -> None:
        """Stop the loop, restore the terminal and raise SIGINT in this process.

        In raw mode Ctrl-C arrives as the byte ``\\x03`` instead of a signal,
        so the signal is re-sent explicitly after the terminal is restored.
        """
        self.running = False
        self._restore_terminal()
        os.kill(os.getpid(), signal.SIGINT)
def main() -> None:
    """Run the keyboard sender until it stops, then clean up.

    Waits on the input thread in short slices so Ctrl-C is handled promptly.
    The wait also ends when the input thread is missing or has exited:
    joining a dead thread returns immediately, so looping on ``running``
    alone would spin forever if the thread died without clearing the flag.
    """
    sender = UDPKeyboardSender()
    try:
        sender.start()
        while sender.running:
            worker = sender.input_thread
            if worker is None or not worker.is_alive():
                break
            worker.join(timeout=0.2)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to quit; fall through to cleanup.
        pass
    finally:
        sender.stop()
# Script entry point: run the sender only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()