diff --git a/python/omnisocket_a_side/daemon.py b/python/omnisocket_a_side/daemon.py index f9d9dae..e4fb251 100644 --- a/python/omnisocket_a_side/daemon.py +++ b/python/omnisocket_a_side/daemon.py @@ -30,9 +30,9 @@ def utc_iso_now() -> str: def load_omnisocket_api(): - from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS + from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session, VIDEO_DEFAULTS - return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS + return CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session, VIDEO_DEFAULTS def _merge_kcp_defaults(defaults: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]: @@ -50,7 +50,7 @@ def _load_config(config_path: str | None) -> dict[str, Any]: with path.open("r", encoding="utf-8") as file: raw = yaml.safe_load(file) or {} - control_defaults, _msg_type_binary, _session_cls, video_defaults = load_omnisocket_api() + control_defaults, _msg_type_binary, _msg_type_error, _session_cls, video_defaults = load_omnisocket_api() transport = dict(raw.get("transport", {})) control = dict(raw.get("control_sender", {})) @@ -160,7 +160,7 @@ class QueuedControlEvent: class ControlSessionManager: def __init__(self, config: dict[str, Any]) -> None: - control_defaults, _msg_type_binary, session_cls, _video_defaults = load_omnisocket_api() + control_defaults, _msg_type_binary, _msg_type_error, session_cls, _video_defaults = load_omnisocket_api() transport = config["transport"] control_cfg = config["control_sender"] daemon_cfg = config["daemon"] @@ -403,12 +403,13 @@ class ControlSessionManager: class VideoSessionManager: def __init__(self, config: dict[str, Any]) -> None: - _control_defaults, msg_type_binary, session_cls, video_defaults = load_omnisocket_api() + _control_defaults, msg_type_binary, msg_type_error, session_cls, video_defaults = load_omnisocket_api() transport = config["transport"] video_cfg = config["video_receiver"] daemon_cfg = config["daemon"] self._msg_type_binary = msg_type_binary + self._msg_type_error = msg_type_error self._session_cls = session_cls self._connect_kwargs = { "server_addr": transport["server_addr"], @@ -517,9 +518,12 @@ class VideoSessionManager: meta = session.recv_into(buffer, timeout_ms=200) if meta is None: continue - if meta.get("msg_type") != self._msg_type_binary: - continue + msg_type = int(meta.get("msg_type", -1)) frame = bytes(buffer[: int(meta["body_len"])]) + if msg_type != self._msg_type_binary: + self._disconnect(self._describe_unexpected_message(msg_type, frame)) + time.sleep(0.2) + break jpeg_frame = self._extract_jpeg_frame(frame) if jpeg_frame is None: with self._lock: @@ -535,6 +539,14 @@ class VideoSessionManager: self._disconnect(str(error)) time.sleep(0.2) + def _describe_unexpected_message(self, msg_type: int, payload: bytes) -> str: + detail = payload.decode("utf-8", errors="replace").strip() + if msg_type == self._msg_type_error: + return f"video session rejected by server: {detail or 'unknown error'}" + if detail: + return f"received unexpected video message type {msg_type}: {detail}" + return f"received unexpected video message type {msg_type}" + def _connect(self) -> None: session = self._session_cls() try: diff --git a/python/omnisocket_b_side/daemon.py b/python/omnisocket_b_side/daemon.py index bd254e8..9ea9eba 100644 --- a/python/omnisocket_b_side/daemon.py +++ b/python/omnisocket_b_side/daemon.py @@ -39,9 +39,9 @@ def utc_iso_now() -> str: def load_omnisocket_api(): - from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session + from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session - return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session + return CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session def _merge_kcp_defaults(defaults: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]: @@ -115,7 +115,7 @@ def _load_config(config_path: str | None) -> dict[str, Any]: with path.open("r", encoding="utf-8") as file: raw = yaml.safe_load(file) or {} - control_defaults, _msg_type_binary, _session_cls = load_omnisocket_api() + control_defaults, _msg_type_binary, _msg_type_error, _session_cls = load_omnisocket_api() transport = dict(raw.get("transport", {})) control = dict(raw.get("control_receiver", {})) video = dict(raw.get("video_sender", {})) @@ -193,12 +193,13 @@ def _load_config(config_path: str | None) -> dict[str, Any]: class ControlRecvManager: def __init__(self, config: dict[str, Any]) -> None: - control_defaults, msg_type_binary, session_cls = load_omnisocket_api() + control_defaults, msg_type_binary, msg_type_error, session_cls = load_omnisocket_api() transport = config["transport"] control_cfg = config["control_receiver"] daemon_cfg = config["daemon"] self._msg_type_binary = msg_type_binary + self._msg_type_error = msg_type_error self._session_cls = session_cls self._connect_kwargs = { "server_addr": transport["server_addr"], @@ -307,6 +308,7 @@ class ControlRecvManager: if msg_type != self._msg_type_binary: with self._lock: self._ignored_non_binary += 1 + self._disconnect(self._describe_unexpected_message(msg_type, payload)) continue if len(payload) != CONTROL_PACKET_STRUCT.size: with self._lock: @@ -346,6 +348,14 @@ class ControlRecvManager: except Exception: pass + def _describe_unexpected_message(self, msg_type: int, payload: bytes) -> str: + detail = payload.decode("utf-8", errors="replace").strip() + if msg_type == self._msg_type_error: + return f"control session rejected by server: {detail or 'unknown error'}" + if detail: + return f"received unexpected control message type {msg_type}: {detail}" + return f"received unexpected control message type {msg_type}" + def _enqueue_packet(self, payload: bytes) -> None: try: self._queue.put_nowait(payload)