From 557590f2bfcc677608241c34c0248d6e029a3149 Mon Sep 17 00:00:00 2001
From: Mock
Date: Sat, 18 Apr 2026 14:10:57 +0800
Subject: [PATCH] =?UTF-8?q?fix:=20a-control-acks=20=E4=B8=BA=E7=A9=BA?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
NOTE(review): this copy of the patch had its line breaks and indentation
flattened. The layout below is a reconstruction: every hunk's line counts
match its "@@" header, but indentation and the position of blank context
lines are inferred from Python syntax, and one blank added line (between the
two new static methods) was restored to satisfy the "+470,22" count.
Regenerate the patch with git format-patch before applying it.

The Subject header is RFC 2047 Q-encoded; it decodes to
"fix: a-control-acks" followed by two Chinese characters meaning "is empty".

Summary of what the hunks change in backend/monitoring/control.py:

* _payload_preview(payload, limit=160): new module-level helper. Returns ""
  for an empty payload; otherwise decodes the first `limit` bytes as UTF-8
  with errors="replace" and appends "..." when the payload is longer than
  `limit`.
* ControlAckTracker.handle_ack: the annotated return type changes from None
  to str. The three early exits now return "invalid_message_id",
  "pending_missing" and "invalid_timing"; the end of the visible hunk
  returns "accepted".
* OmniSocketControlAckReceiver.__init__: adds _registered,
  _last_server_error, per-outcome counters and last-seen fields
  (_last_msg_type, _last_from_peer, _last_payload_preview,
  _last_ack_result).
* _looks_like_control_ack (static): True only for a dict that has
  "message_id" and at least one of "ack_phase", "b_recv_to_persist_us",
  "unix_send_ok", "sample_reason".
* _sender_matches (static): strips both strings; an empty expected sender
  matches anything, otherwise the stripped values must be equal.
* _run: when recv() returns None it refreshes _registered and
  _last_server_error from session.stats(). For each received message it
  updates the counters and the payload preview, counts non-text message
  types and JSON/UTF-8 decode failures, and records the handle_ack result.
* get_status: reads session.stats() (falling back to {} on any exception)
  and reports the new counters and last-seen fields alongside the existing
  keys.

NOTE(review): behaviour change to confirm. Previously a message whose sender
differed from expected_sender was skipped. Now it is skipped only when the
decoded payload does not look like a control ack; an ack-shaped payload from
another sender is still passed to handle_ack and, if accepted, counted in
_sender_mismatch_accepted_count.

NOTE(review): _sender_matches calls from_peer.strip(), while _run stores
str(from_peer or ""), which suggests from_peer may be falsy. If recv() can
yield None for the peer, the strip() call raises AttributeError and lands in
the broad "except Exception" handler of _run. Verify against the session
backend.

 backend/monitoring/control.py | 129 ++++++++++++++++++++++++++++++++--
 1 file changed, 122 insertions(+), 7 deletions(-)

diff --git a/backend/monitoring/control.py b/backend/monitoring/control.py
index 81122f6..0202e98 100644
--- a/backend/monitoring/control.py
+++ b/backend/monitoring/control.py
@@ -20,6 +20,15 @@ from .common import (
 from .video import safe_kcp_stats
 
 
+def _payload_preview(payload: bytes, limit: int = 160) -> str:
+    if not payload:
+        return ""
+    preview = payload[:limit].decode("utf-8", errors="replace")
+    if len(payload) > limit:
+        return f"{preview}..."
+    return preview
+
+
 class ControlAckTracker:
     def __init__(self) -> None:
         self._lock = threading.Lock()
@@ -62,24 +71,24 @@ class ControlAckTracker:
             self._prune_locked(issued_at_mono_ns)
         self._event_logger.write(event)
 
-    def handle_ack(self, ack_payload: dict[str, Any], received_unix_ns: int, received_mono_ns: int) -> None:
+    def handle_ack(self, ack_payload: dict[str, Any], received_unix_ns: int, received_mono_ns: int) -> str:
         try:
             message_id = int(ack_payload["message_id"])
         except (KeyError, TypeError, ValueError):
-            return
+            return "invalid_message_id"
 
         with self._lock:
             event = self._pending.pop(message_id, None)
             self._prune_locked(received_mono_ns)
 
         if event is None:
-            return
+            return "pending_missing"
 
         try:
             control_loop_rtt_ms = round((received_unix_ns - int(event["issued_at_unix_ns"])) / 1_000_000.0, 3)
             b_recv_to_persist_ms = round(float(ack_payload.get("b_recv_to_persist_us", 0)) / 1000.0, 3)
         except (TypeError, ValueError):
-            return
+            return "invalid_timing"
 
         control_oneway_network_est_ms = round(max(0.0, (control_loop_rtt_ms - b_recv_to_persist_ms) / 2.0), 3)
         control_to_persist_est_ms = round(control_oneway_network_est_ms + b_recv_to_persist_ms, 3)
@@ -111,6 +120,7 @@ class ControlAckTracker:
                 "control_to_persist_est_ms": control_to_persist_est_ms,
                 "sample_reason": ack_record["sample_reason"],
             }
+        return "accepted"
 
     def get_latest_estimate(self) -> dict[str, Any]:
         with self._lock:
@@ -392,8 +402,24 @@ class OmniSocketControlAckReceiver:
         self._control_defaults: dict[str, Any] = {}
         self._closing = threading.Event()
         self._last_error = ""
+        self._last_server_error = ""
+        self._registered = False
         self._reconnect_count = 0
         self._ever_connected = False
+        self._received_messages = 0
+        self._received_bytes = 0
+        self._accepted_count = 0
+        self._pending_missing_count = 0
+        self._invalid_message_id_count = 0
+        self._invalid_timing_count = 0
+        self._unexpected_message_type_count = 0
+        self._unexpected_sender_count = 0
+        self._sender_mismatch_accepted_count = 0
+        self._payload_decode_errors = 0
+        self._last_msg_type: int | None = None
+        self._last_from_peer = ""
+        self._last_payload_preview = ""
+        self._last_ack_result = ""
         self._load_backend()
 
     def _load_backend(self) -> None:
@@ -444,6 +470,22 @@ class OmniSocketControlAckReceiver:
         self._thread = threading.Thread(target=self._run, name="omnisocket-control-ack", daemon=True)
         self._thread.start()
 
+    @staticmethod
+    def _looks_like_control_ack(ack_payload: Any) -> bool:
+        if not isinstance(ack_payload, dict):
+            return False
+        if "message_id" not in ack_payload:
+            return False
+        return any(field in ack_payload for field in ("ack_phase", "b_recv_to_persist_us", "unix_send_ok", "sample_reason"))
+
+    @staticmethod
+    def _sender_matches(expected_sender: str, from_peer: str) -> bool:
+        normalized_expected = expected_sender.strip()
+        normalized_from = from_peer.strip()
+        if not normalized_expected:
+            return True
+        return normalized_from == normalized_expected
+
     def _run(self) -> None:
         while not self._closing.is_set():
             expected_sender = ""
@@ -460,21 +502,65 @@ class OmniSocketControlAckReceiver:
                 while not self._closing.is_set():
                     result = session.recv(timeout_ms=1000)
                     if result is None:
+                        try:
+                            session_stats = dict(session.stats())
+                        except Exception:
+                            session_stats = {}
+                        with self._lock:
+                            self._registered = bool(session_stats.get("registered", 0))
+                            self._last_server_error = str(session_stats.get("last_server_error", "") or "")
                         continue
                     from_peer, msg_type, payload = result
+                    with self._lock:
+                        self._received_messages += 1
+                        self._received_bytes += len(payload)
+                        self._last_from_peer = str(from_peer or "")
+                        self._last_msg_type = int(msg_type)
+                        self._last_payload_preview = _payload_preview(payload)
+                        try:
+                            session_stats = dict(session.stats())
+                        except Exception:
+                            session_stats = {}
+                        self._registered = bool(session_stats.get("registered", 0))
+                        self._last_server_error = str(session_stats.get("last_server_error", "") or "")
                     if msg_type == self._msg_type_error:
                         with self._lock:
                             self._last_error = f"ack session error from {from_peer}: {payload.decode('utf-8', errors='replace')}"
+                            self._last_ack_result = "server_error"
                         continue
                     if msg_type != self._msg_type_text:
-                        continue
-                    if expected_sender and from_peer != expected_sender:
+                        with self._lock:
+                            self._unexpected_message_type_count += 1
+                            self._last_ack_result = "unexpected_message_type"
                         continue
                     try:
                         ack_payload = json.loads(payload.decode("utf-8"))
                     except (UnicodeDecodeError, json.JSONDecodeError):
+                        with self._lock:
+                            self._payload_decode_errors += 1
+                            self._last_ack_result = "payload_decode_error"
                         continue
-                    self._ack_tracker.handle_ack(ack_payload, time.time_ns(), time.monotonic_ns())
+                    sender_matches = self._sender_matches(expected_sender, from_peer)
+                    if not sender_matches:
+                        with self._lock:
+                            self._unexpected_sender_count += 1
+                        if not self._looks_like_control_ack(ack_payload):
+                            with self._lock:
+                                self._last_ack_result = "unexpected_sender"
+                            continue
+                    ack_result = self._ack_tracker.handle_ack(ack_payload, time.time_ns(), time.monotonic_ns())
+                    with self._lock:
+                        self._last_ack_result = ack_result
+                        if ack_result == "accepted":
+                            self._accepted_count += 1
+                            if not sender_matches:
+                                self._sender_mismatch_accepted_count += 1
+                        elif ack_result == "pending_missing":
+                            self._pending_missing_count += 1
+                        elif ack_result == "invalid_message_id":
+                            self._invalid_message_id_count += 1
+                        elif ack_result == "invalid_timing":
+                            self._invalid_timing_count += 1
             except Exception as error:  # pragma: no cover
                 if not self._closing.is_set():
                     with self._lock:
@@ -493,14 +579,43 @@ class OmniSocketControlAckReceiver:
 
     def get_status(self) -> dict[str, Any]:
         config = load_omnisocket_config().get("control_ack_receiver", {})
+        with self._lock:
+            session = self._session
+        if session is not None:
+            try:
+                session_stats = dict(session.stats())
+            except Exception:
+                session_stats = {}
+        else:
+            session_stats = {}
         with self._lock:
             return {
                 "backend_ready": self._session_cls is not None,
                 "started": self._started,
                 "connected": self._session is not None,
+                "registered": bool(session_stats.get("registered", 0) or self._registered),
                 "peer_id": str(config.get("peer_id", "")),
                 "expected_sender": str(config.get("expected_sender", "")),
+                "recv_calls": int(session_stats.get("recv_calls", 0)),
+                "recv_bytes": int(session_stats.get("recv_bytes", 0)),
+                "recv_timeouts": int(session_stats.get("recv_timeouts", 0)),
+                "recv_errors": int(session_stats.get("recv_errors", 0)),
+                "received_messages": self._received_messages,
+                "received_bytes": self._received_bytes,
+                "accepted_count": self._accepted_count,
+                "pending_missing_count": self._pending_missing_count,
+                "invalid_message_id_count": self._invalid_message_id_count,
+                "invalid_timing_count": self._invalid_timing_count,
+                "unexpected_message_type_count": self._unexpected_message_type_count,
+                "unexpected_sender_count": self._unexpected_sender_count,
+                "sender_mismatch_accepted_count": self._sender_mismatch_accepted_count,
+                "payload_decode_errors": self._payload_decode_errors,
+                "last_msg_type": self._last_msg_type,
+                "last_from_peer": self._last_from_peer,
+                "last_payload_preview": self._last_payload_preview,
+                "last_ack_result": self._last_ack_result,
                 "reconnect_count": self._reconnect_count,
+                "last_server_error": str(session_stats.get("last_server_error", "") or self._last_server_error),
                 "last_error": self._last_error,
             }
 