fix: a-control-acks 为空

This commit is contained in:
2026-04-18 14:10:57 +08:00
parent 5fdb42b5ed
commit 557590f2bf

View File

@@ -20,6 +20,15 @@ from .common import (
from .video import safe_kcp_stats from .video import safe_kcp_stats
def _payload_preview(payload: bytes, limit: int = 160) -> str:
if not payload:
return ""
preview = payload[:limit].decode("utf-8", errors="replace")
if len(payload) > limit:
return f"{preview}..."
return preview
class ControlAckTracker: class ControlAckTracker:
def __init__(self) -> None: def __init__(self) -> None:
self._lock = threading.Lock() self._lock = threading.Lock()
@@ -62,24 +71,24 @@ class ControlAckTracker:
self._prune_locked(issued_at_mono_ns) self._prune_locked(issued_at_mono_ns)
self._event_logger.write(event) self._event_logger.write(event)
def handle_ack(self, ack_payload: dict[str, Any], received_unix_ns: int, received_mono_ns: int) -> None: def handle_ack(self, ack_payload: dict[str, Any], received_unix_ns: int, received_mono_ns: int) -> str:
try: try:
message_id = int(ack_payload["message_id"]) message_id = int(ack_payload["message_id"])
except (KeyError, TypeError, ValueError): except (KeyError, TypeError, ValueError):
return return "invalid_message_id"
with self._lock: with self._lock:
event = self._pending.pop(message_id, None) event = self._pending.pop(message_id, None)
self._prune_locked(received_mono_ns) self._prune_locked(received_mono_ns)
if event is None: if event is None:
return return "pending_missing"
try: try:
control_loop_rtt_ms = round((received_unix_ns - int(event["issued_at_unix_ns"])) / 1_000_000.0, 3) control_loop_rtt_ms = round((received_unix_ns - int(event["issued_at_unix_ns"])) / 1_000_000.0, 3)
b_recv_to_persist_ms = round(float(ack_payload.get("b_recv_to_persist_us", 0)) / 1000.0, 3) b_recv_to_persist_ms = round(float(ack_payload.get("b_recv_to_persist_us", 0)) / 1000.0, 3)
except (TypeError, ValueError): except (TypeError, ValueError):
return return "invalid_timing"
control_oneway_network_est_ms = round(max(0.0, (control_loop_rtt_ms - b_recv_to_persist_ms) / 2.0), 3) control_oneway_network_est_ms = round(max(0.0, (control_loop_rtt_ms - b_recv_to_persist_ms) / 2.0), 3)
control_to_persist_est_ms = round(control_oneway_network_est_ms + b_recv_to_persist_ms, 3) control_to_persist_est_ms = round(control_oneway_network_est_ms + b_recv_to_persist_ms, 3)
@@ -111,6 +120,7 @@ class ControlAckTracker:
"control_to_persist_est_ms": control_to_persist_est_ms, "control_to_persist_est_ms": control_to_persist_est_ms,
"sample_reason": ack_record["sample_reason"], "sample_reason": ack_record["sample_reason"],
} }
return "accepted"
def get_latest_estimate(self) -> dict[str, Any]: def get_latest_estimate(self) -> dict[str, Any]:
with self._lock: with self._lock:
@@ -392,8 +402,24 @@ class OmniSocketControlAckReceiver:
self._control_defaults: dict[str, Any] = {} self._control_defaults: dict[str, Any] = {}
self._closing = threading.Event() self._closing = threading.Event()
self._last_error = "" self._last_error = ""
self._last_server_error = ""
self._registered = False
self._reconnect_count = 0 self._reconnect_count = 0
self._ever_connected = False self._ever_connected = False
self._received_messages = 0
self._received_bytes = 0
self._accepted_count = 0
self._pending_missing_count = 0
self._invalid_message_id_count = 0
self._invalid_timing_count = 0
self._unexpected_message_type_count = 0
self._unexpected_sender_count = 0
self._sender_mismatch_accepted_count = 0
self._payload_decode_errors = 0
self._last_msg_type: int | None = None
self._last_from_peer = ""
self._last_payload_preview = ""
self._last_ack_result = ""
self._load_backend() self._load_backend()
def _load_backend(self) -> None: def _load_backend(self) -> None:
@@ -444,6 +470,22 @@ class OmniSocketControlAckReceiver:
self._thread = threading.Thread(target=self._run, name="omnisocket-control-ack", daemon=True) self._thread = threading.Thread(target=self._run, name="omnisocket-control-ack", daemon=True)
self._thread.start() self._thread.start()
@staticmethod
def _looks_like_control_ack(ack_payload: Any) -> bool:
if not isinstance(ack_payload, dict):
return False
if "message_id" not in ack_payload:
return False
return any(field in ack_payload for field in ("ack_phase", "b_recv_to_persist_us", "unix_send_ok", "sample_reason"))
@staticmethod
def _sender_matches(expected_sender: str, from_peer: str) -> bool:
normalized_expected = expected_sender.strip()
normalized_from = from_peer.strip()
if not normalized_expected:
return True
return normalized_from == normalized_expected
def _run(self) -> None: def _run(self) -> None:
while not self._closing.is_set(): while not self._closing.is_set():
expected_sender = "" expected_sender = ""
@@ -460,21 +502,65 @@ class OmniSocketControlAckReceiver:
while not self._closing.is_set(): while not self._closing.is_set():
result = session.recv(timeout_ms=1000) result = session.recv(timeout_ms=1000)
if result is None: if result is None:
try:
session_stats = dict(session.stats())
except Exception:
session_stats = {}
with self._lock:
self._registered = bool(session_stats.get("registered", 0))
self._last_server_error = str(session_stats.get("last_server_error", "") or "")
continue continue
from_peer, msg_type, payload = result from_peer, msg_type, payload = result
with self._lock:
self._received_messages += 1
self._received_bytes += len(payload)
self._last_from_peer = str(from_peer or "")
self._last_msg_type = int(msg_type)
self._last_payload_preview = _payload_preview(payload)
try:
session_stats = dict(session.stats())
except Exception:
session_stats = {}
self._registered = bool(session_stats.get("registered", 0))
self._last_server_error = str(session_stats.get("last_server_error", "") or "")
if msg_type == self._msg_type_error: if msg_type == self._msg_type_error:
with self._lock: with self._lock:
self._last_error = f"ack session error from {from_peer}: {payload.decode('utf-8', errors='replace')}" self._last_error = f"ack session error from {from_peer}: {payload.decode('utf-8', errors='replace')}"
self._last_ack_result = "server_error"
continue continue
if msg_type != self._msg_type_text: if msg_type != self._msg_type_text:
continue with self._lock:
if expected_sender and from_peer != expected_sender: self._unexpected_message_type_count += 1
self._last_ack_result = "unexpected_message_type"
continue continue
try: try:
ack_payload = json.loads(payload.decode("utf-8")) ack_payload = json.loads(payload.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError): except (UnicodeDecodeError, json.JSONDecodeError):
with self._lock:
self._payload_decode_errors += 1
self._last_ack_result = "payload_decode_error"
continue continue
self._ack_tracker.handle_ack(ack_payload, time.time_ns(), time.monotonic_ns()) sender_matches = self._sender_matches(expected_sender, from_peer)
if not sender_matches:
with self._lock:
self._unexpected_sender_count += 1
if not self._looks_like_control_ack(ack_payload):
with self._lock:
self._last_ack_result = "unexpected_sender"
continue
ack_result = self._ack_tracker.handle_ack(ack_payload, time.time_ns(), time.monotonic_ns())
with self._lock:
self._last_ack_result = ack_result
if ack_result == "accepted":
self._accepted_count += 1
if not sender_matches:
self._sender_mismatch_accepted_count += 1
elif ack_result == "pending_missing":
self._pending_missing_count += 1
elif ack_result == "invalid_message_id":
self._invalid_message_id_count += 1
elif ack_result == "invalid_timing":
self._invalid_timing_count += 1
except Exception as error: # pragma: no cover except Exception as error: # pragma: no cover
if not self._closing.is_set(): if not self._closing.is_set():
with self._lock: with self._lock:
@@ -493,14 +579,43 @@ class OmniSocketControlAckReceiver:
def get_status(self) -> dict[str, Any]: def get_status(self) -> dict[str, Any]:
config = load_omnisocket_config().get("control_ack_receiver", {}) config = load_omnisocket_config().get("control_ack_receiver", {})
with self._lock:
session = self._session
if session is not None:
try:
session_stats = dict(session.stats())
except Exception:
session_stats = {}
else:
session_stats = {}
with self._lock: with self._lock:
return { return {
"backend_ready": self._session_cls is not None, "backend_ready": self._session_cls is not None,
"started": self._started, "started": self._started,
"connected": self._session is not None, "connected": self._session is not None,
"registered": bool(session_stats.get("registered", 0) or self._registered),
"peer_id": str(config.get("peer_id", "")), "peer_id": str(config.get("peer_id", "")),
"expected_sender": str(config.get("expected_sender", "")), "expected_sender": str(config.get("expected_sender", "")),
"recv_calls": int(session_stats.get("recv_calls", 0)),
"recv_bytes": int(session_stats.get("recv_bytes", 0)),
"recv_timeouts": int(session_stats.get("recv_timeouts", 0)),
"recv_errors": int(session_stats.get("recv_errors", 0)),
"received_messages": self._received_messages,
"received_bytes": self._received_bytes,
"accepted_count": self._accepted_count,
"pending_missing_count": self._pending_missing_count,
"invalid_message_id_count": self._invalid_message_id_count,
"invalid_timing_count": self._invalid_timing_count,
"unexpected_message_type_count": self._unexpected_message_type_count,
"unexpected_sender_count": self._unexpected_sender_count,
"sender_mismatch_accepted_count": self._sender_mismatch_accepted_count,
"payload_decode_errors": self._payload_decode_errors,
"last_msg_type": self._last_msg_type,
"last_from_peer": self._last_from_peer,
"last_payload_preview": self._last_payload_preview,
"last_ack_result": self._last_ack_result,
"reconnect_count": self._reconnect_count, "reconnect_count": self._reconnect_count,
"last_server_error": str(session_stats.get("last_server_error", "") or self._last_server_error),
"last_error": self._last_error, "last_error": self._last_error,
} }