379 lines
15 KiB
Python
379 lines
15 KiB
Python
from __future__ import annotations
|
|
|
|
from collections import deque
|
|
import sys
|
|
import threading
|
|
import time
|
|
from typing import Any, Iterator
|
|
|
|
from .common import (
|
|
JPEG_FRAME_DIR,
|
|
OMNISOCKET_CONFIG_PATH,
|
|
OMNISOCKET_FRAME_FRESH_SECONDS,
|
|
PROJECT_ROOT,
|
|
VIDEO_SOURCE_MODE,
|
|
VIDEO_TIMESTAMP_ENDIANNESS,
|
|
VIDEO_TIMESTAMP_MAX_SKEW_NS,
|
|
VIDEO_TIMESTAMP_MULTIPLIER_NS,
|
|
VIDEO_TIMESTAMP_SAMPLE_SIZE,
|
|
VIDEO_TIMESTAMP_TRAILER_BYTES,
|
|
VIDEO_TIMESTAMP_UNIT,
|
|
WORKSPACE_ROOT,
|
|
load_omnisocket_config,
|
|
)
|
|
|
|
|
|
def safe_kcp_stats(session: Any) -> dict[str, Any]:
    """Return ``session.kcp_stats()`` as a plain dict, or ``{}`` on any problem.

    Best-effort helper: a ``None`` session, an object without a ``kcp_stats``
    attribute, or any exception raised while calling it / converting its
    result to a dict all produce an empty dict.
    """
    # Guard clauses: nothing to query.
    if session is None:
        return {}
    if not hasattr(session, "kcp_stats"):
        return {}

    try:
        raw_stats = session.kcp_stats()
        return dict(raw_stats)
    except Exception:
        # Stats are diagnostic only; never let them break the caller.
        return {}
|
|
|
|
|
|
class OmniSocketVideoReceiver:
    """Keep the most recent JPEG frame received from an OmniSocket peer.

    A daemon thread, started lazily by :meth:`ensure_started`, does the work:

    * it connects an OmniSocket ``Session``;
    * it reads binary messages and keeps the latest JPEG frame;
    * it records the frame's sequence number;
    * when the frame carries a valid timestamp trailer, it records the
      measured send-to-receive delta.

    After a failure the thread reconnects, until :meth:`close` is called.

    Frame layout, as accepted by the parsing helpers below:

    * an optional 8-byte big-endian sequence prefix;
    * a JPEG starting with the SOI marker ``FF D8``;
    * optionally, ``VIDEO_TIMESTAMP_TRAILER_BYTES`` of timestamp after the
      JPEG's EOI marker ``FF D9``.
    """

    # Seconds to wait before reconnecting after a failure. ``Event.wait`` is
    # used so that close() interrupts the wait immediately.
    _RECONNECT_BACKOFF_SECONDS = 2.0
    # Per-call receive timeout, so the loop re-checks the closing flag regularly.
    _RECV_TIMEOUT_MS = 1000

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._thread: threading.Thread | None = None
        self._started = False
        self._session: Any = None
        self._session_cls: Any = None
        self._binary_msg_type: Any = None
        self._video_defaults: dict[str, Any] = {}
        self._latest_frame: bytes | None = None
        self._latest_received_at = 0.0  # time.time() of the latest stored frame
        self._latest_sequence: int | None = None
        self._latest_latency_ms: float | None = None
        self._latest_timestamp_unit: str | None = None
        self._latest_timestamp_endianness: str | None = None
        # Rolling window of recent latency samples, newest appended last.
        self._latency_samples_ms: deque[float] = deque(maxlen=VIDEO_TIMESTAMP_SAMPLE_SIZE)
        self._frames_received = 0
        self._last_error = ""
        self._reconnect_count = 0
        self._ever_connected = False
        self._closing = threading.Event()
        self._load_backend()

    def _load_backend(self) -> None:
        """Import the optional omnisocket backend and record any import failure."""
        try:
            self._import_backend()
        except Exception as error:  # pragma: no cover - optional runtime dependency
            self._last_error = f"omnisocket import failed: {error}"

    def _import_backend(self) -> None:
        """Import ``omnisocket``, falling back to the in-workspace copy.

        Raises:
            ImportError: if neither the installed package nor the
                ``WORKSPACE_ROOT/OmniSocketGo/python`` copy can be imported.
        """
        try:
            from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS  # type: ignore
        except ImportError:
            python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python"
            if python_dir.exists():
                sys.path.insert(0, str(python_dir))
            from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS  # type: ignore

        self._binary_msg_type = MSG_TYPE_BINARY
        self._session_cls = Session
        self._video_defaults = dict(VIDEO_DEFAULTS)

    def ensure_started(self) -> None:
        """Start the receiver thread once.

        Does nothing when the backend is missing, the thread is already
        running, or close() has been called.
        """
        if self._session_cls is None or self._binary_msg_type is None:
            return

        with self._lock:
            if self._started or self._closing.is_set():
                return
            self._started = True
            self._thread = threading.Thread(
                target=self._run,
                name="omnisocket-video-receiver",
                daemon=True,
            )
            self._thread.start()

    def _connect_session(self) -> tuple[Any, int]:
        """Create and connect a new session from the current OmniSocket config.

        Returns:
            ``(session, buffer_bytes)``. ``buffer_bytes`` is the receive buffer
            size taken from ``video_receiver.buffer_bytes``, default 1 MiB.

        Raises:
            Any exception from config loading, the ``buffer_bytes`` conversion,
            or ``Session.connect``. A session created here is closed (best
            effort) before a connect failure propagates.
        """
        assert self._session_cls is not None

        config = load_omnisocket_config()
        transport_cfg = config.get("transport", {})
        video_cfg = config.get("video_receiver", {})
        # Resolve this before opening anything, so a bad config value cannot
        # leave a connected session behind.
        buffer_bytes = int(video_cfg.get("buffer_bytes", 1024 * 1024))

        session = self._session_cls()
        try:
            session.connect(
                server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")),
                peer_id=str(video_cfg.get("peer_id", "peer-a-video")),
                relay_via=str(transport_cfg.get("relay_via", "")),
                bind_ip=str(transport_cfg.get("bind_ip", "")),
                bind_device=str(transport_cfg.get("bind_device", "")),
                **self._video_defaults,
            )
        except Exception:
            # _run only closes sessions it has registered, so a failed connect
            # must release its own session here.
            try:
                session.close()
            except Exception:
                pass
            raise
        return session, buffer_bytes

    def _extract_jpeg_payload(self, frame: bytes) -> bytes | None:
        """Return the frame from its JPEG SOI marker on, or ``None``.

        The SOI marker ``FF D8`` is accepted either at offset 0 or right after
        an 8-byte sequence prefix.
        """
        if frame.startswith(b"\xff\xd8"):
            return frame
        if len(frame) > 8 and frame[8:10] == b"\xff\xd8":
            return frame[8:]
        return None

    def _split_jpeg_frame_and_trailer(
        self, frame: bytes, trailer_bytes: int | None = None
    ) -> tuple[bytes, bytes] | None:
        """Split a frame into ``(jpeg, trailer)``.

        Args:
            frame: Raw message body, with or without the 8-byte sequence prefix.
            trailer_bytes: Expected timestamp trailer length. Defaults to
                ``VIDEO_TIMESTAMP_TRAILER_BYTES``.

        Returns:
            ``None`` when no JPEG SOI marker is found where expected.
            Otherwise a pair of:

            * the JPEG bytes, ending at the EOI marker when one is located;
            * the bytes after the EOI marker (possibly ``b""``).
        """
        jpeg_payload = self._extract_jpeg_payload(frame)
        if jpeg_payload is None:
            return None

        trailer_len = VIDEO_TIMESTAMP_TRAILER_BYTES if trailer_bytes is None else trailer_bytes
        eoi = b"\xff\xd9"

        # Check the trailer-aligned EOI position first. The trailer's own last
        # two bytes can happen to be FF D9. A plain endswith() test would then
        # treat the trailer as part of the JPEG and drop the timestamp.
        if (
            trailer_len > 0
            and len(jpeg_payload) >= trailer_len + 2
            and jpeg_payload[-(trailer_len + 2) : -trailer_len] == eoi
        ):
            return jpeg_payload[:-trailer_len], jpeg_payload[-trailer_len:]

        if jpeg_payload.endswith(eoi):
            return jpeg_payload, b""

        # Trailer of unexpected length: split after the last EOI marker, if any.
        eoi_index = jpeg_payload.rfind(eoi)
        if eoi_index < 0:
            return jpeg_payload, b""
        trailer_start = eoi_index + len(eoi)
        return jpeg_payload[:trailer_start], jpeg_payload[trailer_start:]

    def _extract_jpeg_frame(self, frame: bytes) -> bytes | None:
        """Return only the JPEG part of *frame*, or ``None`` if it is not a JPEG frame."""
        split_payload = self._split_jpeg_frame_and_trailer(frame)
        if split_payload is None:
            return None
        jpeg_frame, _ = split_payload
        return jpeg_frame

    def _extract_sequence(self, frame: bytes) -> int | None:
        """Return the 8-byte big-endian sequence prefix, or ``None`` if the frame has none."""
        if len(frame) >= 8 and not frame.startswith(b"\xff\xd8"):
            return int.from_bytes(frame[:8], "big")
        return None

    def _extract_frame_tail(self, frame: bytes) -> bytes:
        """Return the bytes after the JPEG's EOI marker (``b""`` if none)."""
        split_payload = self._split_jpeg_frame_and_trailer(frame)
        if split_payload is None:
            return b""
        _, trailer = split_payload
        return trailer

    def _timestamp_from_trailer(self, trailer: bytes) -> tuple[int, str, str] | None:
        """Decode a timestamp trailer into ``(timestamp_ns, unit, endianness)``.

        Returns ``None`` in any of these cases:

        * the trailer length differs from ``VIDEO_TIMESTAMP_TRAILER_BYTES``;
        * the decoded value is zero;
        * the timestamp lies more than ``VIDEO_TIMESTAMP_MAX_SKEW_NS`` away
          from the local clock.
        """
        if len(trailer) != VIDEO_TIMESTAMP_TRAILER_BYTES:
            return None

        value = int.from_bytes(trailer, VIDEO_TIMESTAMP_ENDIANNESS, signed=False)
        if value <= 0:
            return None

        timestamp_ns = value * VIDEO_TIMESTAMP_MULTIPLIER_NS
        # Reject values that are implausibly far from "now", e.g. a trailer
        # encoded in an unexpected unit.
        if abs(time.time_ns() - timestamp_ns) > VIDEO_TIMESTAMP_MAX_SKEW_NS:
            return None

        return timestamp_ns, VIDEO_TIMESTAMP_UNIT, VIDEO_TIMESTAMP_ENDIANNESS

    def _extract_frame_timestamp(self, frame: bytes) -> tuple[int, str, str] | None:
        """Return the decoded timestamp trailer of *frame*, or ``None``."""
        return self._timestamp_from_trailer(self._extract_frame_tail(frame))

    def _run(self) -> None:
        """Thread body: connect, pump frames, and reconnect until close() is called."""
        while not self._closing.is_set():
            failed = False
            try:
                session, buffer_bytes = self._connect_session()
                self._register_session(session)
                self._pump_frames(session, buffer_bytes)
            except Exception as error:  # pragma: no cover - runtime integration path
                if not self._closing.is_set():
                    # Read the session's error while it is still open.
                    self._last_error = self._describe_failure(error)
                    failed = True
            finally:
                self._release_session()
            if failed:
                # Back off only after the dead session is closed. Event.wait
                # returns as soon as close() sets the flag, unlike time.sleep.
                self._closing.wait(self._RECONNECT_BACKOFF_SECONDS)

    def _register_session(self, session: Any) -> None:
        """Publish a freshly connected session and update the reconnect counters."""
        with self._lock:
            self._session = session
            self._last_error = ""
            if self._ever_connected:
                self._reconnect_count += 1
            else:
                self._ever_connected = True

    def _pump_frames(self, session: Any, buffer_bytes: int) -> None:
        """Receive messages on *session* until closing and store each binary frame."""
        buffer = bytearray(buffer_bytes)
        while not self._closing.is_set():
            meta = session.recv_into(buffer, timeout_ms=self._RECV_TIMEOUT_MS)
            if meta is None:
                continue  # presumably a recv timeout; re-check the closing flag
            if meta.get("msg_type") != self._binary_msg_type:
                continue
            self._handle_frame(bytes(buffer[: meta["body_len"]]))

    def _handle_frame(self, frame: bytes) -> None:
        """Parse one binary message and, if it holds a JPEG, store it as the latest frame."""
        # Split once per frame; the JPEG and trailer both come from this result.
        split_payload = self._split_jpeg_frame_and_trailer(frame)
        if split_payload is None:
            self._last_error = "received non-JPEG binary frame"
            return
        jpeg_frame, trailer = split_payload

        timestamp_meta = self._timestamp_from_trailer(trailer)
        if timestamp_meta is None:
            latency_ms = None
            unit = None
            endianness = None
        else:
            timestamp_ns, unit, endianness = timestamp_meta
            latency_ms = round((time.time_ns() - timestamp_ns) / 1_000_000, 3)
        sequence = self._extract_sequence(frame)

        with self._lock:
            self._latest_frame = jpeg_frame
            self._latest_received_at = time.time()
            self._latest_sequence = sequence
            self._latest_latency_ms = latency_ms
            self._latest_timestamp_unit = unit
            self._latest_timestamp_endianness = endianness
            if latency_ms is not None:
                self._latency_samples_ms.append(latency_ms)
            self._frames_received += 1

    def _describe_failure(self, error: Exception) -> str:
        """Build the error text for a failure.

        Prefers the session's ``last_server_error`` when available, otherwise
        falls back to ``str(error)``.
        """
        session = self._session
        session_error = ""
        if session is not None:
            try:
                session_error = str(dict(session.stats()).get("last_server_error", "") or "")
            except Exception:
                session_error = ""
        return session_error or str(error)

    def _release_session(self) -> None:
        """Close the current session (best effort) and clear it.

        Also marks the receiver as stopped when closing.
        """
        session = self._session
        if session is not None:
            try:
                session.close()
            except Exception:
                pass
        with self._lock:
            self._session = None
            if self._closing.is_set():
                self._started = False

    def get_latest_frame(self) -> bytes | None:
        """Return the latest JPEG frame, or ``None``.

        Returns ``None`` when no frame has arrived, or when the latest frame is
        older than ``OMNISOCKET_FRAME_FRESH_SECONDS``.
        """
        self.ensure_started()
        with self._lock:
            if self._latest_frame is None:
                return None
            if time.time() - self._latest_received_at > OMNISOCKET_FRAME_FRESH_SECONDS:
                return None
            return self._latest_frame

    def session_stats(self) -> dict[str, Any]:
        """Return ``session.stats()`` as a dict.

        When there is no session, or the call fails, returns a
        disconnected placeholder instead.
        """
        self.ensure_started()
        with self._lock:
            session = self._session
            if session is None:
                return {"connected": 0, "registered": 0, "last_server_error": self._last_error}
        try:
            return dict(session.stats())
        except Exception:
            return {"connected": 0, "registered": 0, "last_server_error": self._last_error}

    def session_kcp_stats(self) -> dict[str, Any]:
        """Return the current session's KCP stats, or ``{}``."""
        self.ensure_started()
        with self._lock:
            session = self._session
        return safe_kcp_stats(session)

    def get_status(self) -> dict[str, Any]:
        """Return a status snapshot.

        Covers connection state, frame counters, config values, and latency
        timing.
        """
        self.ensure_started()
        config = load_omnisocket_config()
        transport_cfg = config.get("transport", {})
        video_cfg = config.get("video_receiver", {})
        session_stats = self.session_stats()
        with self._lock:
            has_recent_frame = self._latest_frame is not None and (
                time.time() - self._latest_received_at <= OMNISOCKET_FRAME_FRESH_SECONDS
            )
            if has_recent_frame and self._latest_latency_ms is not None:
                timing_status = {
                    "available": True,
                    "latest_delta_ms": self._latest_latency_ms,
                    # Newest sample first.
                    "delta_samples_ms": list(reversed(self._latency_samples_ms)),
                    "sample_count": len(self._latency_samples_ms),
                    "sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE,
                    "timestamp_unit": self._latest_timestamp_unit,
                    "timestamp_endianness": self._latest_timestamp_endianness,
                }
            else:
                timing_status = {
                    "available": False,
                    "latest_delta_ms": None,
                    "delta_samples_ms": [],
                    "sample_count": 0,
                    "sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE,
                    "timestamp_unit": None,
                    "timestamp_endianness": None,
                }
            return {
                "backend_ready": self._session_cls is not None,
                "mode": VIDEO_SOURCE_MODE,
                "connected": self._session is not None,
                "registered": bool(session_stats.get("registered", 0)),
                "has_recent_frame": has_recent_frame,
                "frames_received": self._frames_received,
                "latest_sequence": self._latest_sequence,
                "reconnect_count": self._reconnect_count,
                "last_server_error": str(session_stats.get("last_server_error", "") or ""),
                "last_error": self._last_error,
                "config_path": str(OMNISOCKET_CONFIG_PATH),
                "server_addr": str(transport_cfg.get("server_addr", "")),
                "relay_via": str(transport_cfg.get("relay_via", "")),
                "peer_id": str(video_cfg.get("peer_id", "")),
                "buffer_bytes": int(video_cfg.get("buffer_bytes", 0)),
                "timing": timing_status,
            }

    def close(self) -> None:
        """Stop the receiver.

        Sets the closing flag, closes the active session (best effort), and
        joins the thread for up to 0.5 s.
        """
        self._closing.set()
        with self._lock:
            session = self._session
        if session is not None:
            try:
                session.close()
            except Exception:
                pass
        thread = self._thread
        if thread is not None and thread.is_alive():
            thread.join(timeout=0.5)
|
|
|
|
|
|
class VideoFrameService:
    """Facade over an :class:`OmniSocketVideoReceiver`.

    Exposes the receiver as a status dict, single JPEG frames, and an MJPEG
    byte stream.
    """

    def __init__(self, receiver: OmniSocketVideoReceiver) -> None:
        self._receiver = receiver

    def get_status(self) -> dict[str, Any]:
        """Describe the video source.

        ``available`` is true only when the receiver currently holds a fresh
        frame. The receiver's full status and its timing block are included
        verbatim.
        """
        receiver_status = self._receiver.get_status()
        is_live = self._receiver.get_latest_frame() is not None
        frame_total = receiver_status["frames_received"]

        if is_live:
            mode_label = "omnisocket-jpeg-live"
            detail = f"peer stream active, frames={frame_total}"
        else:
            mode_label = "omnisocket-waiting"
            # Surface the receiver's last error when there is one.
            detail = receiver_status["last_error"] or (
                "waiting for live OmniSocket JPEG frames; check the hub, sender, and receiver configuration"
            )

        return {
            "available": is_live,
            "source_mode": mode_label,
            "frame_count": frame_total,
            "fps": 30,
            "frame_dir": str(JPEG_FRAME_DIR),
            "source_detail": detail,
            "receiver": receiver_status,
            "timing": receiver_status["timing"],
        }

    def get_next_frame(self) -> bytes:
        """Return the receiver's latest fresh JPEG frame.

        Raises:
            RuntimeError: if no fresh frame is available.
        """
        latest = self._receiver.get_latest_frame()
        if latest is None:
            raise RuntimeError("no live OmniSocket JPEG frame is currently available")
        return latest

    def iter_mjpeg(self, fps: float = 6.0) -> Iterator[bytes]:
        """Yield ``multipart/x-mixed-replace`` parts with boundary ``frame`` indefinitely.

        *fps* is clamped to the range [1, 30]. Each part is followed by a sleep
        of one frame interval. The generator ends with RuntimeError once no
        fresh frame is available (see :meth:`get_next_frame`).
        """
        delay = 1.0 / max(1.0, min(fps, 30.0))
        part_prefix = b"--frame\r\nContent-Type: image/jpeg\r\n"
        while True:
            jpeg = self.get_next_frame()
            length_line = f"Content-Length: {len(jpeg)}\r\n\r\n".encode("ascii")
            yield b"".join((part_prefix, length_line, jpeg, b"\r\n"))
            time.sleep(delay)
|
|
|