feat: 视频与控制程序合并

This commit is contained in:
2026-04-04 23:25:52 +08:00
parent 1a41905d4c
commit b0dcf7b571
17 changed files with 1674 additions and 554 deletions

View File

@@ -9,8 +9,23 @@ https://docs.djangoproject.com/en/5.2/howto/deployment/asgi/
import os
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import OriginValidator
from django.conf import settings
from django.core.asgi import get_asgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
application = get_asgi_application()
django_asgi_app = get_asgi_application()
from monitoring.routing import websocket_urlpatterns
application = ProtocolTypeRouter({
"http": django_asgi_app,
"websocket": OriginValidator(
AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),
settings.CONTROL_WS_ALLOWED_ORIGINS,
),
})

View File

@@ -1,7 +1,13 @@
import os
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent.parent
def _split_csv_env(name: str) -> list[str]:
value = os.getenv(name, "")
return [item.strip().rstrip("/") for item in value.split(",") if item.strip()]
SECRET_KEY = 'django-insecure-pk4scm@ifo%mao6l=j0@-$_v+pg-43^hj4a!199^)zivz-_8xu'
DEBUG = True
ALLOWED_HOSTS = ["*"]
@@ -87,3 +93,16 @@ STATIC_URL = 'static/'
CORS_ALLOW_ALL_ORIGINS = True
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
CONTROL_WS_ALLOWED_ORIGINS = _split_csv_env('CONTROL_WS_ALLOWED_ORIGINS') or [
'http://127.0.0.1',
'http://127.0.0.1:5173',
'http://127.0.0.1:4173',
'http://127.0.0.1:8001',
'https://127.0.0.1',
'http://localhost:5173',
'http://localhost:4173',
'http://localhost',
'http://localhost:8001',
'https://localhost',
]

View File

@@ -0,0 +1,176 @@
from __future__ import annotations
import os
import struct
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
PROJECT_ROOT = Path(__file__).resolve().parents[2]
WORKSPACE_ROOT = PROJECT_ROOT.parent
JPEG_FRAME_DIR = WORKSPACE_ROOT / "RobotDataShow" / "jpeg-frames"
GEOSTREAM_JSON_PATH = WORKSPACE_ROOT / "GeoStream" / "gps_latest.json"
GEOSTREAM_STALE_SECONDS = 15
OMNISOCKET_CONFIG_PATH = PROJECT_ROOT / "config" / "omnisocket_demo.yaml"
VIDEO_SOURCE_MODE = os.getenv("VIDEO_SOURCE_MODE", "auto").strip().lower()
OMNISOCKET_FRAME_FRESH_SECONDS = 2.0
VIDEO_TIMESTAMP_SAMPLE_SIZE = 10
VIDEO_TIMESTAMP_TRAILER_BYTES = 8
VIDEO_TIMESTAMP_ENDIANNESS = "little"
VIDEO_TIMESTAMP_UNIT = "ms"
VIDEO_TIMESTAMP_MULTIPLIER_NS = 1_000_000
VIDEO_TIMESTAMP_MAX_SKEW_NS = 7 * 24 * 60 * 60 * 1_000_000_000
CONTROL_PACKET = struct.Struct("<6f")
CONTROL_PACKET_SIZE = CONTROL_PACKET.size
CONTROL_SOURCE_NATIVE_UDP = "native_udp"
CONTROL_SOURCE_WEB = "web"
CONTROL_SOURCE_PRIORITY = (CONTROL_SOURCE_NATIVE_UDP, CONTROL_SOURCE_WEB)
ZERO_CONTROL_PAYLOAD = CONTROL_PACKET.pack(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
def utc_iso_now() -> str:
return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
def parse_simple_yaml_scalar(value: str) -> Any:
if value in {'""', "''"}:
return ""
if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}:
return value[1:-1]
if value.lower() == "true":
return True
if value.lower() == "false":
return False
if value and value.lstrip("-").isdigit():
return int(value)
return value
def load_simple_yaml_config(path: Path) -> dict[str, Any]:
parsed: dict[str, Any] = {}
current_section: str | None = None
with path.open("r", encoding="utf-8") as file:
for raw_line in file:
line = raw_line.split("#", 1)[0].rstrip()
if not line.strip():
continue
if not line.startswith(" "):
if not line.endswith(":"):
raise ValueError(f"invalid top-level yaml line: {raw_line.strip()}")
current_section = line[:-1].strip()
parsed[current_section] = {}
continue
if current_section is None:
raise ValueError(f"yaml key outside section: {raw_line.strip()}")
stripped = line.strip()
if ":" not in stripped:
raise ValueError(f"invalid yaml key line: {raw_line.strip()}")
key, value = stripped.split(":", 1)
parsed[current_section][key.strip()] = parse_simple_yaml_scalar(value.strip())
return parsed
def load_omnisocket_config() -> dict[str, Any]:
config: dict[str, Any] = {}
if OMNISOCKET_CONFIG_PATH.exists():
try:
try:
import yaml # type: ignore
with OMNISOCKET_CONFIG_PATH.open("r", encoding="utf-8") as file:
config = yaml.safe_load(file) or {}
except ImportError:
config = load_simple_yaml_config(OMNISOCKET_CONFIG_PATH)
except Exception:
config = {}
transport_cfg = dict(config.get("transport", {}))
video_receiver_cfg = dict(config.get("video_receiver", {}))
control_sender_cfg = dict(config.get("control_sender", {}))
control_ingress_cfg = dict(config.get("control_ingress", {}))
transport_cfg["server_addr"] = os.getenv(
"OMNISOCKET_SERVER_ADDR",
str(transport_cfg.get("server_addr", "127.0.0.1:10909")),
)
transport_cfg["relay_via"] = os.getenv(
"OMNISOCKET_RELAY_VIA",
str(transport_cfg.get("relay_via", "")),
)
transport_cfg["bind_ip"] = os.getenv(
"OMNISOCKET_BIND_IP",
str(transport_cfg.get("bind_ip", "")),
)
transport_cfg["bind_device"] = os.getenv(
"OMNISOCKET_BIND_DEVICE",
str(transport_cfg.get("bind_device", "")),
)
video_receiver_cfg["peer_id"] = os.getenv(
"OMNISOCKET_VIDEO_PEER_ID",
str(video_receiver_cfg.get("peer_id", "peer-a-video")),
)
video_receiver_cfg["buffer_bytes"] = int(
os.getenv(
"OMNISOCKET_BUFFER_BYTES",
str(video_receiver_cfg.get("buffer_bytes", 1024 * 1024)),
)
)
control_sender_cfg["peer_id"] = os.getenv(
"OMNISOCKET_CONTROL_PEER_ID",
str(control_sender_cfg.get("peer_id", "peer-a-ctrl")),
)
control_sender_cfg["target_peer"] = os.getenv(
"OMNISOCKET_CONTROL_TARGET_PEER",
str(control_sender_cfg.get("target_peer", "peer-b-ctrl")),
)
control_ingress_cfg["native_udp_bind"] = os.getenv(
"OMNISOCKET_CONTROL_NATIVE_UDP_BIND",
str(control_ingress_cfg.get("native_udp_bind", "127.0.0.1:10921")),
)
control_ingress_cfg["source_lease_ms"] = int(
os.getenv(
"OMNISOCKET_CONTROL_SOURCE_LEASE_MS",
str(control_ingress_cfg.get("source_lease_ms", 300)),
)
)
control_ingress_cfg["send_rate_hz"] = float(
os.getenv(
"OMNISOCKET_CONTROL_SEND_RATE_HZ",
str(control_ingress_cfg.get("send_rate_hz", 20.0)),
)
)
control_ingress_cfg["zero_burst_packets"] = int(
os.getenv(
"OMNISOCKET_CONTROL_ZERO_BURST_PACKETS",
str(control_ingress_cfg.get("zero_burst_packets", 3)),
)
)
return {
"transport": transport_cfg,
"video_receiver": video_receiver_cfg,
"control_sender": control_sender_cfg,
"control_ingress": control_ingress_cfg,
}
def parse_host_port(bind_addr: str) -> tuple[str, int]:
host, port_text = bind_addr.rsplit(":", 1)
host = host.strip() or "127.0.0.1"
port = int(port_text)
if port <= 0 or port > 65535:
raise ValueError(f"invalid port in bind address: {bind_addr}")
return host, port

View File

@@ -0,0 +1,42 @@
from __future__ import annotations
import json
from channels.generic.websocket import WebsocketConsumer
from .common import CONTROL_PACKET_SIZE, CONTROL_SOURCE_WEB
from .services import control_arbiter, native_control_ingress
class ControlConsumer(WebsocketConsumer):
def connect(self) -> None:
control_arbiter.ensure_started()
native_control_ingress.ensure_started()
self.accept()
self.send(
text_data=json.dumps(
{
"type": "ready",
"packet_bytes": CONTROL_PACKET_SIZE,
}
)
)
def receive(self, text_data: str | None = None, bytes_data: bytes | None = None) -> None:
if bytes_data is None:
self.send(text_data=json.dumps({"type": "error", "detail": "binary control payload required"}))
return
if len(bytes_data) != CONTROL_PACKET_SIZE:
self.send(
text_data=json.dumps(
{
"type": "error",
"detail": f"expected {CONTROL_PACKET_SIZE} bytes, got {len(bytes_data)}",
}
)
)
return
control_arbiter.ingest_command(CONTROL_SOURCE_WEB, bytes_data)

View File

@@ -0,0 +1,423 @@
from __future__ import annotations
import socket
import sys
import threading
import time
from typing import Any
from .common import (
CONTROL_PACKET_SIZE,
CONTROL_SOURCE_NATIVE_UDP,
CONTROL_SOURCE_PRIORITY,
ZERO_CONTROL_PAYLOAD,
WORKSPACE_ROOT,
load_omnisocket_config,
parse_host_port,
)
from .video import safe_kcp_stats
class OmniSocketControlSender:
def __init__(self) -> None:
self._lock = threading.Lock()
self._session = None
self._session_cls = None
self._msg_type_error = None
self._control_defaults: dict[str, Any] = {}
self._started = False
self._drain_thread: threading.Thread | None = None
self._closing = threading.Event()
self._target_peer = ""
self._send_count = 0
self._send_errors = 0
self._drain_errors = 0
self._last_error = ""
self._load_backend()
def _load_backend(self) -> None:
try:
self._import_backend()
except Exception as error: # pragma: no cover - optional runtime dependency
self._last_error = f"omnisocket import failed: {error}"
def _import_backend(self) -> None:
try:
from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_ERROR, Session # type: ignore
except ImportError:
python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python"
if python_dir.exists():
sys.path.insert(0, str(python_dir))
from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_ERROR, Session # type: ignore
self._session_cls = Session
self._msg_type_error = MSG_TYPE_ERROR
self._control_defaults = dict(CONTROL_DEFAULTS)
def _connect_session(self):
assert self._session_cls is not None
config = load_omnisocket_config()
transport_cfg = config.get("transport", {})
control_cfg = config.get("control_sender", {})
session = self._session_cls()
session.connect(
server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")),
peer_id=str(control_cfg.get("peer_id", "peer-a-ctrl")),
relay_via=str(transport_cfg.get("relay_via", "")),
bind_ip=str(transport_cfg.get("bind_ip", "")),
bind_device=str(transport_cfg.get("bind_device", "")),
**self._control_defaults,
)
target_peer = str(control_cfg.get("target_peer", "peer-b-ctrl"))
return session, target_peer
def ensure_started(self) -> None:
if self._session_cls is None:
return
with self._lock:
if self._started and self._session is not None:
return
session, target_peer = self._connect_session()
self._session = session
self._target_peer = target_peer
self._closing.clear()
self._started = True
self._last_error = ""
self._drain_thread = threading.Thread(
target=self._drain_loop,
name="omnisocket-control-drain",
daemon=True,
)
self._drain_thread.start()
def _reset_session(self, session: Any | None) -> None:
with self._lock:
if session is not None and session is not self._session:
return
current = self._session
self._session = None
self._started = False
if current is not None:
try:
current.close()
except Exception:
pass
def send_payload(self, payload: bytes) -> None:
if len(payload) != CONTROL_PACKET_SIZE:
raise ValueError(f"expected {CONTROL_PACKET_SIZE} bytes, got {len(payload)}")
self.ensure_started()
with self._lock:
session = self._session
target_peer = self._target_peer
if session is None:
raise RuntimeError("control session is not available")
try:
session.send(to=target_peer, data=payload)
except Exception as error:
with self._lock:
self._send_errors += 1
self._last_error = str(error)
self._reset_session(session)
raise
with self._lock:
self._send_count += 1
def send_zero_burst(self, count: int) -> None:
for _ in range(max(0, count)):
try:
self.send_payload(ZERO_CONTROL_PAYLOAD)
except Exception:
return
def _drain_loop(self) -> None:
while not self._closing.is_set():
with self._lock:
session = self._session
if session is None:
return
try:
result = session.recv(timeout_ms=100)
except Exception as error:
with self._lock:
self._drain_errors += 1
self._last_error = str(error)
if not self._closing.is_set():
self._reset_session(session)
return
if result is None:
continue
from_peer, msg_type, payload = result
if msg_type == self._msg_type_error:
text = payload.decode("utf-8", errors="replace")
with self._lock:
self._last_error = f"server error from {from_peer}: {text}"
def session_stats(self) -> dict[str, Any]:
with self._lock:
session = self._session
if session is None:
return {"connected": 0}
try:
return dict(session.stats())
except Exception:
return {"connected": 0}
def session_kcp_stats(self) -> dict[str, Any]:
with self._lock:
session = self._session
return safe_kcp_stats(session)
def get_status(self) -> dict[str, Any]:
config = load_omnisocket_config()
control_cfg = config.get("control_sender", {})
with self._lock:
return {
"backend_ready": self._session_cls is not None,
"started": self._started,
"connected": self._session is not None,
"peer_id": str(control_cfg.get("peer_id", "")),
"target_peer": str(control_cfg.get("target_peer", "")),
"send_count": self._send_count,
"send_errors": self._send_errors,
"drain_errors": self._drain_errors,
"last_error": self._last_error,
}
def close(self) -> None:
self._closing.set()
self.send_zero_burst(1)
self._reset_session(None)
drain_thread = self._drain_thread
if drain_thread is not None and drain_thread.is_alive():
drain_thread.join(timeout=0.5)
class ControlArbiter:
def __init__(self, sender: OmniSocketControlSender) -> None:
self._sender = sender
self._lock = threading.Lock()
self._thread: threading.Thread | None = None
self._closing = threading.Event()
self._started = False
self._source_lease_ms = 300
self._send_rate_hz = 20.0
self._zero_burst_packets = 3
self._latest_by_source: dict[str, tuple[bytes, float]] = {}
self._packet_counts = {source: 0 for source in CONTROL_SOURCE_PRIORITY}
self._last_payload = ZERO_CONTROL_PAYLOAD
self._last_sent_at = 0.0
self._active_source: str | None = None
self._last_error = ""
def _load_config(self) -> None:
cfg = load_omnisocket_config().get("control_ingress", {})
self._source_lease_ms = max(50, int(cfg.get("source_lease_ms", 300)))
self._send_rate_hz = max(1.0, float(cfg.get("send_rate_hz", 20.0)))
self._zero_burst_packets = max(1, int(cfg.get("zero_burst_packets", 3)))
def ensure_started(self) -> None:
self._load_config()
with self._lock:
if self._started:
return
self._started = True
self._thread = threading.Thread(
target=self._send_loop,
name="control-arbiter",
daemon=True,
)
self._thread.start()
def ingest_command(self, source: str, payload: bytes) -> None:
if source not in CONTROL_SOURCE_PRIORITY:
raise ValueError(f"unsupported control source: {source}")
if len(payload) != CONTROL_PACKET_SIZE:
raise ValueError(f"expected {CONTROL_PACKET_SIZE} bytes, got {len(payload)}")
self.ensure_started()
now = time.monotonic()
with self._lock:
self._latest_by_source[source] = (payload, now)
self._packet_counts[source] += 1
def _resolve_active_locked(self, now: float) -> tuple[str | None, bytes, int]:
lease_seconds = self._source_lease_ms / 1000.0
expired_sources = [
source
for source, (_, updated_at) in self._latest_by_source.items()
if (now - updated_at) > lease_seconds
]
for source in expired_sources:
self._latest_by_source.pop(source, None)
for source in CONTROL_SOURCE_PRIORITY:
entry = self._latest_by_source.get(source)
if entry is None:
continue
payload, updated_at = entry
remaining_ms = max(0, int((lease_seconds - (now - updated_at)) * 1000))
return source, payload, remaining_ms
return None, ZERO_CONTROL_PAYLOAD, 0
def _send_loop(self) -> None:
interval = 1.0 / max(self._send_rate_hz, 1.0)
previous_active: str | None = None
while not self._closing.is_set():
now = time.monotonic()
with self._lock:
active_source, payload, _lease_ms = self._resolve_active_locked(now)
self._active_source = active_source
self._last_payload = payload
if previous_active is not None and active_source is None:
try:
self._sender.send_zero_burst(self._zero_burst_packets)
except Exception as error:
with self._lock:
self._last_error = str(error)
elif active_source is not None:
try:
self._sender.send_payload(payload)
with self._lock:
self._last_sent_at = time.monotonic()
self._last_error = ""
except Exception as error:
with self._lock:
self._last_error = str(error)
previous_active = active_source
self._closing.wait(interval)
try:
self._sender.send_zero_burst(self._zero_burst_packets)
except Exception:
pass
def get_status(self) -> dict[str, Any]:
self.ensure_started()
now = time.monotonic()
with self._lock:
active_source, _payload, lease_ms = self._resolve_active_locked(now)
return {
"active_source": active_source,
"control_lease_remaining_ms": lease_ms,
"packet_counts": dict(self._packet_counts),
"send_rate_hz": self._send_rate_hz,
"source_lease_ms": self._source_lease_ms,
"zero_burst_packets": self._zero_burst_packets,
"last_error": self._last_error,
"last_sent_at_monotonic": self._last_sent_at,
}
def close(self) -> None:
self._closing.set()
thread = self._thread
if thread is not None and thread.is_alive():
thread.join(timeout=0.5)
class NativeUdpControlIngress:
def __init__(self, arbiter: ControlArbiter) -> None:
self._arbiter = arbiter
self._lock = threading.Lock()
self._thread: threading.Thread | None = None
self._closing = threading.Event()
self._started = False
self._bind_addr = "127.0.0.1:10921"
self._packets_received = 0
self._invalid_packets = 0
self._last_sender = ""
self._last_error = ""
def ensure_started(self) -> None:
bind_addr = str(load_omnisocket_config().get("control_ingress", {}).get("native_udp_bind", "127.0.0.1:10921"))
with self._lock:
self._bind_addr = bind_addr
if self._closing.is_set():
return
if self._thread is not None and self._thread.is_alive():
return
self._started = True
self._thread = threading.Thread(
target=self._run,
name="native-udp-control-ingress",
daemon=True,
)
self._thread.start()
def _run(self) -> None:
try:
try:
host, port = parse_host_port(self._bind_addr)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
sock.settimeout(0.1)
except Exception as error:
with self._lock:
self._last_error = str(error)
return
with sock:
while not self._closing.is_set():
try:
payload, sender_addr = sock.recvfrom(CONTROL_PACKET_SIZE + 64)
except socket.timeout:
continue
except OSError as error:
with self._lock:
if not self._closing.is_set():
self._last_error = str(error)
return
with self._lock:
self._last_sender = f"{sender_addr[0]}:{sender_addr[1]}"
if len(payload) != CONTROL_PACKET_SIZE:
with self._lock:
self._invalid_packets += 1
continue
try:
self._arbiter.ingest_command(CONTROL_SOURCE_NATIVE_UDP, payload)
except Exception as error:
with self._lock:
self._last_error = str(error)
continue
with self._lock:
self._packets_received += 1
finally:
with self._lock:
self._started = False
self._thread = None
def get_status(self) -> dict[str, Any]:
self.ensure_started()
with self._lock:
return {
"started": self._started,
"bind_addr": self._bind_addr,
"packets_received": self._packets_received,
"invalid_packets": self._invalid_packets,
"last_sender": self._last_sender,
"last_error": self._last_error,
}
def close(self) -> None:
self._closing.set()
thread = self._thread
if thread is not None and thread.is_alive():
thread.join(timeout=0.5)

View File

@@ -0,0 +1,9 @@
from django.urls import re_path
from .consumers import ControlConsumer
websocket_urlpatterns = [
re_path(r"^ws/control/$", ControlConsumer.as_asgi()),
]

View File

@@ -1,523 +1,22 @@
from __future__ import annotations
from collections import deque
import json
import math
import os
import sys
import threading
import time
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Iterator
PROJECT_ROOT = Path(__file__).resolve().parents[2]
WORKSPACE_ROOT = PROJECT_ROOT.parent
JPEG_FRAME_DIR = WORKSPACE_ROOT / "RobotDataShow" / "jpeg-frames"
# GPS 数据 JSON 文件路径
GEOSTREAM_JSON_PATH = WORKSPACE_ROOT / "GeoStream" / "gps_latest.json"
GEOSTREAM_STALE_SECONDS = 15
OMNISOCKET_CONFIG_PATH = PROJECT_ROOT / "config" / "omnisocket_demo.yaml"
VIDEO_SOURCE_MODE = os.getenv("VIDEO_SOURCE_MODE", "auto").strip().lower()
OMNISOCKET_FRAME_FRESH_SECONDS = 2.0
VIDEO_TIMESTAMP_SAMPLE_SIZE = 10
VIDEO_TIMESTAMP_TRAILER_BYTES = 8
VIDEO_TIMESTAMP_ENDIANNESS = "little"
VIDEO_TIMESTAMP_UNIT = "ms"
VIDEO_TIMESTAMP_MULTIPLIER_NS = 1_000_000
VIDEO_TIMESTAMP_MAX_SKEW_NS = 7 * 24 * 60 * 60 * 1_000_000_000
def utc_iso_now() -> str:
return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
class OmniSocketVideoReceiver:
def __init__(self) -> None:
self._lock = threading.Lock()
self._thread: threading.Thread | None = None
self._started = False
self._session = None
self._session_cls = None
self._binary_msg_type = None
self._video_defaults: dict[str, Any] = {}
self._latest_frame: bytes | None = None
self._latest_received_at = 0.0
self._latest_sequence: int | None = None
self._latest_latency_ms: float | None = None
self._latest_timestamp_unit: str | None = None
self._latest_timestamp_endianness: str | None = None
self._latency_samples_ms: deque[float] = deque(maxlen=VIDEO_TIMESTAMP_SAMPLE_SIZE)
self._frames_received = 0
self._last_error = ""
self._load_backend()
def _load_backend(self) -> None:
# 服务启动时先尝试导入一次 Python/C 扩展。
try:
self._import_backend()
except Exception as error: # pragma: no cover - 可选的运行时依赖
self._last_error = f"omnisocket import failed: {error}"
def _import_backend(self) -> None:
# 优先使用已经安装到当前 Python 环境里的 omnisocket。
# 如果导入失败,再尝试样例目录下的本地 python 路径。
try:
from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS # type: ignore
except ImportError:
python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python"
if python_dir.exists():
sys.path.insert(0, str(python_dir))
from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS # type: ignore
self._binary_msg_type = MSG_TYPE_BINARY
self._session_cls = Session
self._video_defaults = dict(VIDEO_DEFAULTS)
def ensure_started(self) -> None:
# 当第一次请求帧或状态时,再懒启动后台接收线程。
if self._session_cls is None or self._binary_msg_type is None:
return
with self._lock:
if self._started:
return
self._started = True
self._thread = threading.Thread(
target=self._run,
name="omnisocket-video-receiver",
daemon=True,
)
self._thread.start()
def _load_config(self) -> dict[str, Any]:
# 这里保持和 SampleCData/omnisocket_video_receiver.py 一样的配置结构:
# transport + video_receiver。
# 即使配置文件不存在,也允许回退到默认值继续运行。
config: dict[str, Any] = {}
if OMNISOCKET_CONFIG_PATH.exists():
try:
try:
import yaml # type: ignore
with OMNISOCKET_CONFIG_PATH.open("r", encoding="utf-8") as file:
config = yaml.safe_load(file) or {}
except ImportError:
# 当前配置文件结构非常简单,缺少 PyYAML 时用简化解析器兜底。
config = self._load_simple_yaml_config(OMNISOCKET_CONFIG_PATH)
except Exception as error: # pragma: no cover - 可选依赖
self._last_error = f"config load failed: {error}"
transport_cfg = dict(config.get("transport", {}))
video_cfg = dict(config.get("video_receiver", {}))
transport_cfg["server_addr"] = os.getenv(
"OMNISOCKET_SERVER_ADDR",
str(transport_cfg.get("server_addr", "127.0.0.1:10909")),
)
transport_cfg["relay_via"] = os.getenv(
"OMNISOCKET_RELAY_VIA",
str(transport_cfg.get("relay_via", "")),
)
transport_cfg["bind_ip"] = os.getenv(
"OMNISOCKET_BIND_IP",
str(transport_cfg.get("bind_ip", "")),
)
transport_cfg["bind_device"] = os.getenv(
"OMNISOCKET_BIND_DEVICE",
str(transport_cfg.get("bind_device", "")),
)
video_cfg["peer_id"] = os.getenv(
"OMNISOCKET_VIDEO_PEER_ID",
str(video_cfg.get("peer_id", "peer-a-video")),
)
video_cfg["buffer_bytes"] = int(
os.getenv(
"OMNISOCKET_BUFFER_BYTES",
str(video_cfg.get("buffer_bytes", 1024 * 1024)),
)
)
return {
"transport": transport_cfg,
"video_receiver": video_cfg,
}
def _load_simple_yaml_config(self, path: Path) -> dict[str, Any]:
parsed: dict[str, Any] = {}
current_section: str | None = None
with path.open("r", encoding="utf-8") as file:
for raw_line in file:
line = raw_line.split("#", 1)[0].rstrip()
if not line.strip():
continue
if not line.startswith(" "):
if not line.endswith(":"):
raise ValueError(f"invalid top-level yaml line: {raw_line.strip()}")
current_section = line[:-1].strip()
parsed[current_section] = {}
continue
if current_section is None:
raise ValueError(f"yaml key outside section: {raw_line.strip()}")
stripped = line.strip()
if ":" not in stripped:
raise ValueError(f"invalid yaml key line: {raw_line.strip()}")
key, value = stripped.split(":", 1)
parsed[current_section][key.strip()] = self._parse_simple_yaml_scalar(value.strip())
return parsed
def _parse_simple_yaml_scalar(self, value: str) -> Any:
if value in {'""', "''"}:
return ""
if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}:
return value[1:-1]
if value.lower() == "true":
return True
if value.lower() == "false":
return False
if value and value.lstrip("-").isdigit():
return int(value)
return value
def _connect_session(self):
# 这里和样例接收器一致:创建 Session(),然后使用 transport/video 配置建立连接。
assert self._session_cls is not None
config = self._load_config()
transport_cfg = config.get("transport", {})
video_cfg = config.get("video_receiver", {})
session = self._session_cls()
session.connect(
server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")),
peer_id=str(video_cfg.get("peer_id", "peer-a-video")),
relay_via=str(transport_cfg.get("relay_via", "")),
bind_ip=str(transport_cfg.get("bind_ip", "")),
bind_device=str(transport_cfg.get("bind_device", "")),
**self._video_defaults,
)
return session, int(video_cfg.get("buffer_bytes", 1024 * 1024))
def _extract_jpeg_payload(self, frame: bytes) -> bytes | None:
# 同时兼容两种帧格式:
# 1. 纯 JPEG 二进制
# 2. 前 8 字节是序号,后面才是真正的 JPEG 数据
if frame.startswith(b"\xff\xd8"):
return frame
if len(frame) > 8 and frame[8:10] == b"\xff\xd8":
return frame[8:]
return None
from __future__ import annotations
def _split_jpeg_frame_and_trailer(self, frame: bytes) -> tuple[bytes, bytes] | None:
jpeg_payload = self._extract_jpeg_payload(frame)
if jpeg_payload is None:
return None
from .control import ControlArbiter, NativeUdpControlIngress, OmniSocketControlSender
from .telemetry import GpsDataService, NetworkTelemetryService
from .video import OmniSocketVideoReceiver, VideoFrameService
# 当前发送端协议是 JPEG 末尾固定追加 8 字节 little-endian 毫秒时间戳。
if jpeg_payload.endswith(b"\xff\xd9"):
return jpeg_payload, b""
if (
len(jpeg_payload) >= VIDEO_TIMESTAMP_TRAILER_BYTES + 2
and jpeg_payload[-(VIDEO_TIMESTAMP_TRAILER_BYTES + 2) : -VIDEO_TIMESTAMP_TRAILER_BYTES] == b"\xff\xd9"
):
return jpeg_payload[:-VIDEO_TIMESTAMP_TRAILER_BYTES], jpeg_payload[-VIDEO_TIMESTAMP_TRAILER_BYTES:]
_video_receiver = OmniSocketVideoReceiver()
_control_sender = OmniSocketControlSender()
# 兜底兼容旧格式,避免直接丢帧。
eoi_index = jpeg_payload.rfind(b"\xff\xd9")
if eoi_index < 0:
return jpeg_payload, b""
control_arbiter = ControlArbiter(_control_sender)
native_control_ingress = NativeUdpControlIngress(control_arbiter)
trailer_start = eoi_index + 2
return jpeg_payload[:trailer_start], jpeg_payload[trailer_start:]
video_service = VideoFrameService(_video_receiver)
gps_service = GpsDataService()
network_service = NetworkTelemetryService(
_video_receiver,
_control_sender,
control_arbiter,
native_control_ingress,
)
def _extract_jpeg_frame(self, frame: bytes) -> bytes | None:
split_payload = self._split_jpeg_frame_and_trailer(frame)
if split_payload is None:
return None
jpeg_frame, _ = split_payload
return jpeg_frame
def _extract_sequence(self, frame: bytes) -> int | None:
if len(frame) >= 8 and not frame.startswith(b"\xff\xd8"):
return int.from_bytes(frame[:8], "big")
return None
def _extract_frame_tail(self, frame: bytes) -> bytes:
split_payload = self._split_jpeg_frame_and_trailer(frame)
if split_payload is None:
return b""
_, trailer = split_payload
return trailer
def _extract_frame_timestamp(self, frame: bytes) -> tuple[int, str, str] | None:
trailer = self._extract_frame_tail(frame)
if len(trailer) != VIDEO_TIMESTAMP_TRAILER_BYTES:
return None
value = int.from_bytes(trailer, VIDEO_TIMESTAMP_ENDIANNESS, signed=False)
if value <= 0:
return None
timestamp_ns = value * VIDEO_TIMESTAMP_MULTIPLIER_NS
if abs(time.time_ns() - timestamp_ns) > VIDEO_TIMESTAMP_MAX_SKEW_NS:
return None
return timestamp_ns, VIDEO_TIMESTAMP_UNIT, VIDEO_TIMESTAMP_ENDIANNESS
def _run(self) -> None:
# 后台持续接收循环:
# connect -> recv_into(buffer) -> 按 body_len 截出有效内容 -> 把最新 JPEG 帧缓存在内存里
while True:
try:
session, buffer_bytes = self._connect_session()
self._session = session
self._last_error = ""
buffer = bytearray(buffer_bytes)
while True:
# 从 OmniSocket / C 侧收一帧原始二进制数据到缓冲区
meta = session.recv_into(buffer, timeout_ms=1000)
if meta is None:
continue
if meta.get("msg_type") != self._binary_msg_type:
continue
# 真正收到的有效部分切出来,形成当前这一帧
frame = bytes(buffer[: meta["body_len"]])
jpeg_frame = self._extract_jpeg_frame(frame)
if jpeg_frame is None:
self._last_error = "received non-JPEG binary frame"
continue
timestamp_meta = self._extract_frame_timestamp(frame)
latency_ms = None
if timestamp_meta is not None:
timestamp_ns, unit, endianness = timestamp_meta
latency_ms = round((time.time_ns() - timestamp_ns) / 1_000_000, 3)
else:
unit = None
endianness = None
with self._lock:
# 缓存:这里只保留最新的一张 JPEG 帧,供 Web 接口直接返回给前端。
self._latest_frame = jpeg_frame
self._latest_received_at = time.time()
self._latest_sequence = self._extract_sequence(frame)
self._latest_latency_ms = latency_ms
self._latest_timestamp_unit = unit
self._latest_timestamp_endianness = endianness
if latency_ms is not None:
self._latency_samples_ms.append(latency_ms)
self._frames_received += 1
except Exception as error: # pragma: no cover - 运行时集成路径
self._last_error = str(error)
time.sleep(2)
finally:
if self._session is not None:
try:
self._session.close()
except Exception:
pass
self._session = None
def get_latest_frame(self) -> bytes | None:
# 把内存里最新的一张真实 JPEG 帧暴露给 Django 视图层。
# 如果这张帧已经过旧,就返回 None让上层回退到本地模拟帧。
self.ensure_started()
with self._lock:
if self._latest_frame is None:
return None
if time.time() - self._latest_received_at > OMNISOCKET_FRAME_FRESH_SECONDS:
return None
return self._latest_frame
def get_status(self) -> dict[str, Any]:
self.ensure_started()
config = self._load_config()
transport_cfg = config.get("transport", {})
video_cfg = config.get("video_receiver", {})
with self._lock:
has_recent_frame = self._latest_frame is not None and (
time.time() - self._latest_received_at <= OMNISOCKET_FRAME_FRESH_SECONDS
)
if has_recent_frame and self._latest_latency_ms is not None:
timing_status = {
"available": True,
"latest_delta_ms": self._latest_latency_ms,
"delta_samples_ms": list(reversed(self._latency_samples_ms)),
"sample_count": len(self._latency_samples_ms),
"sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE,
"timestamp_unit": self._latest_timestamp_unit,
"timestamp_endianness": self._latest_timestamp_endianness,
}
else:
timing_status = {
"available": False,
"latest_delta_ms": None,
"delta_samples_ms": [],
"sample_count": 0,
"sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE,
"timestamp_unit": None,
"timestamp_endianness": None,
}
return {
"backend_ready": self._session_cls is not None,
"mode": VIDEO_SOURCE_MODE,
"connected": self._session is not None,
"has_recent_frame": has_recent_frame,
"frames_received": self._frames_received,
"latest_sequence": self._latest_sequence,
"last_error": self._last_error,
"config_path": str(OMNISOCKET_CONFIG_PATH),
"server_addr": str(transport_cfg.get("server_addr", "")),
"relay_via": str(transport_cfg.get("relay_via", "")),
"peer_id": str(video_cfg.get("peer_id", "")),
"buffer_bytes": int(video_cfg.get("buffer_bytes", 0)),
"timing": timing_status,
}
class VideoFrameService:
def __init__(self) -> None:
self._receiver = OmniSocketVideoReceiver()
def get_status(self) -> dict[str, Any]:
receiver_status = self._receiver.get_status()
receiver_frame = self._receiver.get_latest_frame()
# 如果已经收到了真实视频帧,就把当前状态标记为实时模式,给前端显示。
if receiver_frame is not None:
return {
"available": True,
"source_mode": "omnisocket-jpeg-live",
"frame_count": receiver_status["frames_received"],
"fps": 30,
"frame_dir": str(JPEG_FRAME_DIR),
"source_detail": f"peer stream active, frames={receiver_status['frames_received']}",
"receiver": receiver_status,
"timing": receiver_status["timing"],
}
wait_detail = receiver_status["last_error"] or (
"未实时获取真实值,请检查 OmniSocket 服务、视频发送端和接收配置。"
)
return {
"available": False,
"source_mode": "omnisocket-waiting",
"frame_count": receiver_status["frames_received"],
"fps": 30,
"frame_dir": str(JPEG_FRAME_DIR),
"source_detail": wait_detail,
"receiver": receiver_status,
"timing": receiver_status["timing"],
}
def get_next_frame(self) -> bytes:
# 优先返回从 Python/C 视频接收器拿到的最新真实 JPEG 帧。
receiver_frame = self._receiver.get_latest_frame()
if receiver_frame is not None:
return receiver_frame
raise RuntimeError("未实时获取真实值,当前没有可用的真实 JPEG 帧。")
def iter_mjpeg(self, fps: float = 6.0) -> Iterator[bytes]:
frame_interval = 1.0 / max(1.0, min(fps, 30.0))
while True:
frame = self.get_next_frame()
header = (
b"--frame\r\n"
b"Content-Type: image/jpeg\r\n"
+ f"Content-Length: {len(frame)}\r\n\r\n".encode("ascii")
)
yield header + frame + b"\r\n"
time.sleep(frame_interval)
class GpsDataService:
def get_latest(self) -> dict[str, Any]:
# 优先使用由 GeoStream C 解析器生成的最新数据包。
payload = self._read_geostream_payload()
if payload is not None:
payload["source_mode"] = "geostream-json"
payload["updated_at"] = utc_iso_now()
return payload
# 当没有最新的 GPS 文件可用时,退回到使用一个移动的演示点位。
return self._build_simulated_payload()
def _read_geostream_payload(self) -> dict[str, Any] | None:
# Django 后端目前还不直接解析串口数据流。
# 它只读取由 GeoStream/parse_gps.c 生成的 JSON 文件。
if not GEOSTREAM_JSON_PATH.exists():
return None
# 忽略过期的文件,以便 UI 可以退回到使用模拟数据,
# 而不是把旧位置当作实时位置来显示。
age_seconds = time.time() - GEOSTREAM_JSON_PATH.stat().st_mtime
if age_seconds > GEOSTREAM_STALE_SECONDS:
return None
try:
with GEOSTREAM_JSON_PATH.open("r", encoding="utf-8") as file:
return json.load(file)
except (OSError, json.JSONDecodeError):
return None
def _build_simulated_payload(self) -> dict[str, Any]:
# 为本地 UI 开发构建一个平滑的伪造轨迹。
tick = time.time() / 12.0
latitude = 31.2304 + math.sin(tick) * 0.0014
longitude = 121.4737 + math.cos(tick) * 0.0018
return {
"has_fix": True,
"utc_time": datetime.now(UTC).strftime("%H:%M:%S"),
"latitude": round(latitude, 6),
"longitude": round(longitude, 6),
"satellites": 14 + int((math.sin(tick * 0.7) + 1.0) * 2),
"altitude_m": round(6.5 + math.cos(tick * 0.5) * 1.2, 2),
"coordinate_system": "WGS84",
"source_sentence": "SIMULATED",
"raw_coordinate_format": "decimal degrees",
"source_mode": "simulated",
"updated_at": utc_iso_now(),
}
class NetworkTelemetryService:
def get_latest(self) -> dict[str, Any]:
tick = time.time()
latency_ms = 28 + math.sin(tick / 4.0) * 6
jitter_ms = 3 + math.cos(tick / 5.0) * 1.4
tx_kbps = 780 + math.sin(tick / 6.0) * 160
rx_kbps = 720 + math.cos(tick / 7.0) * 140
packet_loss_pct = max(0.0, 0.35 + math.sin(tick / 9.0) * 0.25)
signal_dbm = -53 - abs(math.sin(tick / 8.0) * 7)
return {
"peer_status": "online",
"latency_ms": round(latency_ms, 1),
"jitter_ms": round(jitter_ms, 1),
"packet_loss_pct": round(packet_loss_pct, 2),
"tx_kbps": int(tx_kbps),
"rx_kbps": int(rx_kbps),
"signal_dbm": round(signal_dbm, 1),
"transport": "OmniSocket / simulated",
"source_mode": "simulated",
"updated_at": utc_iso_now(),
}
video_service = VideoFrameService()
gps_service = GpsDataService()
network_service = NetworkTelemetryService()

View File

@@ -0,0 +1,161 @@
from __future__ import annotations
import json
import math
import threading
import time
from datetime import UTC, datetime
from typing import Any
from .common import GEOSTREAM_JSON_PATH, GEOSTREAM_STALE_SECONDS, utc_iso_now
from .control import ControlArbiter, NativeUdpControlIngress, OmniSocketControlSender
from .video import OmniSocketVideoReceiver
class GpsDataService:
def get_latest(self) -> dict[str, Any]:
payload = self._read_geostream_payload()
if payload is not None:
payload["source_mode"] = "geostream-json"
payload["updated_at"] = utc_iso_now()
return payload
return self._build_simulated_payload()
def _read_geostream_payload(self) -> dict[str, Any] | None:
if not GEOSTREAM_JSON_PATH.exists():
return None
age_seconds = time.time() - GEOSTREAM_JSON_PATH.stat().st_mtime
if age_seconds > GEOSTREAM_STALE_SECONDS:
return None
try:
with GEOSTREAM_JSON_PATH.open("r", encoding="utf-8") as file:
return json.load(file)
except (OSError, json.JSONDecodeError):
return None
def _build_simulated_payload(self) -> dict[str, Any]:
tick = time.time() / 12.0
latitude = 31.2304 + math.sin(tick) * 0.0014
longitude = 121.4737 + math.cos(tick) * 0.0018
return {
"has_fix": True,
"utc_time": datetime.now(UTC).strftime("%H:%M:%S"),
"latitude": round(latitude, 6),
"longitude": round(longitude, 6),
"satellites": 14 + int((math.sin(tick * 0.7) + 1.0) * 2),
"altitude_m": round(6.5 + math.cos(tick * 0.5) * 1.2, 2),
"coordinate_system": "WGS84",
"source_sentence": "SIMULATED",
"raw_coordinate_format": "decimal degrees",
"source_mode": "simulated",
"updated_at": utc_iso_now(),
}
class NetworkTelemetryService:
def __init__(
self,
video_receiver: OmniSocketVideoReceiver,
control_sender: OmniSocketControlSender,
control_arbiter: ControlArbiter,
native_ingress: NativeUdpControlIngress,
) -> None:
self._video_receiver = video_receiver
self._control_sender = control_sender
self._control_arbiter = control_arbiter
self._native_ingress = native_ingress
self._rate_lock = threading.Lock()
self._last_rate_sample: tuple[float, int, int] | None = None
def _compute_rates(self, send_bytes: int, recv_bytes: int) -> tuple[float, float]:
now = time.monotonic()
with self._rate_lock:
previous = self._last_rate_sample
self._last_rate_sample = (now, send_bytes, recv_bytes)
if previous is None:
return 0.0, 0.0
prev_time, prev_send, prev_recv = previous
elapsed = now - prev_time
if elapsed <= 0.0:
return 0.0, 0.0
tx_kbps = max(0.0, ((send_bytes - prev_send) * 8.0) / elapsed / 1000.0)
rx_kbps = max(0.0, ((recv_bytes - prev_recv) * 8.0) / elapsed / 1000.0)
return tx_kbps, rx_kbps
def get_latest(self) -> dict[str, Any]:
self._video_receiver.ensure_started()
self._control_arbiter.ensure_started()
self._native_ingress.ensure_started()
video_app = self._video_receiver.session_stats()
control_app = self._control_sender.session_stats()
video_kcp = self._video_receiver.session_kcp_stats()
control_kcp = self._control_sender.session_kcp_stats()
arbiter_status = self._control_arbiter.get_status()
ingress_status = self._native_ingress.get_status()
sender_status = self._control_sender.get_status()
total_send_bytes = int(video_app.get("send_bytes", 0)) + int(control_app.get("send_bytes", 0))
total_recv_bytes = int(video_app.get("recv_bytes", 0)) + int(control_app.get("recv_bytes", 0))
tx_kbps, rx_kbps = self._compute_rates(total_send_bytes, total_recv_bytes)
video_connected = int(video_app.get("connected", 0))
control_connected = int(control_app.get("connected", 0))
connected_sessions = video_connected + control_connected
primary_kcp = control_kcp if control_connected else video_kcp
latency_ms = primary_kcp.get("srtt_ms")
jitter_ms = primary_kcp.get("srttvar_ms")
if connected_sessions > 0:
peer_status = "online"
elif sender_status.get("backend_ready"):
peer_status = "idle"
else:
peer_status = "backend-unavailable"
return {
"peer_status": peer_status,
"latency_ms": latency_ms,
"jitter_ms": jitter_ms,
"packet_loss_pct": None,
"tx_kbps": round(tx_kbps, 3),
"rx_kbps": round(rx_kbps, 3),
"transport": "OmniSocket / kcp",
"source_mode": "omnisocket-live" if connected_sessions > 0 else "omnisocket-idle",
"updated_at": utc_iso_now(),
"active_control_source": arbiter_status["active_source"],
"control_lease_remaining_ms": arbiter_status["control_lease_remaining_ms"],
"combined": {
"connected_sessions": connected_sessions,
"send_bytes": total_send_bytes,
"recv_bytes": total_recv_bytes,
"tx_kbps": round(tx_kbps, 3),
"rx_kbps": round(rx_kbps, 3),
},
"sessions": {
"video": {
"app": video_app,
"kcp": video_kcp,
},
"control": {
"app": control_app,
"kcp": control_kcp,
},
},
"ingress": {
"native_udp": ingress_status,
},
"control": {
"arbiter": arbiter_status,
"sender": sender_status,
},
}

343
backend/monitoring/video.py Normal file
View File

@@ -0,0 +1,343 @@
from __future__ import annotations
from collections import deque
import sys
import threading
import time
from typing import Any, Iterator
from .common import (
JPEG_FRAME_DIR,
OMNISOCKET_CONFIG_PATH,
OMNISOCKET_FRAME_FRESH_SECONDS,
PROJECT_ROOT,
VIDEO_SOURCE_MODE,
VIDEO_TIMESTAMP_ENDIANNESS,
VIDEO_TIMESTAMP_MAX_SKEW_NS,
VIDEO_TIMESTAMP_MULTIPLIER_NS,
VIDEO_TIMESTAMP_SAMPLE_SIZE,
VIDEO_TIMESTAMP_TRAILER_BYTES,
VIDEO_TIMESTAMP_UNIT,
WORKSPACE_ROOT,
load_omnisocket_config,
)
def safe_kcp_stats(session: Any) -> dict[str, Any]:
if session is None or not hasattr(session, "kcp_stats"):
return {}
try:
return dict(session.kcp_stats())
except Exception:
return {}
class OmniSocketVideoReceiver:
def __init__(self) -> None:
self._lock = threading.Lock()
self._thread: threading.Thread | None = None
self._started = False
self._session = None
self._session_cls = None
self._binary_msg_type = None
self._video_defaults: dict[str, Any] = {}
self._latest_frame: bytes | None = None
self._latest_received_at = 0.0
self._latest_sequence: int | None = None
self._latest_latency_ms: float | None = None
self._latest_timestamp_unit: str | None = None
self._latest_timestamp_endianness: str | None = None
self._latency_samples_ms: deque[float] = deque(maxlen=VIDEO_TIMESTAMP_SAMPLE_SIZE)
self._frames_received = 0
self._last_error = ""
self._load_backend()
def _load_backend(self) -> None:
try:
self._import_backend()
except Exception as error: # pragma: no cover - optional runtime dependency
self._last_error = f"omnisocket import failed: {error}"
def _import_backend(self) -> None:
try:
from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS # type: ignore
except ImportError:
python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python"
if python_dir.exists():
sys.path.insert(0, str(python_dir))
from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS # type: ignore
self._binary_msg_type = MSG_TYPE_BINARY
self._session_cls = Session
self._video_defaults = dict(VIDEO_DEFAULTS)
def ensure_started(self) -> None:
if self._session_cls is None or self._binary_msg_type is None:
return
with self._lock:
if self._started:
return
self._started = True
self._thread = threading.Thread(
target=self._run,
name="omnisocket-video-receiver",
daemon=True,
)
self._thread.start()
def _connect_session(self):
assert self._session_cls is not None
config = load_omnisocket_config()
transport_cfg = config.get("transport", {})
video_cfg = config.get("video_receiver", {})
session = self._session_cls()
session.connect(
server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")),
peer_id=str(video_cfg.get("peer_id", "peer-a-video")),
relay_via=str(transport_cfg.get("relay_via", "")),
bind_ip=str(transport_cfg.get("bind_ip", "")),
bind_device=str(transport_cfg.get("bind_device", "")),
**self._video_defaults,
)
return session, int(video_cfg.get("buffer_bytes", 1024 * 1024))
def _extract_jpeg_payload(self, frame: bytes) -> bytes | None:
if frame.startswith(b"\xff\xd8"):
return frame
if len(frame) > 8 and frame[8:10] == b"\xff\xd8":
return frame[8:]
return None
def _split_jpeg_frame_and_trailer(self, frame: bytes) -> tuple[bytes, bytes] | None:
jpeg_payload = self._extract_jpeg_payload(frame)
if jpeg_payload is None:
return None
if jpeg_payload.endswith(b"\xff\xd9"):
return jpeg_payload, b""
if (
len(jpeg_payload) >= VIDEO_TIMESTAMP_TRAILER_BYTES + 2
and jpeg_payload[-(VIDEO_TIMESTAMP_TRAILER_BYTES + 2) : -VIDEO_TIMESTAMP_TRAILER_BYTES] == b"\xff\xd9"
):
return jpeg_payload[:-VIDEO_TIMESTAMP_TRAILER_BYTES], jpeg_payload[-VIDEO_TIMESTAMP_TRAILER_BYTES:]
eoi_index = jpeg_payload.rfind(b"\xff\xd9")
if eoi_index < 0:
return jpeg_payload, b""
trailer_start = eoi_index + 2
return jpeg_payload[:trailer_start], jpeg_payload[trailer_start:]
def _extract_jpeg_frame(self, frame: bytes) -> bytes | None:
split_payload = self._split_jpeg_frame_and_trailer(frame)
if split_payload is None:
return None
jpeg_frame, _ = split_payload
return jpeg_frame
def _extract_sequence(self, frame: bytes) -> int | None:
if len(frame) >= 8 and not frame.startswith(b"\xff\xd8"):
return int.from_bytes(frame[:8], "big")
return None
def _extract_frame_tail(self, frame: bytes) -> bytes:
split_payload = self._split_jpeg_frame_and_trailer(frame)
if split_payload is None:
return b""
_, trailer = split_payload
return trailer
def _extract_frame_timestamp(self, frame: bytes) -> tuple[int, str, str] | None:
trailer = self._extract_frame_tail(frame)
if len(trailer) != VIDEO_TIMESTAMP_TRAILER_BYTES:
return None
value = int.from_bytes(trailer, VIDEO_TIMESTAMP_ENDIANNESS, signed=False)
if value <= 0:
return None
timestamp_ns = value * VIDEO_TIMESTAMP_MULTIPLIER_NS
if abs(time.time_ns() - timestamp_ns) > VIDEO_TIMESTAMP_MAX_SKEW_NS:
return None
return timestamp_ns, VIDEO_TIMESTAMP_UNIT, VIDEO_TIMESTAMP_ENDIANNESS
def _run(self) -> None:
while True:
try:
session, buffer_bytes = self._connect_session()
self._session = session
self._last_error = ""
buffer = bytearray(buffer_bytes)
while True:
meta = session.recv_into(buffer, timeout_ms=1000)
if meta is None:
continue
if meta.get("msg_type") != self._binary_msg_type:
continue
frame = bytes(buffer[: meta["body_len"]])
jpeg_frame = self._extract_jpeg_frame(frame)
if jpeg_frame is None:
self._last_error = "received non-JPEG binary frame"
continue
timestamp_meta = self._extract_frame_timestamp(frame)
latency_ms = None
if timestamp_meta is not None:
timestamp_ns, unit, endianness = timestamp_meta
latency_ms = round((time.time_ns() - timestamp_ns) / 1_000_000, 3)
else:
unit = None
endianness = None
with self._lock:
self._latest_frame = jpeg_frame
self._latest_received_at = time.time()
self._latest_sequence = self._extract_sequence(frame)
self._latest_latency_ms = latency_ms
self._latest_timestamp_unit = unit
self._latest_timestamp_endianness = endianness
if latency_ms is not None:
self._latency_samples_ms.append(latency_ms)
self._frames_received += 1
except Exception as error: # pragma: no cover - runtime integration path
self._last_error = str(error)
time.sleep(2)
finally:
if self._session is not None:
try:
self._session.close()
except Exception:
pass
self._session = None
def get_latest_frame(self) -> bytes | None:
self.ensure_started()
with self._lock:
if self._latest_frame is None:
return None
if time.time() - self._latest_received_at > OMNISOCKET_FRAME_FRESH_SECONDS:
return None
return self._latest_frame
def session_stats(self) -> dict[str, Any]:
self.ensure_started()
with self._lock:
session = self._session
if session is None:
return {"connected": 0}
try:
return dict(session.stats())
except Exception:
return {"connected": 0}
def session_kcp_stats(self) -> dict[str, Any]:
self.ensure_started()
with self._lock:
session = self._session
return safe_kcp_stats(session)
def get_status(self) -> dict[str, Any]:
self.ensure_started()
config = load_omnisocket_config()
transport_cfg = config.get("transport", {})
video_cfg = config.get("video_receiver", {})
with self._lock:
has_recent_frame = self._latest_frame is not None and (
time.time() - self._latest_received_at <= OMNISOCKET_FRAME_FRESH_SECONDS
)
if has_recent_frame and self._latest_latency_ms is not None:
timing_status = {
"available": True,
"latest_delta_ms": self._latest_latency_ms,
"delta_samples_ms": list(reversed(self._latency_samples_ms)),
"sample_count": len(self._latency_samples_ms),
"sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE,
"timestamp_unit": self._latest_timestamp_unit,
"timestamp_endianness": self._latest_timestamp_endianness,
}
else:
timing_status = {
"available": False,
"latest_delta_ms": None,
"delta_samples_ms": [],
"sample_count": 0,
"sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE,
"timestamp_unit": None,
"timestamp_endianness": None,
}
return {
"backend_ready": self._session_cls is not None,
"mode": VIDEO_SOURCE_MODE,
"connected": self._session is not None,
"has_recent_frame": has_recent_frame,
"frames_received": self._frames_received,
"latest_sequence": self._latest_sequence,
"last_error": self._last_error,
"config_path": str(OMNISOCKET_CONFIG_PATH),
"server_addr": str(transport_cfg.get("server_addr", "")),
"relay_via": str(transport_cfg.get("relay_via", "")),
"peer_id": str(video_cfg.get("peer_id", "")),
"buffer_bytes": int(video_cfg.get("buffer_bytes", 0)),
"timing": timing_status,
}
class VideoFrameService:
def __init__(self, receiver: OmniSocketVideoReceiver) -> None:
self._receiver = receiver
def get_status(self) -> dict[str, Any]:
receiver_status = self._receiver.get_status()
receiver_frame = self._receiver.get_latest_frame()
if receiver_frame is not None:
return {
"available": True,
"source_mode": "omnisocket-jpeg-live",
"frame_count": receiver_status["frames_received"],
"fps": 30,
"frame_dir": str(JPEG_FRAME_DIR),
"source_detail": f"peer stream active, frames={receiver_status['frames_received']}",
"receiver": receiver_status,
"timing": receiver_status["timing"],
}
wait_detail = receiver_status["last_error"] or (
"waiting for live OmniSocket JPEG frames; check the hub, sender, and receiver configuration"
)
return {
"available": False,
"source_mode": "omnisocket-waiting",
"frame_count": receiver_status["frames_received"],
"fps": 30,
"frame_dir": str(JPEG_FRAME_DIR),
"source_detail": wait_detail,
"receiver": receiver_status,
"timing": receiver_status["timing"],
}
def get_next_frame(self) -> bytes:
receiver_frame = self._receiver.get_latest_frame()
if receiver_frame is not None:
return receiver_frame
raise RuntimeError("no live OmniSocket JPEG frame is currently available")
def iter_mjpeg(self, fps: float = 6.0) -> Iterator[bytes]:
frame_interval = 1.0 / max(1.0, min(fps, 30.0))
while True:
frame = self.get_next_frame()
header = (
b"--frame\r\n"
b"Content-Type: image/jpeg\r\n"
+ f"Content-Length: {len(frame)}\r\n\r\n".encode("ascii")
)
yield header + frame + b"\r\n"
time.sleep(frame_interval)