Files
2026-04-18 17:01:10 +08:00

752 lines
33 KiB
Python

from __future__ import annotations
from collections import deque
from dataclasses import dataclass
import hashlib
import math
import struct
import sys
import threading
import time
from typing import Any, Iterator
from .common import (
JPEG_FRAME_DIR,
OMNISOCKET_CONFIG_PATH,
OMNISOCKET_FRAME_FRESH_SECONDS,
JsonlRunLogger,
VIDEO_SOURCE_MODE,
VIDEO_TIMESTAMP_SAMPLE_SIZE,
VIDEO_TRAILER_BYTES,
VIDEO_TRAILER_ENDIANNESS,
VIDEO_TRAILER_STRUCT,
VIDEO_TRAILER_TIMESTAMP_MAX_SKEW_NS,
VIDEO_TRAILER_TIMESTAMP_MULTIPLIER_NS,
VIDEO_TRAILER_TIMESTAMP_UNIT,
WORKSPACE_ROOT,
load_omnisocket_config,
)
def safe_kcp_stats(session: Any) -> dict[str, Any]:
if session is None or not hasattr(session, "kcp_stats"):
return {}
try:
return dict(session.kcp_stats())
except Exception:
return {}
class VideoDisplayProbeStore:
def __init__(self) -> None:
self._lock = threading.Lock()
self._logger = JsonlRunLogger("BLITZ_A_VIDEO_DISPLAY_PROBE_LOG_PATH", "a-video-display-probe")
self._latest: VideoDisplayProbeStatus = VideoDisplayProbeStatus(
updated_at=None,
frame_seq=None,
frame_hash="",
input_to_next_fresh_frame_ms=None,
input_to_next_changed_frame_ms=None,
input_to_next_paint_ms=None,
request_to_paint_ms=None,
response_to_paint_ms=None,
backend_to_request_ms=None,
backend_to_request_ms_raw=None,
backend_to_paint_ms=None,
backend_to_paint_ms_raw=None,
browser_backend_clock_offset_ms=None,
browser_backend_clock_rtt_ms=None,
browser_backend_clock_sample_count=0,
browser_backend_clock_calibrated_at=None,
)
def record_event(self, payload: dict[str, Any]) -> None:
backend_received_unix_ns = payload.get("backend_received_unix_ns")
request_started_unix_ms = payload.get("request_started_unix_ms")
response_received_unix_ms = payload.get("response_received_unix_ms")
paint_unix_ms = payload.get("paint_unix_ms")
browser_backend_clock_offset_ms = self._coerce_float(payload.get("browser_backend_clock_offset_ms"))
browser_backend_clock_rtt_ms = self._coerce_float(payload.get("browser_backend_clock_rtt_ms"))
browser_backend_clock_sample_count = self._coerce_int(payload.get("browser_backend_clock_sample_count"))
browser_backend_clock_calibrated_at = self._coerce_text(payload.get("browser_backend_clock_calibrated_at"))
request_to_paint_ms = self._duration_ms(paint_unix_ms, request_started_unix_ms, clamp_floor_zero=True)
response_to_paint_ms = self._duration_ms(paint_unix_ms, response_received_unix_ms, clamp_floor_zero=True)
backend_received_unix_ms = None
try:
if backend_received_unix_ns is not None:
backend_received_unix_ms = int(backend_received_unix_ns) / 1_000_000.0
except (TypeError, ValueError):
backend_received_unix_ms = None
backend_received_browser_unix_ms = None
if backend_received_unix_ms is not None and browser_backend_clock_offset_ms is not None:
backend_received_browser_unix_ms = round(backend_received_unix_ms + browser_backend_clock_offset_ms, 3)
backend_to_request_ms_raw = self._duration_ms(request_started_unix_ms, backend_received_unix_ms, clamp_floor_zero=False)
backend_to_paint_ms_raw = self._duration_ms(paint_unix_ms, backend_received_unix_ms, clamp_floor_zero=False)
backend_to_request_ms = self._duration_ms(
request_started_unix_ms,
backend_received_browser_unix_ms,
clamp_floor_zero=True,
)
backend_to_paint_ms = self._duration_ms(
paint_unix_ms,
backend_received_browser_unix_ms,
clamp_floor_zero=True,
)
status = VideoDisplayProbeStatus(
updated_at=self._coerce_text(payload.get("updated_at")),
frame_seq=int(payload["frame_seq"]) if payload.get("frame_seq") is not None else None,
frame_hash=str(payload.get("frame_hash") or ""),
input_to_next_fresh_frame_ms=self._coerce_float(payload.get("input_to_next_fresh_frame_ms")),
input_to_next_changed_frame_ms=self._coerce_float(payload.get("input_to_next_changed_frame_ms")),
input_to_next_paint_ms=self._coerce_float(payload.get("input_to_next_paint_ms")),
request_to_paint_ms=request_to_paint_ms,
response_to_paint_ms=response_to_paint_ms,
backend_to_request_ms=backend_to_request_ms,
backend_to_request_ms_raw=backend_to_request_ms_raw,
backend_to_paint_ms=backend_to_paint_ms,
backend_to_paint_ms_raw=backend_to_paint_ms_raw,
browser_backend_clock_offset_ms=browser_backend_clock_offset_ms,
browser_backend_clock_rtt_ms=browser_backend_clock_rtt_ms,
browser_backend_clock_sample_count=browser_backend_clock_sample_count,
browser_backend_clock_calibrated_at=browser_backend_clock_calibrated_at,
)
logged_payload = dict(payload)
logged_payload.update(
{
"request_to_paint_ms": request_to_paint_ms,
"response_to_paint_ms": response_to_paint_ms,
"backend_received_browser_unix_ms": backend_received_browser_unix_ms,
"backend_to_request_ms": backend_to_request_ms,
"backend_to_request_ms_raw": backend_to_request_ms_raw,
"backend_to_paint_ms": backend_to_paint_ms,
"backend_to_paint_ms_raw": backend_to_paint_ms_raw,
}
)
with self._lock:
self._latest = status
self._logger.write(logged_payload)
def get_status(self) -> dict[str, Any]:
with self._lock:
latest = self._latest
return {
"updated_at": latest.updated_at,
"frame_seq": latest.frame_seq,
"frame_hash": latest.frame_hash,
"input_to_next_fresh_frame_ms": latest.input_to_next_fresh_frame_ms,
"input_to_next_changed_frame_ms": latest.input_to_next_changed_frame_ms,
"input_to_next_paint_ms": latest.input_to_next_paint_ms,
"request_to_paint_ms": latest.request_to_paint_ms,
"response_to_paint_ms": latest.response_to_paint_ms,
"backend_to_request_ms": latest.backend_to_request_ms,
"backend_to_request_ms_raw": latest.backend_to_request_ms_raw,
"backend_to_paint_ms": latest.backend_to_paint_ms,
"backend_to_paint_ms_raw": latest.backend_to_paint_ms_raw,
"browser_backend_clock_offset_ms": latest.browser_backend_clock_offset_ms,
"browser_backend_clock_rtt_ms": latest.browser_backend_clock_rtt_ms,
"browser_backend_clock_sample_count": latest.browser_backend_clock_sample_count,
"browser_backend_clock_calibrated_at": latest.browser_backend_clock_calibrated_at,
}
def close(self) -> None:
self._logger.close()
@staticmethod
def _coerce_float(value: Any) -> float | None:
try:
if value is None:
return None
return round(float(value), 3)
except (TypeError, ValueError):
return None
@staticmethod
def _coerce_int(value: Any) -> int:
try:
if value is None:
return 0
return int(value)
except (TypeError, ValueError):
return 0
@staticmethod
def _coerce_text(value: Any) -> str | None:
text = str(value or "").strip()
return text or None
@classmethod
def _duration_ms(cls, end_ms: Any, start_ms: Any, *, clamp_floor_zero: bool) -> float | None:
end_value = cls._coerce_float(end_ms)
start_value = cls._coerce_float(start_ms)
if end_value is None or start_value is None:
return None
delta = round(end_value - start_value, 3)
if clamp_floor_zero:
delta = max(0.0, delta)
return delta
@dataclass(frozen=True)
class FrameTrailerMetadata:
timestamp_ns: int
latitude: float | None
longitude: float | None
capture_to_send_ms: int
raw_latitude_hex: str
raw_longitude_hex: str
received_at: float
@property
def has_gps_fix(self) -> bool:
return self.latitude is not None and self.longitude is not None
@dataclass(frozen=True)
class VideoDisplayProbeStatus:
updated_at: str | None
frame_seq: int | None
frame_hash: str
input_to_next_fresh_frame_ms: float | None
input_to_next_changed_frame_ms: float | None
input_to_next_paint_ms: float | None
request_to_paint_ms: float | None
response_to_paint_ms: float | None
backend_to_request_ms: float | None
backend_to_request_ms_raw: float | None
backend_to_paint_ms: float | None
backend_to_paint_ms_raw: float | None
browser_backend_clock_offset_ms: float | None
browser_backend_clock_rtt_ms: float | None
browser_backend_clock_sample_count: int
browser_backend_clock_calibrated_at: str | None
class OmniSocketVideoReceiver:
def __init__(self) -> None:
self._lock = threading.Lock()
self._thread: threading.Thread | None = None
self._started = False
self._session = None
self._session_cls = None
self._binary_msg_type = None
self._video_defaults: dict[str, Any] = {}
self._latest_frame: bytes | None = None
self._latest_received_at = 0.0
self._latest_backend_received_unix_ns = 0
self._latest_backend_received_mono_ns = 0
self._latest_sequence: int | None = None
self._latest_metadata: FrameTrailerMetadata | None = None
self._latest_sender_clock_delta_ms_raw: float | None = None
self._latest_timestamp_unit: str | None = None
self._latest_timestamp_endianness: str | None = None
self._sender_clock_delta_samples_ms_raw: deque[float] = deque(maxlen=VIDEO_TIMESTAMP_SAMPLE_SIZE)
self._latest_frame_hash = ""
self._latest_frame_bytes = 0
self._last_frame_hash = ""
self._last_sequence: int | None = None
self._last_backend_received_mono_ns = 0
self._interarrival_ms_samples: deque[float] = deque(maxlen=120)
self._repeat_samples: deque[int] = deque(maxlen=120)
self._skip_samples: deque[int] = deque(maxlen=120)
self._freeze_samples_ms: deque[float] = deque(maxlen=120)
self._current_stale_frame_run_length = 0
self._latest_remote_video_srtt_ms: int | None = None
self._frames_received = 0
self._last_error = ""
self._reconnect_count = 0
self._ever_connected = False
self._closing = threading.Event()
self._frame_recv_logger = JsonlRunLogger("BLITZ_A_VIDEO_FRAME_RECV_LOG_PATH", "a-video-frame-recv")
self._load_backend()
def _load_backend(self) -> None:
try:
self._import_backend()
except Exception as error: # pragma: no cover - optional runtime dependency
self._last_error = f"omnisocket import failed: {error}"
def _import_backend(self) -> None:
try:
from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS # type: ignore
except ImportError:
python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python"
if python_dir.exists():
sys.path.insert(0, str(python_dir))
from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS # type: ignore
self._binary_msg_type = MSG_TYPE_BINARY
self._session_cls = Session
self._video_defaults = dict(VIDEO_DEFAULTS)
def ensure_started(self) -> None:
if self._session_cls is None or self._binary_msg_type is None:
return
with self._lock:
if self._started or self._closing.is_set():
return
self._started = True
self._thread = threading.Thread(
target=self._run,
name="omnisocket-video-receiver",
daemon=True,
)
self._thread.start()
def _connect_session(self):
assert self._session_cls is not None
config = load_omnisocket_config()
transport_cfg = config.get("transport", {})
video_cfg = config.get("video_receiver", {})
session = self._session_cls()
session.connect(
server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")),
peer_id=str(video_cfg.get("peer_id", "peer-a-video")),
relay_via=str(transport_cfg.get("relay_via", "")),
bind_ip=str(transport_cfg.get("bind_ip", "")),
bind_device=str(transport_cfg.get("bind_device", "")),
**self._video_defaults,
)
return session, int(video_cfg.get("buffer_bytes", 1024 * 1024))
def _extract_jpeg_payload(self, frame: bytes) -> bytes | None:
if frame.startswith(b"\xff\xd8"):
return frame
if len(frame) > 8 and frame[8:10] == b"\xff\xd8":
return frame[8:]
return None
def _split_jpeg_frame_and_trailer(self, frame: bytes) -> tuple[bytes, bytes] | None:
jpeg_payload = self._extract_jpeg_payload(frame)
if jpeg_payload is None:
return None
if jpeg_payload.endswith(b"\xff\xd9"):
return jpeg_payload, b""
if (
len(jpeg_payload) >= VIDEO_TRAILER_BYTES + 2
and jpeg_payload[-(VIDEO_TRAILER_BYTES + 2) : -VIDEO_TRAILER_BYTES] == b"\xff\xd9"
):
return jpeg_payload[:-VIDEO_TRAILER_BYTES], jpeg_payload[-VIDEO_TRAILER_BYTES:]
eoi_index = jpeg_payload.rfind(b"\xff\xd9")
if eoi_index < 0:
return jpeg_payload, b""
trailer_start = eoi_index + 2
return jpeg_payload[:trailer_start], jpeg_payload[trailer_start:]
def _extract_jpeg_frame(self, frame: bytes) -> bytes | None:
split_payload = self._split_jpeg_frame_and_trailer(frame)
if split_payload is None:
return None
jpeg_frame, _ = split_payload
return jpeg_frame
def _extract_sequence(self, frame: bytes) -> int | None:
if len(frame) >= 8 and not frame.startswith(b"\xff\xd8"):
return int.from_bytes(frame[:8], "big")
return None
def _extract_frame_tail(self, frame: bytes) -> bytes:
split_payload = self._split_jpeg_frame_and_trailer(frame)
if split_payload is None:
return b""
_, trailer = split_payload
return trailer
def _extract_frame_metadata(self, frame: bytes, received_at: float | None = None) -> FrameTrailerMetadata | None:
trailer = self._extract_frame_tail(frame)
if len(trailer) != VIDEO_TRAILER_BYTES:
return None
try:
timestamp_ms, latitude, longitude, capture_to_send_ms = VIDEO_TRAILER_STRUCT.unpack(trailer)
except struct.error:
return None
if timestamp_ms <= 0:
return None
timestamp_ns = timestamp_ms * VIDEO_TRAILER_TIMESTAMP_MULTIPLIER_NS
if abs(time.time_ns() - timestamp_ns) > VIDEO_TRAILER_TIMESTAMP_MAX_SKEW_NS:
return None
gps_fix_available = (
math.isfinite(latitude)
and math.isfinite(longitude)
and (-90.0 <= latitude <= 90.0)
and (-180.0 <= longitude <= 180.0)
and not (abs(latitude) < 1e-9 and abs(longitude) < 1e-9)
)
return FrameTrailerMetadata(
timestamp_ns=timestamp_ns,
latitude=latitude if gps_fix_available else None,
longitude=longitude if gps_fix_available else None,
capture_to_send_ms=int(capture_to_send_ms),
raw_latitude_hex=trailer[8:16].hex(),
raw_longitude_hex=trailer[16:24].hex(),
received_at=received_at if received_at is not None else time.time(),
)
def _has_fresh_frame_locked(self) -> bool:
return self._latest_frame is not None and (
time.time() - self._latest_received_at <= OMNISOCKET_FRAME_FRESH_SECONDS
)
def update_remote_video_srtt(self, srtt_ms: int | None) -> None:
with self._lock:
self._latest_remote_video_srtt_ms = srtt_ms
def _freshness_payload_locked(self) -> dict[str, Any]:
interarrival_samples = list(self._interarrival_ms_samples)
repeat_samples = list(self._repeat_samples)
skip_samples = list(self._skip_samples)
freeze_samples_ms = list(self._freeze_samples_ms)
inter_frame_avg_ms = round(sum(interarrival_samples) / len(interarrival_samples), 3) if interarrival_samples else None
if interarrival_samples:
ordered = sorted(interarrival_samples)
p95_index = min(len(ordered) - 1, max(0, math.ceil(len(ordered) * 0.95) - 1))
inter_frame_p95_ms = round(ordered[p95_index], 3)
else:
inter_frame_p95_ms = None
repeated_frame_ratio = round(sum(repeat_samples) / len(repeat_samples), 4) if repeat_samples else 0.0
total_skip = sum(skip_samples)
expected_frames = len(skip_samples) + total_skip
skip_ratio = round(total_skip / expected_frames, 4) if expected_frames > 0 else 0.0
longest_freeze_ms = round(max(freeze_samples_ms), 3) if freeze_samples_ms else 0.0
return {
"inter_frame_avg_ms": inter_frame_avg_ms,
"inter_frame_p95_ms": inter_frame_p95_ms,
"repeated_frame_ratio": repeated_frame_ratio,
"skip_ratio": skip_ratio,
"longest_freeze_ms": longest_freeze_ms,
"stale_frame_run_length": self._current_stale_frame_run_length,
"relative_freshness_lag_frames": self._current_stale_frame_run_length + (skip_samples[-1] if skip_samples else 0),
}
def _frame_headers_locked(self) -> dict[str, str]:
capture_to_send_ms = self._latest_metadata.capture_to_send_ms if self._latest_metadata is not None else None
headers: dict[str, str] = {}
if self._latest_sequence is not None:
headers["X-Blitz-Frame-Seq"] = str(self._latest_sequence)
if self._latest_backend_received_unix_ns > 0:
headers["X-Blitz-Backend-Received-Unix-Ns"] = str(self._latest_backend_received_unix_ns)
if self._latest_frame_hash:
headers["X-Blitz-Frame-Hash"] = self._latest_frame_hash
if capture_to_send_ms is not None:
headers["X-Blitz-BSide-Capture-To-Send-Ms"] = str(capture_to_send_ms)
return headers
def get_latest_frame_headers(self) -> dict[str, str]:
snapshot = self.get_latest_frame_snapshot()
return snapshot[1] if snapshot is not None else {}
def _run(self) -> None:
while not self._closing.is_set():
try:
session, buffer_bytes = self._connect_session()
with self._lock:
self._session = session
self._last_error = ""
if self._ever_connected:
self._reconnect_count += 1
else:
self._ever_connected = True
buffer = bytearray(buffer_bytes)
while not self._closing.is_set():
meta = session.recv_into(buffer, timeout_ms=1000)
if meta is None:
continue
if meta.get("msg_type") != self._binary_msg_type:
continue
frame = bytes(buffer[: meta["body_len"]])
jpeg_frame = self._extract_jpeg_frame(frame)
if jpeg_frame is None:
self._last_error = "received non-JPEG binary frame"
continue
received_at = time.time()
received_unix_ns = time.time_ns()
received_mono_ns = time.monotonic_ns()
frame_metadata = self._extract_frame_metadata(frame, received_at=received_at)
sender_clock_delta_ms_raw = None
if frame_metadata is not None:
sender_clock_delta_ms_raw = round((received_unix_ns - frame_metadata.timestamp_ns) / 1_000_000, 3)
unit = VIDEO_TRAILER_TIMESTAMP_UNIT
endianness = VIDEO_TRAILER_ENDIANNESS
else:
unit = None
endianness = None
frame_sequence = self._extract_sequence(frame)
frame_hash = hashlib.blake2s(jpeg_frame, digest_size=8).hexdigest()
local_kcp = safe_kcp_stats(session) if self._frame_recv_logger.enabled else {}
frame_log_record: dict[str, Any] | None = None
with self._lock:
interarrival_ms = None
if self._last_backend_received_mono_ns > 0:
interarrival_ms = round((received_mono_ns - self._last_backend_received_mono_ns) / 1_000_000, 3)
self._interarrival_ms_samples.append(interarrival_ms)
sequence_gap = 0
if frame_sequence is not None and self._last_sequence is not None and frame_sequence > self._last_sequence:
sequence_gap = max(0, frame_sequence - self._last_sequence - 1)
repeat_flag = bool(self._last_frame_hash) and frame_hash == self._last_frame_hash
self._repeat_samples.append(1 if repeat_flag else 0)
self._skip_samples.append(sequence_gap)
if repeat_flag:
self._current_stale_frame_run_length += 1
else:
self._current_stale_frame_run_length = 0
if interarrival_ms is not None and repeat_flag:
self._freeze_samples_ms.append(interarrival_ms)
elif interarrival_ms is not None:
self._freeze_samples_ms.append(0.0)
self._latest_frame = jpeg_frame
self._latest_received_at = received_at
self._latest_backend_received_unix_ns = received_unix_ns
self._latest_backend_received_mono_ns = received_mono_ns
self._latest_sequence = frame_sequence
self._last_sequence = frame_sequence
self._latest_metadata = frame_metadata
self._latest_sender_clock_delta_ms_raw = sender_clock_delta_ms_raw
self._latest_timestamp_unit = unit
self._latest_timestamp_endianness = endianness
self._latest_frame_hash = frame_hash
self._latest_frame_bytes = len(jpeg_frame)
self._last_frame_hash = frame_hash
self._last_backend_received_mono_ns = received_mono_ns
if sender_clock_delta_ms_raw is not None:
self._sender_clock_delta_samples_ms_raw.append(sender_clock_delta_ms_raw)
self._frames_received += 1
if self._frame_recv_logger.enabled:
frame_log_record = {
"ts_unix_nano": received_unix_ns,
"frame_seq": frame_sequence,
"backend_received_unix_ns": received_unix_ns,
"backend_received_mono_ns": received_mono_ns,
"jpeg_bytes": len(jpeg_frame),
"interarrival_ms": interarrival_ms,
"sequence_gap": sequence_gap,
"repeat_flag": repeat_flag,
"skip_count": sequence_gap,
"frame_hash": frame_hash,
"a_to_d_video_srtt_ms": local_kcp.get("srtt_ms"),
"d_to_b_video_srtt_ms": self._latest_remote_video_srtt_ms,
"b_side_capture_to_send_ms": frame_metadata.capture_to_send_ms if frame_metadata is not None else None,
"sender_clock_delta_ms_raw": sender_clock_delta_ms_raw,
}
if frame_log_record is not None:
self._frame_recv_logger.write(frame_log_record)
except Exception as error: # pragma: no cover - runtime integration path
if not self._closing.is_set():
session_error = ""
if self._session is not None:
try:
session_error = str(dict(self._session.stats()).get("last_server_error", "") or "")
except Exception:
session_error = ""
self._last_error = session_error or str(error)
time.sleep(2)
finally:
if self._session is not None:
try:
self._session.close()
except Exception:
pass
with self._lock:
self._session = None
if self._closing.is_set():
self._started = False
def get_latest_frame(self) -> bytes | None:
snapshot = self.get_latest_frame_snapshot()
return snapshot[0] if snapshot is not None else None
def get_latest_frame_snapshot(self) -> tuple[bytes, dict[str, str]] | None:
self.ensure_started()
with self._lock:
if not self._has_fresh_frame_locked():
return None
if self._latest_frame is None:
return None
return self._latest_frame, self._frame_headers_locked()
def get_latest_frame_metadata(self) -> FrameTrailerMetadata | None:
self.ensure_started()
with self._lock:
if not self._has_fresh_frame_locked():
return None
return self._latest_metadata
def session_stats(self) -> dict[str, Any]:
self.ensure_started()
with self._lock:
session = self._session
if session is None:
return {"connected": 0, "registered": 0, "last_server_error": self._last_error}
try:
return dict(session.stats())
except Exception:
return {"connected": 0, "registered": 0, "last_server_error": self._last_error}
def session_kcp_stats(self) -> dict[str, Any]:
self.ensure_started()
with self._lock:
session = self._session
return safe_kcp_stats(session)
def get_status(self) -> dict[str, Any]:
self.ensure_started()
config = load_omnisocket_config()
transport_cfg = config.get("transport", {})
video_cfg = config.get("video_receiver", {})
session_stats = self.session_stats()
with self._lock:
has_recent_frame = self._has_fresh_frame_locked()
freshness_status = self._freshness_payload_locked()
if has_recent_frame and self._latest_sender_clock_delta_ms_raw is not None:
timing_status = {
"available": True,
"sender_clock_delta_ms_raw": self._latest_sender_clock_delta_ms_raw,
"sender_clock_delta_samples_ms_raw": list(reversed(self._sender_clock_delta_samples_ms_raw)),
"sample_count": len(self._sender_clock_delta_samples_ms_raw),
"sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE,
"timestamp_unit": self._latest_timestamp_unit,
"timestamp_endianness": self._latest_timestamp_endianness,
"unsynced_clock": True,
}
else:
timing_status = {
"available": False,
"sender_clock_delta_ms_raw": None,
"sender_clock_delta_samples_ms_raw": [],
"sample_count": 0,
"sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE,
"timestamp_unit": None,
"timestamp_endianness": None,
"unsynced_clock": True,
}
return {
"backend_ready": self._session_cls is not None,
"mode": VIDEO_SOURCE_MODE,
"connected": self._session is not None,
"registered": bool(session_stats.get("registered", 0)),
"has_recent_frame": has_recent_frame,
"frames_received": self._frames_received,
"latest_sequence": self._latest_sequence,
"latest_frame_hash": self._latest_frame_hash,
"latest_backend_received_unix_ns": self._latest_backend_received_unix_ns or None,
"latest_backend_received_mono_ns": self._latest_backend_received_mono_ns or None,
"latest_frame_bytes": self._latest_frame_bytes,
"latest_capture_to_send_ms": self._latest_metadata.capture_to_send_ms if self._latest_metadata is not None else None,
"reconnect_count": self._reconnect_count,
"last_server_error": str(session_stats.get("last_server_error", "") or ""),
"last_error": self._last_error,
"config_path": str(OMNISOCKET_CONFIG_PATH),
"server_addr": str(transport_cfg.get("server_addr", "")),
"relay_via": str(transport_cfg.get("relay_via", "")),
"peer_id": str(video_cfg.get("peer_id", "")),
"buffer_bytes": int(video_cfg.get("buffer_bytes", 0)),
"timing": timing_status,
"freshness": freshness_status,
}
def close(self) -> None:
self._closing.set()
with self._lock:
session = self._session
if session is not None:
try:
session.close()
except Exception:
pass
thread = self._thread
if thread is not None and thread.is_alive():
thread.join(timeout=0.5)
self._frame_recv_logger.close()
class VideoFrameService:
def __init__(self, receiver: OmniSocketVideoReceiver, display_probe_store: VideoDisplayProbeStore) -> None:
self._receiver = receiver
self._display_probe_store = display_probe_store
def get_status(self) -> dict[str, Any]:
receiver_status = self._receiver.get_status()
receiver_frame = self._receiver.get_latest_frame()
display_probe_status = self._display_probe_store.get_status()
if receiver_frame is not None:
return {
"available": True,
"source_mode": "omnisocket-jpeg-live",
"frame_count": receiver_status["frames_received"],
"fps": 30,
"frame_dir": str(JPEG_FRAME_DIR),
"source_detail": f"peer stream active, frames={receiver_status['frames_received']}",
"receiver": receiver_status,
"timing": receiver_status["timing"],
"freshness": receiver_status.get("freshness", {}),
"display_probe": display_probe_status,
}
wait_detail = receiver_status["last_error"] or (
"waiting for live OmniSocket JPEG frames; check the hub, sender, and receiver configuration"
)
return {
"available": False,
"source_mode": "omnisocket-waiting",
"frame_count": receiver_status["frames_received"],
"fps": 30,
"frame_dir": str(JPEG_FRAME_DIR),
"source_detail": wait_detail,
"receiver": receiver_status,
"timing": receiver_status["timing"],
"freshness": receiver_status.get("freshness", {}),
"display_probe": display_probe_status,
}
def get_next_frame(self) -> bytes:
receiver_frame = self._receiver.get_latest_frame()
if receiver_frame is not None:
return receiver_frame
raise RuntimeError("no live OmniSocket JPEG frame is currently available")
def get_next_frame_with_headers(self) -> tuple[bytes, dict[str, str]]:
snapshot = self._receiver.get_latest_frame_snapshot()
if snapshot is not None:
return snapshot
raise RuntimeError("no live OmniSocket JPEG frame is currently available")
def get_latest_frame_headers(self) -> dict[str, str]:
return self._receiver.get_latest_frame_headers()
def record_display_probe(self, payload: dict[str, Any]) -> None:
self._display_probe_store.record_event(payload)
def iter_mjpeg(self, fps: float = 6.0) -> Iterator[bytes]:
frame_interval = 1.0 / max(1.0, min(fps, 30.0))
while True:
frame = self.get_next_frame()
header = (
b"--frame\r\n"
b"Content-Type: image/jpeg\r\n"
+ f"Content-Length: {len(frame)}\r\n\r\n".encode("ascii")
)
yield header + frame + b"\r\n"
time.sleep(frame_interval)