feat: 自启动与自恢复机制
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import threading
|
||||
@@ -90,8 +91,14 @@ class UdpCmdVelReceiver(Node):
|
||||
self._last_published_command: CommandTuple = ZERO_COMMAND
|
||||
self._closing = threading.Event()
|
||||
self._recv_buffer = bytearray(DEFAULT_RECV_BUFFER_BYTES)
|
||||
self._runtime_dir = os.getenv('BLITZ_RUNTIME_DIR', '/run/blitz-robot').strip() or '/run/blitz-robot'
|
||||
self._status_path = os.path.join(self._runtime_dir, 'ros-receiver.status.json')
|
||||
self._transport_reconnect_count = 0
|
||||
self._recv_thread_heartbeat_epoch_ms = self._now_epoch_ms()
|
||||
self._runtime_last_error = ''
|
||||
|
||||
self.create_timer(1.0 / self._publish_rate_hz, self._publish_tick)
|
||||
self.create_timer(1.0, self._write_status_tick)
|
||||
|
||||
recv_target = self._recv_loop_unix_dgram if self._transport_name == 'unix_dgram' else self._recv_loop
|
||||
self._recv_thread = threading.Thread(target=recv_target, daemon=True)
|
||||
@@ -174,6 +181,8 @@ class UdpCmdVelReceiver(Node):
|
||||
pass
|
||||
try:
|
||||
self._transport = self._create_transport()
|
||||
self._transport_reconnect_count += 1
|
||||
self._set_runtime_last_error('')
|
||||
if self._should_log('transport_reconnected', 1.0):
|
||||
self.get_logger().info(
|
||||
'Reconnected OmniSocket transport %s://%s as %s'
|
||||
@@ -182,6 +191,7 @@ class UdpCmdVelReceiver(Node):
|
||||
return True
|
||||
except OSError as exc:
|
||||
self._transport = None
|
||||
self._set_runtime_last_error(str(exc))
|
||||
if self._should_log('transport_reconnect_error', 2.0):
|
||||
self.get_logger().error(f'Failed to reconnect OmniSocket transport: {exc}')
|
||||
time.sleep(0.5)
|
||||
@@ -192,10 +202,13 @@ class UdpCmdVelReceiver(Node):
|
||||
self._close_unix_socket()
|
||||
try:
|
||||
self._setup_unix_socket()
|
||||
self._transport_reconnect_count += 1
|
||||
self._set_runtime_last_error('')
|
||||
if self._should_log('unix_rebound', 1.0):
|
||||
self.get_logger().info(f'Rebound unix datagram socket at {self._local_socket_path}')
|
||||
return True
|
||||
except OSError as exc:
|
||||
self._set_runtime_last_error(str(exc))
|
||||
if self._should_log('unix_rebind_error', 2.0):
|
||||
self.get_logger().error(f'Failed to rebind unix datagram socket: {exc}')
|
||||
time.sleep(0.5)
|
||||
@@ -209,6 +222,61 @@ class UdpCmdVelReceiver(Node):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _now_epoch_ms(self) -> int:
|
||||
return time.time_ns() // 1_000_000
|
||||
|
||||
def _update_recv_heartbeat(self) -> None:
|
||||
with self._lock:
|
||||
self._recv_thread_heartbeat_epoch_ms = self._now_epoch_ms()
|
||||
|
||||
def _last_packet_age_ms(self) -> int | None:
|
||||
with self._lock:
|
||||
last_packet_monotonic = self._last_packet_monotonic
|
||||
if last_packet_monotonic is None:
|
||||
return None
|
||||
return max(0, int((time.monotonic() - last_packet_monotonic) * 1000.0))
|
||||
|
||||
def _socket_bound(self) -> bool:
|
||||
if self._transport_name == 'unix_dgram':
|
||||
return self._unix_socket is not None and os.path.exists(self._local_socket_path)
|
||||
return self._transport is not None
|
||||
|
||||
def _set_runtime_last_error(self, message: str) -> None:
|
||||
self._runtime_last_error = message
|
||||
|
||||
def _status_payload(self) -> dict[str, object]:
|
||||
with self._lock:
|
||||
recv_thread_heartbeat_epoch_ms = self._recv_thread_heartbeat_epoch_ms
|
||||
return {
|
||||
'updated_at_epoch_ms': self._now_epoch_ms(),
|
||||
'pid': os.getpid(),
|
||||
'recv_thread_heartbeat_epoch_ms': recv_thread_heartbeat_epoch_ms,
|
||||
'transport': self._transport_name,
|
||||
'local_socket_path': self._local_socket_path,
|
||||
'socket_bound': self._socket_bound(),
|
||||
'transport_reconnect_count': self._transport_reconnect_count,
|
||||
'last_packet_age_ms': self._last_packet_age_ms(),
|
||||
'last_error': self._runtime_last_error,
|
||||
}
|
||||
|
||||
def _write_status_tick(self) -> None:
|
||||
payload = self._status_payload()
|
||||
if self._transport_name == 'unix_dgram':
|
||||
if self._unix_socket is None:
|
||||
payload['last_error'] = self._runtime_last_error or 'unix datagram socket is not bound'
|
||||
else:
|
||||
if self._transport is None:
|
||||
payload['last_error'] = self._runtime_last_error or 'OmniSocket transport is not connected'
|
||||
try:
|
||||
os.makedirs(self._runtime_dir, exist_ok=True)
|
||||
temp_path = f'{self._status_path}.tmp.{os.getpid()}'
|
||||
with open(temp_path, 'w', encoding='utf-8') as handle:
|
||||
json.dump(payload, handle, ensure_ascii=True, separators=(',', ':'))
|
||||
os.replace(temp_path, self._status_path)
|
||||
except OSError as exc:
|
||||
if self._should_log('status_write_error', 5.0):
|
||||
self.get_logger().warning(f'Failed to write receiver status file: {exc}')
|
||||
|
||||
def _publish_command(self, command: CommandTuple) -> None:
|
||||
msg = TwistStamped()
|
||||
msg.header.stamp = self.get_clock().now().to_msg()
|
||||
@@ -229,32 +297,39 @@ class UdpCmdVelReceiver(Node):
|
||||
|
||||
def _recv_loop(self) -> None:
|
||||
while not self._closing.is_set() and rclpy.ok():
|
||||
self._update_recv_heartbeat()
|
||||
try:
|
||||
assert self._transport is not None
|
||||
meta = self._transport.recv_into(buffer=self._recv_buffer, timeout_ms=100)
|
||||
except BufferError as exc:
|
||||
self._set_runtime_last_error(str(exc))
|
||||
if self._should_log('buffer_error', 2.0):
|
||||
self.get_logger().warning(f'Dropped oversized OmniSocket frame: {exc}')
|
||||
continue
|
||||
except OSError as exc:
|
||||
self._set_runtime_last_error(str(exc))
|
||||
if not self._closing.is_set() and self._should_log('recv_error', 2.0):
|
||||
self.get_logger().error(f'OmniSocket receive loop stopped: {exc}')
|
||||
if not self._reconnect_transport():
|
||||
return
|
||||
continue
|
||||
|
||||
self._update_recv_heartbeat()
|
||||
if meta is None:
|
||||
continue
|
||||
self._set_runtime_last_error('')
|
||||
|
||||
from_peer = str(meta['from'])
|
||||
msg_type = int(meta['msg_type'])
|
||||
body_len = int(meta['body_len'])
|
||||
|
||||
if msg_type == self._msg_type_error:
|
||||
self._set_runtime_last_error(f'server error message from {from_peer}')
|
||||
self._handle_error_message(from_peer, body_len)
|
||||
continue
|
||||
|
||||
if self._expected_sender and from_peer != self._expected_sender:
|
||||
self._set_runtime_last_error(f'unexpected sender {from_peer}')
|
||||
if self._should_log('unexpected_sender', 2.0):
|
||||
self.get_logger().warning(
|
||||
'Ignoring message from unexpected sender %s (expected %s)'
|
||||
@@ -263,6 +338,7 @@ class UdpCmdVelReceiver(Node):
|
||||
continue
|
||||
|
||||
if msg_type != self._msg_type_binary:
|
||||
self._set_runtime_last_error(f'unexpected message type {msg_type}')
|
||||
if self._should_log('unexpected_type', 2.0):
|
||||
self.get_logger().warning(
|
||||
'Ignoring unexpected message type %d from %s (%d bytes)'
|
||||
@@ -271,6 +347,7 @@ class UdpCmdVelReceiver(Node):
|
||||
continue
|
||||
|
||||
if body_len != PACKET_SIZE:
|
||||
self._set_runtime_last_error(f'invalid payload size {body_len}')
|
||||
if self._should_log('packet_size', 2.0):
|
||||
self.get_logger().warning(
|
||||
'Dropped binary payload from %s with invalid size %d (expected %d)'
|
||||
@@ -281,6 +358,7 @@ class UdpCmdVelReceiver(Node):
|
||||
try:
|
||||
command = unpack_command(self._recv_buffer[:PACKET_SIZE])
|
||||
except ValueError as exc:
|
||||
self._set_runtime_last_error(str(exc))
|
||||
if self._should_log('decode_error', 2.0):
|
||||
self.get_logger().warning(f'Dropped malformed command payload: {exc}')
|
||||
continue
|
||||
@@ -288,15 +366,18 @@ class UdpCmdVelReceiver(Node):
|
||||
with self._lock:
|
||||
self._latest_command = command
|
||||
self._last_packet_monotonic = time.monotonic()
|
||||
self._set_runtime_last_error('')
|
||||
|
||||
def _recv_loop_unix_dgram(self) -> None:
|
||||
assert self._unix_socket is not None
|
||||
|
||||
while not self._closing.is_set() and rclpy.ok():
|
||||
self._update_recv_heartbeat()
|
||||
try:
|
||||
payload = self._unix_socket.recv(DEFAULT_RECV_BUFFER_BYTES)
|
||||
except socket.timeout:
|
||||
if not os.path.exists(self._local_socket_path):
|
||||
self._set_runtime_last_error('unix datagram socket path disappeared')
|
||||
if self._should_log('unix_socket_missing', 2.0):
|
||||
self.get_logger().warning(
|
||||
f'Unix datagram socket path disappeared, rebinding {self._local_socket_path}'
|
||||
@@ -305,13 +386,16 @@ class UdpCmdVelReceiver(Node):
|
||||
return
|
||||
continue
|
||||
except OSError as exc:
|
||||
self._set_runtime_last_error(str(exc))
|
||||
if not self._closing.is_set() and self._should_log('unix_recv_error', 2.0):
|
||||
self.get_logger().error(f'Unix datagram receive loop stopped: {exc}')
|
||||
if not self._rebind_unix_socket():
|
||||
return
|
||||
continue
|
||||
|
||||
self._update_recv_heartbeat()
|
||||
if len(payload) != PACKET_SIZE:
|
||||
self._set_runtime_last_error(f'invalid unix datagram payload size {len(payload)}')
|
||||
if self._should_log('unix_packet_size', 2.0):
|
||||
self.get_logger().warning(
|
||||
'Dropped unix datagram payload with invalid size %d (expected %d)'
|
||||
@@ -322,6 +406,7 @@ class UdpCmdVelReceiver(Node):
|
||||
try:
|
||||
command = unpack_command(payload)
|
||||
except ValueError as exc:
|
||||
self._set_runtime_last_error(str(exc))
|
||||
if self._should_log('unix_decode_error', 2.0):
|
||||
self.get_logger().warning(f'Dropped malformed unix datagram payload: {exc}')
|
||||
continue
|
||||
@@ -329,6 +414,7 @@ class UdpCmdVelReceiver(Node):
|
||||
with self._lock:
|
||||
self._latest_command = command
|
||||
self._last_packet_monotonic = time.monotonic()
|
||||
self._set_runtime_last_error('')
|
||||
|
||||
def _command_for_publish_tick(self) -> tuple[CommandTuple, Optional[float], bool]:
|
||||
with self._lock:
|
||||
|
||||
Reference in New Issue
Block a user