From 6d77dc26bdd1f49dadc6e14e212be3fb8abe2ece Mon Sep 17 00:00:00 2001 From: Mock Date: Thu, 2 Apr 2026 20:05:28 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=8A=8A=E6=9C=8D=E5=8A=A1=E7=AB=AF?= =?UTF-8?q?=E9=94=99=E8=AF=AF=E7=9B=B4=E6=8E=A5=E5=86=99=E8=BF=9B=20last?= =?UTF-8?q?=5Ferror?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- python/omnisocket_a_side/daemon.py | 26 +++++++++++++++++++------- python/omnisocket_b_side/daemon.py | 18 ++++++++++++++---- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/python/omnisocket_a_side/daemon.py b/python/omnisocket_a_side/daemon.py index f9d9dae..e4fb251 100644 --- a/python/omnisocket_a_side/daemon.py +++ b/python/omnisocket_a_side/daemon.py @@ -30,9 +30,9 @@ def utc_iso_now() -> str: def load_omnisocket_api(): - from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS + from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session, VIDEO_DEFAULTS - return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS + return CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session, VIDEO_DEFAULTS def _merge_kcp_defaults(defaults: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]: @@ -50,7 +50,7 @@ def _load_config(config_path: str | None) -> dict[str, Any]: with path.open("r", encoding="utf-8") as file: raw = yaml.safe_load(file) or {} - control_defaults, _msg_type_binary, _session_cls, video_defaults = load_omnisocket_api() + control_defaults, _msg_type_binary, _msg_type_error, _session_cls, video_defaults = load_omnisocket_api() transport = dict(raw.get("transport", {})) control = dict(raw.get("control_sender", {})) @@ -160,7 +160,7 @@ class QueuedControlEvent: class ControlSessionManager: def __init__(self, config: dict[str, Any]) -> None: - control_defaults, _msg_type_binary, session_cls, _video_defaults = load_omnisocket_api() + control_defaults, _msg_type_binary, _msg_type_error, session_cls, _video_defaults = load_omnisocket_api() transport = config["transport"] control_cfg = config["control_sender"] daemon_cfg = config["daemon"] @@ -403,12 +403,13 @@ class ControlSessionManager: class VideoSessionManager: def __init__(self, config: dict[str, Any]) -> None: - _control_defaults, msg_type_binary, session_cls, video_defaults = load_omnisocket_api() + _control_defaults, msg_type_binary, msg_type_error, session_cls, video_defaults = load_omnisocket_api() transport = config["transport"] video_cfg = config["video_receiver"] daemon_cfg = config["daemon"] self._msg_type_binary = msg_type_binary + self._msg_type_error = msg_type_error self._session_cls = session_cls self._connect_kwargs = { "server_addr": transport["server_addr"], @@ -517,9 +518,12 @@ class VideoSessionManager: meta = session.recv_into(buffer, timeout_ms=200) if meta is None: continue - if meta.get("msg_type") != self._msg_type_binary: - continue + msg_type = int(meta.get("msg_type", -1)) frame = bytes(buffer[: int(meta["body_len"])]) + if msg_type != self._msg_type_binary: + self._disconnect(self._describe_unexpected_message(msg_type, frame)) + time.sleep(0.2) + break jpeg_frame = self._extract_jpeg_frame(frame) if jpeg_frame is None: with self._lock: @@ -535,6 +539,14 @@ class VideoSessionManager: self._disconnect(str(error)) time.sleep(0.2) + def _describe_unexpected_message(self, msg_type: int, payload: bytes) -> str: + detail = payload.decode("utf-8", errors="replace").strip() + if msg_type == self._msg_type_error: + return f"video session rejected by server: {detail or 'unknown error'}" + if detail: + return f"received unexpected video message type {msg_type}: {detail}" + return f"received unexpected video message type {msg_type}" + def _connect(self) -> None: session = self._session_cls() try: diff --git a/python/omnisocket_b_side/daemon.py b/python/omnisocket_b_side/daemon.py index bd254e8..9ea9eba 100644 --- a/python/omnisocket_b_side/daemon.py +++ b/python/omnisocket_b_side/daemon.py @@ -39,9 +39,9 @@ def utc_iso_now() -> str: def load_omnisocket_api(): - from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session + from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session - return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session + return CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session def _merge_kcp_defaults(defaults: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]: @@ -115,7 +115,7 @@ def _load_config(config_path: str | None) -> dict[str, Any]: with path.open("r", encoding="utf-8") as file: raw = yaml.safe_load(file) or {} - control_defaults, _msg_type_binary, _session_cls = load_omnisocket_api() + control_defaults, _msg_type_binary, _msg_type_error, _session_cls = load_omnisocket_api() transport = dict(raw.get("transport", {})) control = dict(raw.get("control_receiver", {})) video = dict(raw.get("video_sender", {})) @@ -193,12 +193,13 @@ def _load_config(config_path: str | None) -> dict[str, Any]: class ControlRecvManager: def __init__(self, config: dict[str, Any]) -> None: - control_defaults, msg_type_binary, session_cls = load_omnisocket_api() + control_defaults, msg_type_binary, msg_type_error, session_cls = load_omnisocket_api() transport = config["transport"] control_cfg = config["control_receiver"] daemon_cfg = config["daemon"] self._msg_type_binary = msg_type_binary + self._msg_type_error = msg_type_error self._session_cls = session_cls self._connect_kwargs = { "server_addr": transport["server_addr"], @@ -307,6 +308,7 @@ class ControlRecvManager: if msg_type != self._msg_type_binary: with self._lock: self._ignored_non_binary += 1 + self._disconnect(self._describe_unexpected_message(msg_type, payload)) continue if len(payload) != CONTROL_PACKET_STRUCT.size: with self._lock: @@ -346,6 +348,14 @@ class ControlRecvManager: except Exception: pass + def _describe_unexpected_message(self, msg_type: int, payload: bytes) -> str: + detail = payload.decode("utf-8", errors="replace").strip() + if msg_type == self._msg_type_error: + return f"control session rejected by server: {detail or 'unknown error'}" + if detail: + return f"received unexpected control message type {msg_type}: {detail}" + return f"received unexpected control message type {msg_type}" + def _enqueue_packet(self, payload: bytes) -> None: try: self._queue.put_nowait(payload)