Files
OmniSocketGo/python/omnisocket_b_side/client.py

55 lines
1.7 KiB
Python

"""Local AF_UNIX SOCK_SEQPACKET client for B-side control delivery."""
from __future__ import annotations
import os
import socket
from omnisocket_a_side.control_codec import CONTROL_PACKET_STRUCT
from . import DEFAULT_CTRL_SOCKET_PATH
class BSideControlClient:
    """Client for the local B-side daemon control socket.

    Connects to an ``AF_UNIX`` / ``SOCK_SEQPACKET`` socket and reads
    fixed-size control packets whose length is ``CONTROL_PACKET_STRUCT.size``.

    The client can be used as a context manager; the socket is connected on
    entry (if it is not already) and closed on exit::

        with BSideControlClient() as client:
            packet = client.recv_control_packet()
    """

    def __init__(self, socket_path: str | None = None) -> None:
        """Store the socket path; no connection is made here.

        Args:
            socket_path: Filesystem path of the control socket. When empty or
                ``None``, the ``OMNIBDAEMON_CTRL_SOCKET`` environment variable
                is used, falling back to ``DEFAULT_CTRL_SOCKET_PATH``.
        """
        self.socket_path = socket_path or os.getenv(
            "OMNIBDAEMON_CTRL_SOCKET",
            DEFAULT_CTRL_SOCKET_PATH,
        )
        # None while disconnected; set by connect(), cleared by close().
        self._sock: socket.socket | None = None

    def __enter__(self) -> "BSideControlClient":
        """Connect if not already connected and return the client."""
        if self._sock is None:
            self.connect()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the socket when leaving the ``with`` block."""
        self.close()

    def connect(self) -> None:
        """Open a new connection to ``self.socket_path``.

        If the client is already connected, the previous socket is closed
        once the new connection has been established. If connecting fails,
        the newly created socket is closed and the client's state is left
        unchanged.

        Raises:
            OSError: If ``AF_UNIX`` or ``SOCK_SEQPACKET`` is unavailable on
                this platform, or if the connection attempt fails.
        """
        if not hasattr(socket, "AF_UNIX"):
            raise OSError("AF_UNIX sockets are not available on this platform")
        if not hasattr(socket, "SOCK_SEQPACKET"):
            raise OSError("SOCK_SEQPACKET is not available on this platform")
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
        try:
            sock.connect(self.socket_path)
        except BaseException:
            # Do not leak the file descriptor when the connect attempt fails.
            sock.close()
            raise
        previous, self._sock = self._sock, sock
        if previous is not None:
            # Reconnecting: release the socket that was just replaced.
            previous.close()

    def recv_control_packet(self, timeout_ms: int = 100) -> bytes | None:
        """Receive one control packet, waiting at most ``timeout_ms``.

        Args:
            timeout_ms: Maximum wait in milliseconds. Values below 1 ms are
                clamped to 1 ms so the socket stays in timeout mode (a
                timeout of 0 would switch it to non-blocking mode).

        Returns:
            The raw packet bytes, exactly ``CONTROL_PACKET_STRUCT.size`` long,
            or ``None`` if the wait timed out or the received message had a
            different length.

        Raises:
            RuntimeError: If the client is not connected.
            ConnectionResetError: If an empty payload is received, which is
                treated as the daemon having closed the socket.
        """
        if self._sock is None:
            raise RuntimeError("B-side control client is not connected")
        self._sock.settimeout(max(0.001, timeout_ms / 1000.0))
        try:
            # NOTE(review): recv() is capped at the packet size, so a larger
            # message would presumably be truncated to that size and accepted
            # -- confirm the daemon never sends oversized packets.
            payload = self._sock.recv(CONTROL_PACKET_STRUCT.size)
        except (socket.timeout, BlockingIOError):
            return None
        if payload == b"":
            raise ConnectionResetError("daemon control socket closed")
        if len(payload) != CONTROL_PACKET_STRUCT.size:
            # Short packet: dropped, reported the same way as a timeout.
            return None
        return payload

    def close(self) -> None:
        """Close the socket if connected; safe to call more than once."""
        if self._sock is None:
            return
        try:
            self._sock.close()
        finally:
            # Forget the socket even if close() raised.
            self._sock = None