diff --git a/backend/config/asgi.py b/backend/config/asgi.py index ed7c431..6decc9d 100644 --- a/backend/config/asgi.py +++ b/backend/config/asgi.py @@ -9,8 +9,23 @@ https://docs.djangoproject.com/en/5.2/howto/deployment/asgi/ import os +from channels.auth import AuthMiddlewareStack +from channels.routing import ProtocolTypeRouter, URLRouter +from channels.security.websocket import OriginValidator +from django.conf import settings from django.core.asgi import get_asgi_application os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings') -application = get_asgi_application() +django_asgi_app = get_asgi_application() + +from monitoring.routing import websocket_urlpatterns + + +application = ProtocolTypeRouter({ + "http": django_asgi_app, + "websocket": OriginValidator( + AuthMiddlewareStack(URLRouter(websocket_urlpatterns)), + settings.CONTROL_WS_ALLOWED_ORIGINS, + ), +}) diff --git a/backend/config/settings.py b/backend/config/settings.py index 0d953b6..5133a7d 100644 --- a/backend/config/settings.py +++ b/backend/config/settings.py @@ -1,7 +1,13 @@ +import os from pathlib import Path BASE_DIR = Path(__file__).resolve().parent.parent + +def _split_csv_env(name: str) -> list[str]: + value = os.getenv(name, "") + return [item.strip().rstrip("/") for item in value.split(",") if item.strip()] + SECRET_KEY = 'django-insecure-pk4scm@ifo%mao6l=j0@-$_v+pg-43^hj4a!199^)zivz-_8xu' DEBUG = True ALLOWED_HOSTS = ["*"] @@ -87,3 +93,16 @@ STATIC_URL = 'static/' CORS_ALLOW_ALL_ORIGINS = True DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField' + +CONTROL_WS_ALLOWED_ORIGINS = _split_csv_env('CONTROL_WS_ALLOWED_ORIGINS') or [ + 'http://127.0.0.1', + 'http://127.0.0.1:5173', + 'http://127.0.0.1:4173', + 'http://127.0.0.1:8001', + 'https://127.0.0.1', + 'http://localhost:5173', + 'http://localhost:4173', + 'http://localhost', + 'http://localhost:8001', + 'https://localhost', +] diff --git a/backend/monitoring/common.py b/backend/monitoring/common.py new file mode 100644 index 0000000..06d09b0 --- /dev/null +++ b/backend/monitoring/common.py @@ -0,0 +1,176 @@ +from __future__ import annotations + +import os +import struct +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + + +PROJECT_ROOT = Path(__file__).resolve().parents[2] +WORKSPACE_ROOT = PROJECT_ROOT.parent +JPEG_FRAME_DIR = WORKSPACE_ROOT / "RobotDataShow" / "jpeg-frames" +GEOSTREAM_JSON_PATH = WORKSPACE_ROOT / "GeoStream" / "gps_latest.json" +GEOSTREAM_STALE_SECONDS = 15 +OMNISOCKET_CONFIG_PATH = PROJECT_ROOT / "config" / "omnisocket_demo.yaml" + +VIDEO_SOURCE_MODE = os.getenv("VIDEO_SOURCE_MODE", "auto").strip().lower() +OMNISOCKET_FRAME_FRESH_SECONDS = 2.0 +VIDEO_TIMESTAMP_SAMPLE_SIZE = 10 +VIDEO_TIMESTAMP_TRAILER_BYTES = 8 +VIDEO_TIMESTAMP_ENDIANNESS = "little" +VIDEO_TIMESTAMP_UNIT = "ms" +VIDEO_TIMESTAMP_MULTIPLIER_NS = 1_000_000 +VIDEO_TIMESTAMP_MAX_SKEW_NS = 7 * 24 * 60 * 60 * 1_000_000_000 + +CONTROL_PACKET = struct.Struct("<6f") +CONTROL_PACKET_SIZE = CONTROL_PACKET.size +CONTROL_SOURCE_NATIVE_UDP = "native_udp" +CONTROL_SOURCE_WEB = "web" +CONTROL_SOURCE_PRIORITY = (CONTROL_SOURCE_NATIVE_UDP, CONTROL_SOURCE_WEB) +ZERO_CONTROL_PAYLOAD = CONTROL_PACKET.pack(0.0, 0.0, 0.0, 0.0, 0.0, 0.0) + + +def utc_iso_now() -> str: + return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") + + +def parse_simple_yaml_scalar(value: str) -> Any: + if value in {'""', "''"}: + return "" + if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}: + return value[1:-1] + if value.lower() == "true": + return True + if value.lower() == "false": + return False + if value and value.lstrip("-").isdigit(): + return int(value) + return value + + +def load_simple_yaml_config(path: Path) -> dict[str, Any]: + parsed: dict[str, Any] = {} + current_section: str | None = None + + with path.open("r", encoding="utf-8") as file: + for raw_line in file: + line = raw_line.split("#", 1)[0].rstrip() + if not line.strip(): + continue + + if not line.startswith(" "): + if not line.endswith(":"): + raise ValueError(f"invalid top-level yaml line: {raw_line.strip()}") + current_section = line[:-1].strip() + parsed[current_section] = {} + continue + + if current_section is None: + raise ValueError(f"yaml key outside section: {raw_line.strip()}") + + stripped = line.strip() + if ":" not in stripped: + raise ValueError(f"invalid yaml key line: {raw_line.strip()}") + + key, value = stripped.split(":", 1) + parsed[current_section][key.strip()] = parse_simple_yaml_scalar(value.strip()) + + return parsed + + +def load_omnisocket_config() -> dict[str, Any]: + config: dict[str, Any] = {} + if OMNISOCKET_CONFIG_PATH.exists(): + try: + try: + import yaml # type: ignore + + with OMNISOCKET_CONFIG_PATH.open("r", encoding="utf-8") as file: + config = yaml.safe_load(file) or {} + except ImportError: + config = load_simple_yaml_config(OMNISOCKET_CONFIG_PATH) + except Exception: + config = {} + + transport_cfg = dict(config.get("transport", {})) + video_receiver_cfg = dict(config.get("video_receiver", {})) + control_sender_cfg = dict(config.get("control_sender", {})) + control_ingress_cfg = dict(config.get("control_ingress", {})) + + transport_cfg["server_addr"] = os.getenv( + "OMNISOCKET_SERVER_ADDR", + str(transport_cfg.get("server_addr", "127.0.0.1:10909")), + ) + transport_cfg["relay_via"] = os.getenv( + "OMNISOCKET_RELAY_VIA", + str(transport_cfg.get("relay_via", "")), + ) + transport_cfg["bind_ip"] = os.getenv( + "OMNISOCKET_BIND_IP", + str(transport_cfg.get("bind_ip", "")), + ) + transport_cfg["bind_device"] = os.getenv( + "OMNISOCKET_BIND_DEVICE", + str(transport_cfg.get("bind_device", "")), + ) + + video_receiver_cfg["peer_id"] = os.getenv( + "OMNISOCKET_VIDEO_PEER_ID", + str(video_receiver_cfg.get("peer_id", "peer-a-video")), + ) + video_receiver_cfg["buffer_bytes"] = int( + os.getenv( + "OMNISOCKET_BUFFER_BYTES", + str(video_receiver_cfg.get("buffer_bytes", 1024 * 1024)), + ) + ) + + control_sender_cfg["peer_id"] = os.getenv( + "OMNISOCKET_CONTROL_PEER_ID", + str(control_sender_cfg.get("peer_id", "peer-a-ctrl")), + ) + control_sender_cfg["target_peer"] = os.getenv( + "OMNISOCKET_CONTROL_TARGET_PEER", + str(control_sender_cfg.get("target_peer", "peer-b-ctrl")), + ) + + control_ingress_cfg["native_udp_bind"] = os.getenv( + "OMNISOCKET_CONTROL_NATIVE_UDP_BIND", + str(control_ingress_cfg.get("native_udp_bind", "127.0.0.1:10921")), + ) + control_ingress_cfg["source_lease_ms"] = int( + os.getenv( + "OMNISOCKET_CONTROL_SOURCE_LEASE_MS", + str(control_ingress_cfg.get("source_lease_ms", 300)), + ) + ) + control_ingress_cfg["send_rate_hz"] = float( + os.getenv( + "OMNISOCKET_CONTROL_SEND_RATE_HZ", + str(control_ingress_cfg.get("send_rate_hz", 20.0)), + ) + ) + control_ingress_cfg["zero_burst_packets"] = int( + os.getenv( + "OMNISOCKET_CONTROL_ZERO_BURST_PACKETS", + str(control_ingress_cfg.get("zero_burst_packets", 3)), + ) + ) + + return { + "transport": transport_cfg, + "video_receiver": video_receiver_cfg, + "control_sender": control_sender_cfg, + "control_ingress": control_ingress_cfg, + } + + +def parse_host_port(bind_addr: str) -> tuple[str, int]: + host, port_text = bind_addr.rsplit(":", 1) + host = host.strip() or "127.0.0.1" + port = int(port_text) + if port <= 0 or port > 65535: + raise ValueError(f"invalid port in bind address: {bind_addr}") + return host, port + diff --git a/backend/monitoring/consumers.py b/backend/monitoring/consumers.py new file mode 100644 index 0000000..1dc67f9 --- /dev/null +++ b/backend/monitoring/consumers.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import json + +from channels.generic.websocket import WebsocketConsumer + +from .common import CONTROL_PACKET_SIZE, CONTROL_SOURCE_WEB +from .services import control_arbiter, native_control_ingress + + +class ControlConsumer(WebsocketConsumer): + def connect(self) -> None: + control_arbiter.ensure_started() + native_control_ingress.ensure_started() + self.accept() + self.send( + text_data=json.dumps( + { + "type": "ready", + "packet_bytes": CONTROL_PACKET_SIZE, + } + ) + ) + + def receive(self, text_data: str | None = None, bytes_data: bytes | None = None) -> None: + if bytes_data is None: + self.send(text_data=json.dumps({"type": "error", "detail": "binary control payload required"})) + return + + if len(bytes_data) != CONTROL_PACKET_SIZE: + self.send( + text_data=json.dumps( + { + "type": "error", + "detail": f"expected {CONTROL_PACKET_SIZE} bytes, got {len(bytes_data)}", + } + ) + ) + return + + control_arbiter.ingest_command(CONTROL_SOURCE_WEB, bytes_data) + diff --git a/backend/monitoring/control.py b/backend/monitoring/control.py new file mode 100644 index 0000000..146a81d --- /dev/null +++ b/backend/monitoring/control.py @@ -0,0 +1,423 @@ +from __future__ import annotations + +import socket +import sys +import threading +import time +from typing import Any + +from .common import ( + CONTROL_PACKET_SIZE, + CONTROL_SOURCE_NATIVE_UDP, + CONTROL_SOURCE_PRIORITY, + ZERO_CONTROL_PAYLOAD, + WORKSPACE_ROOT, + load_omnisocket_config, + parse_host_port, +) +from .video import safe_kcp_stats + + +class OmniSocketControlSender: + def __init__(self) -> None: + self._lock = threading.Lock() + self._session = None + self._session_cls = None + self._msg_type_error = None + self._control_defaults: dict[str, Any] = {} + self._started = False + self._drain_thread: threading.Thread | None = None + self._closing = threading.Event() + self._target_peer = "" + self._send_count = 0 + self._send_errors = 0 + self._drain_errors = 0 + self._last_error = "" + self._load_backend() + + def _load_backend(self) -> None: + try: + self._import_backend() + except Exception as error: # pragma: no cover - optional runtime dependency + self._last_error = f"omnisocket import failed: {error}" + + def _import_backend(self) -> None: + try: + from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_ERROR, Session # type: ignore + except ImportError: + python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python" + if python_dir.exists(): + sys.path.insert(0, str(python_dir)) + from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_ERROR, Session # type: ignore + + self._session_cls = Session + self._msg_type_error = MSG_TYPE_ERROR + self._control_defaults = dict(CONTROL_DEFAULTS) + + def _connect_session(self): + assert self._session_cls is not None + + config = load_omnisocket_config() + transport_cfg = config.get("transport", {}) + control_cfg = config.get("control_sender", {}) + + session = self._session_cls() + session.connect( + server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")), + peer_id=str(control_cfg.get("peer_id", "peer-a-ctrl")), + relay_via=str(transport_cfg.get("relay_via", "")), + bind_ip=str(transport_cfg.get("bind_ip", "")), + bind_device=str(transport_cfg.get("bind_device", "")), + **self._control_defaults, + ) + target_peer = str(control_cfg.get("target_peer", "peer-b-ctrl")) + return session, target_peer + + def ensure_started(self) -> None: + if self._session_cls is None: + return + + with self._lock: + if self._started and self._session is not None: + return + session, target_peer = self._connect_session() + self._session = session + self._target_peer = target_peer + self._closing.clear() + self._started = True + self._last_error = "" + self._drain_thread = threading.Thread( + target=self._drain_loop, + name="omnisocket-control-drain", + daemon=True, + ) + self._drain_thread.start() + + def _reset_session(self, session: Any | None) -> None: + with self._lock: + if session is not None and session is not self._session: + return + current = self._session + self._session = None + self._started = False + if current is not None: + try: + current.close() + except Exception: + pass + + def send_payload(self, payload: bytes) -> None: + if len(payload) != CONTROL_PACKET_SIZE: + raise ValueError(f"expected {CONTROL_PACKET_SIZE} bytes, got {len(payload)}") + self.ensure_started() + with self._lock: + session = self._session + target_peer = self._target_peer + + if session is None: + raise RuntimeError("control session is not available") + + try: + session.send(to=target_peer, data=payload) + except Exception as error: + with self._lock: + self._send_errors += 1 + self._last_error = str(error) + self._reset_session(session) + raise + + with self._lock: + self._send_count += 1 + + def send_zero_burst(self, count: int) -> None: + for _ in range(max(0, count)): + try: + self.send_payload(ZERO_CONTROL_PAYLOAD) + except Exception: + return + + def _drain_loop(self) -> None: + while not self._closing.is_set(): + with self._lock: + session = self._session + if session is None: + return + + try: + result = session.recv(timeout_ms=100) + except Exception as error: + with self._lock: + self._drain_errors += 1 + self._last_error = str(error) + if not self._closing.is_set(): + self._reset_session(session) + return + + if result is None: + continue + + from_peer, msg_type, payload = result + if msg_type == self._msg_type_error: + text = payload.decode("utf-8", errors="replace") + with self._lock: + self._last_error = f"server error from {from_peer}: {text}" + + def session_stats(self) -> dict[str, Any]: + with self._lock: + session = self._session + if session is None: + return {"connected": 0} + try: + return dict(session.stats()) + except Exception: + return {"connected": 0} + + def session_kcp_stats(self) -> dict[str, Any]: + with self._lock: + session = self._session + return safe_kcp_stats(session) + + def get_status(self) -> dict[str, Any]: + config = load_omnisocket_config() + control_cfg = config.get("control_sender", {}) + with self._lock: + return { + "backend_ready": self._session_cls is not None, + "started": self._started, + "connected": self._session is not None, + "peer_id": str(control_cfg.get("peer_id", "")), + "target_peer": str(control_cfg.get("target_peer", "")), + "send_count": self._send_count, + "send_errors": self._send_errors, + "drain_errors": self._drain_errors, + "last_error": self._last_error, + } + + def close(self) -> None: + self._closing.set() + self.send_zero_burst(1) + self._reset_session(None) + drain_thread = self._drain_thread + if drain_thread is not None and drain_thread.is_alive(): + drain_thread.join(timeout=0.5) + + +class ControlArbiter: + def __init__(self, sender: OmniSocketControlSender) -> None: + self._sender = sender + self._lock = threading.Lock() + self._thread: threading.Thread | None = None + self._closing = threading.Event() + self._started = False + self._source_lease_ms = 300 + self._send_rate_hz = 20.0 + self._zero_burst_packets = 3 + self._latest_by_source: dict[str, tuple[bytes, float]] = {} + self._packet_counts = {source: 0 for source in CONTROL_SOURCE_PRIORITY} + self._last_payload = ZERO_CONTROL_PAYLOAD + self._last_sent_at = 0.0 + self._active_source: str | None = None + self._last_error = "" + + def _load_config(self) -> None: + cfg = load_omnisocket_config().get("control_ingress", {}) + self._source_lease_ms = max(50, int(cfg.get("source_lease_ms", 300))) + self._send_rate_hz = max(1.0, float(cfg.get("send_rate_hz", 20.0))) + self._zero_burst_packets = max(1, int(cfg.get("zero_burst_packets", 3))) + + def ensure_started(self) -> None: + self._load_config() + with self._lock: + if self._started: + return + self._started = True + self._thread = threading.Thread( + target=self._send_loop, + name="control-arbiter", + daemon=True, + ) + self._thread.start() + + def ingest_command(self, source: str, payload: bytes) -> None: + if source not in CONTROL_SOURCE_PRIORITY: + raise ValueError(f"unsupported control source: {source}") + if len(payload) != CONTROL_PACKET_SIZE: + raise ValueError(f"expected {CONTROL_PACKET_SIZE} bytes, got {len(payload)}") + + self.ensure_started() + now = time.monotonic() + with self._lock: + self._latest_by_source[source] = (payload, now) + self._packet_counts[source] += 1 + + def _resolve_active_locked(self, now: float) -> tuple[str | None, bytes, int]: + lease_seconds = self._source_lease_ms / 1000.0 + expired_sources = [ + source + for source, (_, updated_at) in self._latest_by_source.items() + if (now - updated_at) > lease_seconds + ] + for source in expired_sources: + self._latest_by_source.pop(source, None) + + for source in CONTROL_SOURCE_PRIORITY: + entry = self._latest_by_source.get(source) + if entry is None: + continue + payload, updated_at = entry + remaining_ms = max(0, int((lease_seconds - (now - updated_at)) * 1000)) + return source, payload, remaining_ms + + return None, ZERO_CONTROL_PAYLOAD, 0 + + def _send_loop(self) -> None: + interval = 1.0 / max(self._send_rate_hz, 1.0) + previous_active: str | None = None + + while not self._closing.is_set(): + now = time.monotonic() + with self._lock: + active_source, payload, _lease_ms = self._resolve_active_locked(now) + self._active_source = active_source + self._last_payload = payload + + if previous_active is not None and active_source is None: + try: + self._sender.send_zero_burst(self._zero_burst_packets) + except Exception as error: + with self._lock: + self._last_error = str(error) + elif active_source is not None: + try: + self._sender.send_payload(payload) + with self._lock: + self._last_sent_at = time.monotonic() + self._last_error = "" + except Exception as error: + with self._lock: + self._last_error = str(error) + + previous_active = active_source + self._closing.wait(interval) + + try: + self._sender.send_zero_burst(self._zero_burst_packets) + except Exception: + pass + + def get_status(self) -> dict[str, Any]: + self.ensure_started() + now = time.monotonic() + with self._lock: + active_source, _payload, lease_ms = self._resolve_active_locked(now) + return { + "active_source": active_source, + "control_lease_remaining_ms": lease_ms, + "packet_counts": dict(self._packet_counts), + "send_rate_hz": self._send_rate_hz, + "source_lease_ms": self._source_lease_ms, + "zero_burst_packets": self._zero_burst_packets, + "last_error": self._last_error, + "last_sent_at_monotonic": self._last_sent_at, + } + + def close(self) -> None: + self._closing.set() + thread = self._thread + if thread is not None and thread.is_alive(): + thread.join(timeout=0.5) + + +class NativeUdpControlIngress: + def __init__(self, arbiter: ControlArbiter) -> None: + self._arbiter = arbiter + self._lock = threading.Lock() + self._thread: threading.Thread | None = None + self._closing = threading.Event() + self._started = False + self._bind_addr = "127.0.0.1:10921" + self._packets_received = 0 + self._invalid_packets = 0 + self._last_sender = "" + self._last_error = "" + + def ensure_started(self) -> None: + bind_addr = str(load_omnisocket_config().get("control_ingress", {}).get("native_udp_bind", "127.0.0.1:10921")) + with self._lock: + self._bind_addr = bind_addr + if self._closing.is_set(): + return + if self._thread is not None and self._thread.is_alive(): + return + self._started = True + self._thread = threading.Thread( + target=self._run, + name="native-udp-control-ingress", + daemon=True, + ) + self._thread.start() + + def _run(self) -> None: + try: + try: + host, port = parse_host_port(self._bind_addr) + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((host, port)) + sock.settimeout(0.1) + except Exception as error: + with self._lock: + self._last_error = str(error) + return + + with sock: + while not self._closing.is_set(): + try: + payload, sender_addr = sock.recvfrom(CONTROL_PACKET_SIZE + 64) + except socket.timeout: + continue + except OSError as error: + with self._lock: + if not self._closing.is_set(): + self._last_error = str(error) + return + + with self._lock: + self._last_sender = f"{sender_addr[0]}:{sender_addr[1]}" + + if len(payload) != CONTROL_PACKET_SIZE: + with self._lock: + self._invalid_packets += 1 + continue + + try: + self._arbiter.ingest_command(CONTROL_SOURCE_NATIVE_UDP, payload) + except Exception as error: + with self._lock: + self._last_error = str(error) + continue + + with self._lock: + self._packets_received += 1 + finally: + with self._lock: + self._started = False + self._thread = None + + def get_status(self) -> dict[str, Any]: + self.ensure_started() + with self._lock: + return { + "started": self._started, + "bind_addr": self._bind_addr, + "packets_received": self._packets_received, + "invalid_packets": self._invalid_packets, + "last_sender": self._last_sender, + "last_error": self._last_error, + } + + def close(self) -> None: + self._closing.set() + thread = self._thread + if thread is not None and thread.is_alive(): + thread.join(timeout=0.5) diff --git a/backend/monitoring/routing.py b/backend/monitoring/routing.py new file mode 100644 index 0000000..1da9004 --- /dev/null +++ b/backend/monitoring/routing.py @@ -0,0 +1,9 @@ +from django.urls import re_path + +from .consumers import ControlConsumer + + +websocket_urlpatterns = [ + re_path(r"^ws/control/$", ControlConsumer.as_asgi()), +] + diff --git a/backend/monitoring/services.py b/backend/monitoring/services.py index 79f992e..02ea861 100644 --- a/backend/monitoring/services.py +++ b/backend/monitoring/services.py @@ -1,523 +1,22 @@ -from __future__ import annotations - -from collections import deque -import json -import math -import os -import sys -import threading -import time -from datetime import UTC, datetime -from pathlib import Path -from typing import Any, Iterator - - -PROJECT_ROOT = Path(__file__).resolve().parents[2] -WORKSPACE_ROOT = PROJECT_ROOT.parent -JPEG_FRAME_DIR = WORKSPACE_ROOT / "RobotDataShow" / "jpeg-frames" -# GPS 数据 JSON 文件路径 -GEOSTREAM_JSON_PATH = WORKSPACE_ROOT / "GeoStream" / "gps_latest.json" -GEOSTREAM_STALE_SECONDS = 15 -OMNISOCKET_CONFIG_PATH = PROJECT_ROOT / "config" / "omnisocket_demo.yaml" -VIDEO_SOURCE_MODE = os.getenv("VIDEO_SOURCE_MODE", "auto").strip().lower() -OMNISOCKET_FRAME_FRESH_SECONDS = 2.0 -VIDEO_TIMESTAMP_SAMPLE_SIZE = 10 -VIDEO_TIMESTAMP_TRAILER_BYTES = 8 -VIDEO_TIMESTAMP_ENDIANNESS = "little" -VIDEO_TIMESTAMP_UNIT = "ms" -VIDEO_TIMESTAMP_MULTIPLIER_NS = 1_000_000 -VIDEO_TIMESTAMP_MAX_SKEW_NS = 7 * 24 * 60 * 60 * 1_000_000_000 - - -def utc_iso_now() -> str: - return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") - - -class OmniSocketVideoReceiver: - def __init__(self) -> None: - self._lock = threading.Lock() - self._thread: threading.Thread | None = None - self._started = False - self._session = None - self._session_cls = None - self._binary_msg_type = None - self._video_defaults: dict[str, Any] = {} - self._latest_frame: bytes | None = None - self._latest_received_at = 0.0 - self._latest_sequence: int | None = None - self._latest_latency_ms: float | None = None - self._latest_timestamp_unit: str | None = None - self._latest_timestamp_endianness: str | None = None - self._latency_samples_ms: deque[float] = deque(maxlen=VIDEO_TIMESTAMP_SAMPLE_SIZE) - self._frames_received = 0 - self._last_error = "" - self._load_backend() - - def _load_backend(self) -> None: - # 服务启动时先尝试导入一次 Python/C 扩展。 - try: - self._import_backend() - except Exception as error: # pragma: no cover - 可选的运行时依赖 - self._last_error = f"omnisocket import failed: {error}" - - def _import_backend(self) -> None: - # 优先使用已经安装到当前 Python 环境里的 omnisocket。 - # 如果导入失败,再尝试样例目录下的本地 python 路径。 - try: - from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS # type: ignore - except ImportError: - python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python" - if python_dir.exists(): - sys.path.insert(0, str(python_dir)) - from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS # type: ignore - - self._binary_msg_type = MSG_TYPE_BINARY - self._session_cls = Session - self._video_defaults = dict(VIDEO_DEFAULTS) - - def ensure_started(self) -> None: - # 当第一次请求帧或状态时,再懒启动后台接收线程。 - if self._session_cls is None or self._binary_msg_type is None: - return - - with self._lock: - if self._started: - return - self._started = True - self._thread = threading.Thread( - target=self._run, - name="omnisocket-video-receiver", - daemon=True, - ) - self._thread.start() - - def _load_config(self) -> dict[str, Any]: - # 这里保持和 SampleCData/omnisocket_video_receiver.py 一样的配置结构: - # transport + video_receiver。 - # 即使配置文件不存在,也允许回退到默认值继续运行。 - config: dict[str, Any] = {} - if OMNISOCKET_CONFIG_PATH.exists(): - try: - try: - import yaml # type: ignore - - with OMNISOCKET_CONFIG_PATH.open("r", encoding="utf-8") as file: - config = yaml.safe_load(file) or {} - except ImportError: - # 当前配置文件结构非常简单,缺少 PyYAML 时用简化解析器兜底。 - config = self._load_simple_yaml_config(OMNISOCKET_CONFIG_PATH) - except Exception as error: # pragma: no cover - 可选依赖 - self._last_error = f"config load failed: {error}" - - transport_cfg = dict(config.get("transport", {})) - video_cfg = dict(config.get("video_receiver", {})) - - transport_cfg["server_addr"] = os.getenv( - "OMNISOCKET_SERVER_ADDR", - str(transport_cfg.get("server_addr", "127.0.0.1:10909")), - ) - transport_cfg["relay_via"] = os.getenv( - "OMNISOCKET_RELAY_VIA", - str(transport_cfg.get("relay_via", "")), - ) - transport_cfg["bind_ip"] = os.getenv( - "OMNISOCKET_BIND_IP", - str(transport_cfg.get("bind_ip", "")), - ) - transport_cfg["bind_device"] = os.getenv( - "OMNISOCKET_BIND_DEVICE", - str(transport_cfg.get("bind_device", "")), - ) - video_cfg["peer_id"] = os.getenv( - "OMNISOCKET_VIDEO_PEER_ID", - str(video_cfg.get("peer_id", "peer-a-video")), - ) - video_cfg["buffer_bytes"] = int( - os.getenv( - "OMNISOCKET_BUFFER_BYTES", - str(video_cfg.get("buffer_bytes", 1024 * 1024)), - ) - ) - - return { - "transport": transport_cfg, - "video_receiver": video_cfg, - } - - def _load_simple_yaml_config(self, path: Path) -> dict[str, Any]: - parsed: dict[str, Any] = {} - current_section: str | None = None - - with path.open("r", encoding="utf-8") as file: - for raw_line in file: - line = raw_line.split("#", 1)[0].rstrip() - if not line.strip(): - continue - - if not line.startswith(" "): - if not line.endswith(":"): - raise ValueError(f"invalid top-level yaml line: {raw_line.strip()}") - current_section = line[:-1].strip() - parsed[current_section] = {} - continue - - if current_section is None: - raise ValueError(f"yaml key outside section: {raw_line.strip()}") - - stripped = line.strip() - if ":" not in stripped: - raise ValueError(f"invalid yaml key line: {raw_line.strip()}") - - key, value = stripped.split(":", 1) - parsed[current_section][key.strip()] = self._parse_simple_yaml_scalar(value.strip()) - - return parsed - - def _parse_simple_yaml_scalar(self, value: str) -> Any: - if value in {'""', "''"}: - return "" - if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}: - return value[1:-1] - if value.lower() == "true": - return True - if value.lower() == "false": - return False - if value and value.lstrip("-").isdigit(): - return int(value) - return value - - def _connect_session(self): - # 这里和样例接收器一致:创建 Session(),然后使用 transport/video 配置建立连接。 - assert self._session_cls is not None - - config = self._load_config() - transport_cfg = config.get("transport", {}) - video_cfg = config.get("video_receiver", {}) - - session = self._session_cls() - session.connect( - server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")), - peer_id=str(video_cfg.get("peer_id", "peer-a-video")), - relay_via=str(transport_cfg.get("relay_via", "")), - bind_ip=str(transport_cfg.get("bind_ip", "")), - bind_device=str(transport_cfg.get("bind_device", "")), - **self._video_defaults, - ) - return session, int(video_cfg.get("buffer_bytes", 1024 * 1024)) - - def _extract_jpeg_payload(self, frame: bytes) -> bytes | None: - # 同时兼容两种帧格式: - # 1. 纯 JPEG 二进制 - # 2. 前 8 字节是序号,后面才是真正的 JPEG 数据 - if frame.startswith(b"\xff\xd8"): - return frame - if len(frame) > 8 and frame[8:10] == b"\xff\xd8": - return frame[8:] - return None +from __future__ import annotations - def _split_jpeg_frame_and_trailer(self, frame: bytes) -> tuple[bytes, bytes] | None: - jpeg_payload = self._extract_jpeg_payload(frame) - if jpeg_payload is None: - return None +from .control import ControlArbiter, NativeUdpControlIngress, OmniSocketControlSender +from .telemetry import GpsDataService, NetworkTelemetryService +from .video import OmniSocketVideoReceiver, VideoFrameService - # 当前发送端协议是 JPEG 末尾固定追加 8 字节 little-endian 毫秒时间戳。 - if jpeg_payload.endswith(b"\xff\xd9"): - return jpeg_payload, b"" - if ( - len(jpeg_payload) >= VIDEO_TIMESTAMP_TRAILER_BYTES + 2 - and jpeg_payload[-(VIDEO_TIMESTAMP_TRAILER_BYTES + 2) : -VIDEO_TIMESTAMP_TRAILER_BYTES] == b"\xff\xd9" - ): - return jpeg_payload[:-VIDEO_TIMESTAMP_TRAILER_BYTES], jpeg_payload[-VIDEO_TIMESTAMP_TRAILER_BYTES:] +_video_receiver = OmniSocketVideoReceiver() +_control_sender = OmniSocketControlSender() - # 兜底兼容旧格式,避免直接丢帧。 - eoi_index = jpeg_payload.rfind(b"\xff\xd9") - if eoi_index < 0: - return jpeg_payload, b"" +control_arbiter = ControlArbiter(_control_sender) +native_control_ingress = NativeUdpControlIngress(control_arbiter) - trailer_start = eoi_index + 2 - return jpeg_payload[:trailer_start], jpeg_payload[trailer_start:] +video_service = VideoFrameService(_video_receiver) +gps_service = GpsDataService() +network_service = NetworkTelemetryService( + _video_receiver, + _control_sender, + control_arbiter, + native_control_ingress, +) - def _extract_jpeg_frame(self, frame: bytes) -> bytes | None: - split_payload = self._split_jpeg_frame_and_trailer(frame) - if split_payload is None: - return None - jpeg_frame, _ = split_payload - return jpeg_frame - - def _extract_sequence(self, frame: bytes) -> int | None: - if len(frame) >= 8 and not frame.startswith(b"\xff\xd8"): - return int.from_bytes(frame[:8], "big") - return None - - def _extract_frame_tail(self, frame: bytes) -> bytes: - split_payload = self._split_jpeg_frame_and_trailer(frame) - if split_payload is None: - return b"" - _, trailer = split_payload - return trailer - - def _extract_frame_timestamp(self, frame: bytes) -> tuple[int, str, str] | None: - trailer = self._extract_frame_tail(frame) - if len(trailer) != VIDEO_TIMESTAMP_TRAILER_BYTES: - return None - - value = int.from_bytes(trailer, VIDEO_TIMESTAMP_ENDIANNESS, signed=False) - if value <= 0: - return None - - timestamp_ns = value * VIDEO_TIMESTAMP_MULTIPLIER_NS - if abs(time.time_ns() - timestamp_ns) > VIDEO_TIMESTAMP_MAX_SKEW_NS: - return None - - return timestamp_ns, VIDEO_TIMESTAMP_UNIT, VIDEO_TIMESTAMP_ENDIANNESS - - def _run(self) -> None: - # 后台持续接收循环: - # connect -> recv_into(buffer) -> 按 body_len 截出有效内容 -> 把最新 JPEG 帧缓存在内存里 - while True: - try: - session, buffer_bytes = self._connect_session() - self._session = session - self._last_error = "" - buffer = bytearray(buffer_bytes) - - while True: - # 从 OmniSocket / C 侧收一帧原始二进制数据到缓冲区 - meta = session.recv_into(buffer, timeout_ms=1000) - if meta is None: - continue - if meta.get("msg_type") != self._binary_msg_type: - continue - - # 真正收到的有效部分切出来,形成当前这一帧 - frame = bytes(buffer[: meta["body_len"]]) - jpeg_frame = self._extract_jpeg_frame(frame) - if jpeg_frame is None: - self._last_error = "received non-JPEG binary frame" - continue - - timestamp_meta = self._extract_frame_timestamp(frame) - latency_ms = None - if timestamp_meta is not None: - timestamp_ns, unit, endianness = timestamp_meta - latency_ms = round((time.time_ns() - timestamp_ns) / 1_000_000, 3) - else: - unit = None - endianness = None - - with self._lock: - # 缓存:这里只保留最新的一张 JPEG 帧,供 Web 接口直接返回给前端。 - self._latest_frame = jpeg_frame - self._latest_received_at = time.time() - self._latest_sequence = self._extract_sequence(frame) - self._latest_latency_ms = latency_ms - self._latest_timestamp_unit = unit - self._latest_timestamp_endianness = endianness - if latency_ms is not None: - self._latency_samples_ms.append(latency_ms) - self._frames_received += 1 - except Exception as error: # pragma: no cover - 运行时集成路径 - self._last_error = str(error) - time.sleep(2) - finally: - if self._session is not None: - try: - self._session.close() - except Exception: - pass - self._session = None - - def get_latest_frame(self) -> bytes | None: - # 把内存里最新的一张真实 JPEG 帧暴露给 Django 视图层。 - # 如果这张帧已经过旧,就返回 None,让上层回退到本地模拟帧。 - self.ensure_started() - with self._lock: - if self._latest_frame is None: - return None - if time.time() - self._latest_received_at > OMNISOCKET_FRAME_FRESH_SECONDS: - return None - return self._latest_frame - - def get_status(self) -> dict[str, Any]: - self.ensure_started() - config = self._load_config() - transport_cfg = config.get("transport", {}) - video_cfg = config.get("video_receiver", {}) - with self._lock: - has_recent_frame = self._latest_frame is not None and ( - time.time() - self._latest_received_at <= OMNISOCKET_FRAME_FRESH_SECONDS - ) - if has_recent_frame and self._latest_latency_ms is not None: - timing_status = { - "available": True, - "latest_delta_ms": self._latest_latency_ms, - "delta_samples_ms": list(reversed(self._latency_samples_ms)), - "sample_count": len(self._latency_samples_ms), - "sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE, - "timestamp_unit": self._latest_timestamp_unit, - "timestamp_endianness": self._latest_timestamp_endianness, - } - else: - timing_status = { - "available": False, - "latest_delta_ms": None, - "delta_samples_ms": [], - "sample_count": 0, - "sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE, - "timestamp_unit": None, - "timestamp_endianness": None, - } - return { - "backend_ready": self._session_cls is not None, - "mode": VIDEO_SOURCE_MODE, - "connected": self._session is not None, - "has_recent_frame": has_recent_frame, - "frames_received": self._frames_received, - "latest_sequence": self._latest_sequence, - "last_error": self._last_error, - "config_path": str(OMNISOCKET_CONFIG_PATH), - "server_addr": str(transport_cfg.get("server_addr", "")), - "relay_via": str(transport_cfg.get("relay_via", "")), - "peer_id": str(video_cfg.get("peer_id", "")), - "buffer_bytes": int(video_cfg.get("buffer_bytes", 0)), - "timing": timing_status, - } - - -class VideoFrameService: - def __init__(self) -> None: - self._receiver = OmniSocketVideoReceiver() - - def get_status(self) -> dict[str, Any]: - receiver_status = self._receiver.get_status() - receiver_frame = self._receiver.get_latest_frame() - - # 如果已经收到了真实视频帧,就把当前状态标记为实时模式,给前端显示。 - if receiver_frame is not None: - return { - "available": True, - "source_mode": "omnisocket-jpeg-live", - "frame_count": receiver_status["frames_received"], - "fps": 30, - "frame_dir": str(JPEG_FRAME_DIR), - "source_detail": f"peer stream active, frames={receiver_status['frames_received']}", - "receiver": receiver_status, - "timing": receiver_status["timing"], - } - - wait_detail = receiver_status["last_error"] or ( - "未实时获取真实值,请检查 OmniSocket 服务、视频发送端和接收配置。" - ) - return { - "available": False, - "source_mode": "omnisocket-waiting", - "frame_count": receiver_status["frames_received"], - "fps": 30, - "frame_dir": str(JPEG_FRAME_DIR), - "source_detail": wait_detail, - "receiver": receiver_status, - "timing": receiver_status["timing"], - } - - def get_next_frame(self) -> bytes: - # 优先返回从 Python/C 视频接收器拿到的最新真实 JPEG 帧。 - receiver_frame = self._receiver.get_latest_frame() - if receiver_frame is not None: - return receiver_frame - - raise RuntimeError("未实时获取真实值,当前没有可用的真实 JPEG 帧。") - - def iter_mjpeg(self, fps: float = 6.0) -> Iterator[bytes]: - frame_interval = 1.0 / max(1.0, min(fps, 30.0)) - - while True: - frame = self.get_next_frame() - header = ( - b"--frame\r\n" - b"Content-Type: image/jpeg\r\n" - + f"Content-Length: {len(frame)}\r\n\r\n".encode("ascii") - ) - yield header + frame + b"\r\n" - time.sleep(frame_interval) - - -class GpsDataService: - def get_latest(self) -> dict[str, Any]: - # 优先使用由 GeoStream C 解析器生成的最新数据包。 - payload = self._read_geostream_payload() - if payload is not None: - payload["source_mode"] = "geostream-json" - payload["updated_at"] = utc_iso_now() - return payload - - # 当没有最新的 GPS 文件可用时,退回到使用一个移动的演示点位。 - return self._build_simulated_payload() - - def _read_geostream_payload(self) -> dict[str, Any] | None: - # Django 后端目前还不直接解析串口数据流。 - # 它只读取由 GeoStream/parse_gps.c 生成的 JSON 文件。 - if not GEOSTREAM_JSON_PATH.exists(): - return None - - # 忽略过期的文件,以便 UI 可以退回到使用模拟数据, - # 而不是把旧位置当作实时位置来显示。 - age_seconds = time.time() - GEOSTREAM_JSON_PATH.stat().st_mtime - if age_seconds > GEOSTREAM_STALE_SECONDS: - return None - - try: - with GEOSTREAM_JSON_PATH.open("r", encoding="utf-8") as file: - return json.load(file) - except (OSError, json.JSONDecodeError): - return None - - def _build_simulated_payload(self) -> dict[str, Any]: - # 为本地 UI 开发构建一个平滑的伪造轨迹。 - tick = time.time() / 12.0 - latitude = 31.2304 + math.sin(tick) * 0.0014 - longitude = 121.4737 + math.cos(tick) * 0.0018 - - return { - "has_fix": True, - "utc_time": datetime.now(UTC).strftime("%H:%M:%S"), - "latitude": round(latitude, 6), - "longitude": round(longitude, 6), - "satellites": 14 + int((math.sin(tick * 0.7) + 1.0) * 2), - "altitude_m": round(6.5 + math.cos(tick * 0.5) * 1.2, 2), - "coordinate_system": "WGS84", - "source_sentence": "SIMULATED", - "raw_coordinate_format": "decimal degrees", - "source_mode": "simulated", - "updated_at": utc_iso_now(), - } - - -class NetworkTelemetryService: - def get_latest(self) -> dict[str, Any]: - tick = time.time() - - latency_ms = 28 + math.sin(tick / 4.0) * 6 - jitter_ms = 3 + math.cos(tick / 5.0) * 1.4 - tx_kbps = 780 + math.sin(tick / 6.0) * 160 - rx_kbps = 720 + math.cos(tick / 7.0) * 140 - packet_loss_pct = max(0.0, 0.35 + math.sin(tick / 9.0) * 0.25) - signal_dbm = -53 - abs(math.sin(tick / 8.0) * 7) - - return { - "peer_status": "online", - "latency_ms": round(latency_ms, 1), - "jitter_ms": round(jitter_ms, 1), - "packet_loss_pct": round(packet_loss_pct, 2), - "tx_kbps": int(tx_kbps), - "rx_kbps": int(rx_kbps), - "signal_dbm": round(signal_dbm, 1), - "transport": "OmniSocket / simulated", - "source_mode": "simulated", - "updated_at": utc_iso_now(), - } - - -video_service = VideoFrameService() -gps_service = GpsDataService() -network_service = NetworkTelemetryService() diff --git a/backend/monitoring/telemetry.py b/backend/monitoring/telemetry.py new file mode 100644 index 0000000..f3c594f --- /dev/null +++ b/backend/monitoring/telemetry.py @@ -0,0 +1,161 @@ +from __future__ import annotations + +import json +import math +import threading +import time +from datetime import UTC, datetime +from typing import Any + +from .common import GEOSTREAM_JSON_PATH, GEOSTREAM_STALE_SECONDS, utc_iso_now +from .control import ControlArbiter, NativeUdpControlIngress, OmniSocketControlSender +from .video import OmniSocketVideoReceiver + + +class GpsDataService: + def get_latest(self) -> dict[str, Any]: + payload = self._read_geostream_payload() + if payload is not None: + payload["source_mode"] = "geostream-json" + payload["updated_at"] = utc_iso_now() + return payload + + return self._build_simulated_payload() + + def _read_geostream_payload(self) -> dict[str, Any] | None: + if not GEOSTREAM_JSON_PATH.exists(): + return None + + age_seconds = time.time() - GEOSTREAM_JSON_PATH.stat().st_mtime + if age_seconds > GEOSTREAM_STALE_SECONDS: + return None + + try: + with GEOSTREAM_JSON_PATH.open("r", encoding="utf-8") as file: + return json.load(file) + except (OSError, json.JSONDecodeError): + return None + + def _build_simulated_payload(self) -> dict[str, Any]: + tick = time.time() / 12.0 + latitude = 31.2304 + math.sin(tick) * 0.0014 + longitude = 121.4737 + math.cos(tick) * 0.0018 + + return { + "has_fix": True, + "utc_time": datetime.now(UTC).strftime("%H:%M:%S"), + "latitude": round(latitude, 6), + "longitude": round(longitude, 6), + "satellites": 14 + int((math.sin(tick * 0.7) + 1.0) * 2), + "altitude_m": round(6.5 + math.cos(tick * 0.5) * 1.2, 2), + "coordinate_system": "WGS84", + "source_sentence": "SIMULATED", + "raw_coordinate_format": "decimal degrees", + "source_mode": "simulated", + "updated_at": utc_iso_now(), + } + + +class NetworkTelemetryService: + def __init__( + self, + video_receiver: OmniSocketVideoReceiver, + control_sender: OmniSocketControlSender, + control_arbiter: ControlArbiter, + native_ingress: NativeUdpControlIngress, + ) -> None: + self._video_receiver = video_receiver + self._control_sender = control_sender + self._control_arbiter = control_arbiter + self._native_ingress = native_ingress + self._rate_lock = threading.Lock() + self._last_rate_sample: tuple[float, int, int] | None = None + + def _compute_rates(self, send_bytes: int, recv_bytes: int) -> tuple[float, float]: + now = time.monotonic() + with self._rate_lock: + previous = self._last_rate_sample + self._last_rate_sample = (now, send_bytes, recv_bytes) + + if previous is None: + return 0.0, 0.0 + + prev_time, prev_send, prev_recv = previous + elapsed = now - prev_time + if elapsed <= 0.0: + return 0.0, 0.0 + + tx_kbps = max(0.0, ((send_bytes - prev_send) * 8.0) / elapsed / 1000.0) + rx_kbps = max(0.0, ((recv_bytes - prev_recv) * 8.0) / elapsed / 1000.0) + return tx_kbps, rx_kbps + + def get_latest(self) -> dict[str, Any]: + self._video_receiver.ensure_started() + self._control_arbiter.ensure_started() + self._native_ingress.ensure_started() + + video_app = self._video_receiver.session_stats() + control_app = self._control_sender.session_stats() + video_kcp = self._video_receiver.session_kcp_stats() + control_kcp = self._control_sender.session_kcp_stats() + arbiter_status = self._control_arbiter.get_status() + ingress_status = self._native_ingress.get_status() + sender_status = self._control_sender.get_status() + + total_send_bytes = int(video_app.get("send_bytes", 0)) + int(control_app.get("send_bytes", 0)) + total_recv_bytes = int(video_app.get("recv_bytes", 0)) + int(control_app.get("recv_bytes", 0)) + tx_kbps, rx_kbps = self._compute_rates(total_send_bytes, total_recv_bytes) + + video_connected = int(video_app.get("connected", 0)) + control_connected = int(control_app.get("connected", 0)) + connected_sessions = video_connected + control_connected + + primary_kcp = control_kcp if control_connected else video_kcp + latency_ms = primary_kcp.get("srtt_ms") + jitter_ms = primary_kcp.get("srttvar_ms") + + if connected_sessions > 0: + peer_status = "online" + elif sender_status.get("backend_ready"): + peer_status = "idle" + else: + peer_status = "backend-unavailable" + + return { + "peer_status": peer_status, + "latency_ms": latency_ms, + "jitter_ms": jitter_ms, + "packet_loss_pct": None, + "tx_kbps": round(tx_kbps, 3), + "rx_kbps": round(rx_kbps, 3), + "transport": "OmniSocket / kcp", + "source_mode": "omnisocket-live" if connected_sessions > 0 else "omnisocket-idle", + "updated_at": utc_iso_now(), + "active_control_source": arbiter_status["active_source"], + "control_lease_remaining_ms": arbiter_status["control_lease_remaining_ms"], + "combined": { + "connected_sessions": connected_sessions, + "send_bytes": total_send_bytes, + "recv_bytes": total_recv_bytes, + "tx_kbps": round(tx_kbps, 3), + "rx_kbps": round(rx_kbps, 3), + }, + "sessions": { + "video": { + "app": video_app, + "kcp": video_kcp, + }, + "control": { + "app": control_app, + "kcp": control_kcp, + }, + }, + "ingress": { + "native_udp": ingress_status, + }, + "control": { + "arbiter": arbiter_status, + "sender": sender_status, + }, + } + diff --git a/backend/monitoring/video.py b/backend/monitoring/video.py new file mode 100644 index 0000000..38abeba --- /dev/null +++ b/backend/monitoring/video.py @@ -0,0 +1,343 @@ +from __future__ import annotations + +from collections import deque +import sys +import threading +import time +from typing import Any, Iterator + +from .common import ( + JPEG_FRAME_DIR, + OMNISOCKET_CONFIG_PATH, + OMNISOCKET_FRAME_FRESH_SECONDS, + PROJECT_ROOT, + VIDEO_SOURCE_MODE, + VIDEO_TIMESTAMP_ENDIANNESS, + VIDEO_TIMESTAMP_MAX_SKEW_NS, + VIDEO_TIMESTAMP_MULTIPLIER_NS, + VIDEO_TIMESTAMP_SAMPLE_SIZE, + VIDEO_TIMESTAMP_TRAILER_BYTES, + VIDEO_TIMESTAMP_UNIT, + WORKSPACE_ROOT, + load_omnisocket_config, +) + + +def safe_kcp_stats(session: Any) -> dict[str, Any]: + if session is None or not hasattr(session, "kcp_stats"): + return {} + try: + return dict(session.kcp_stats()) + except Exception: + return {} + + +class OmniSocketVideoReceiver: + def __init__(self) -> None: + self._lock = threading.Lock() + self._thread: threading.Thread | None = None + self._started = False + self._session = None + self._session_cls = None + self._binary_msg_type = None + self._video_defaults: dict[str, Any] = {} + self._latest_frame: bytes | None = None + self._latest_received_at = 0.0 + self._latest_sequence: int | None = None + self._latest_latency_ms: float | None = None + self._latest_timestamp_unit: str | None = None + self._latest_timestamp_endianness: str | None = None + self._latency_samples_ms: deque[float] = deque(maxlen=VIDEO_TIMESTAMP_SAMPLE_SIZE) + self._frames_received = 0 + self._last_error = "" + self._load_backend() + + def _load_backend(self) -> None: + try: + self._import_backend() + except Exception as error: # pragma: no cover - optional runtime dependency + self._last_error = f"omnisocket import failed: {error}" + + def _import_backend(self) -> None: + try: + from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS # type: ignore + except ImportError: + python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python" + if python_dir.exists(): + sys.path.insert(0, str(python_dir)) + from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS # type: ignore + + self._binary_msg_type = MSG_TYPE_BINARY + self._session_cls = Session + self._video_defaults = dict(VIDEO_DEFAULTS) + + def ensure_started(self) -> None: + if self._session_cls is None or self._binary_msg_type is None: + return + + with self._lock: + if self._started: + return + self._started = True + self._thread = threading.Thread( + target=self._run, + name="omnisocket-video-receiver", + daemon=True, + ) + self._thread.start() + + def _connect_session(self): + assert self._session_cls is not None + + config = load_omnisocket_config() + transport_cfg = config.get("transport", {}) + video_cfg = config.get("video_receiver", {}) + + session = self._session_cls() + session.connect( + server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")), + peer_id=str(video_cfg.get("peer_id", "peer-a-video")), + relay_via=str(transport_cfg.get("relay_via", "")), + bind_ip=str(transport_cfg.get("bind_ip", "")), + bind_device=str(transport_cfg.get("bind_device", "")), + **self._video_defaults, + ) + return session, int(video_cfg.get("buffer_bytes", 1024 * 1024)) + + def _extract_jpeg_payload(self, frame: bytes) -> bytes | None: + if frame.startswith(b"\xff\xd8"): + return frame + if len(frame) > 8 and frame[8:10] == b"\xff\xd8": + return frame[8:] + return None + + def _split_jpeg_frame_and_trailer(self, frame: bytes) -> tuple[bytes, bytes] | None: + jpeg_payload = self._extract_jpeg_payload(frame) + if jpeg_payload is None: + return None + + if jpeg_payload.endswith(b"\xff\xd9"): + return jpeg_payload, b"" + + if ( + len(jpeg_payload) >= VIDEO_TIMESTAMP_TRAILER_BYTES + 2 + and jpeg_payload[-(VIDEO_TIMESTAMP_TRAILER_BYTES + 2) : -VIDEO_TIMESTAMP_TRAILER_BYTES] == b"\xff\xd9" + ): + return jpeg_payload[:-VIDEO_TIMESTAMP_TRAILER_BYTES], jpeg_payload[-VIDEO_TIMESTAMP_TRAILER_BYTES:] + + eoi_index = jpeg_payload.rfind(b"\xff\xd9") + if eoi_index < 0: + return jpeg_payload, b"" + + trailer_start = eoi_index + 2 + return jpeg_payload[:trailer_start], jpeg_payload[trailer_start:] + + def _extract_jpeg_frame(self, frame: bytes) -> bytes | None: + split_payload = self._split_jpeg_frame_and_trailer(frame) + if split_payload is None: + return None + jpeg_frame, _ = split_payload + return jpeg_frame + + def _extract_sequence(self, frame: bytes) -> int | None: + if len(frame) >= 8 and not frame.startswith(b"\xff\xd8"): + return int.from_bytes(frame[:8], "big") + return None + + def _extract_frame_tail(self, frame: bytes) -> bytes: + split_payload = self._split_jpeg_frame_and_trailer(frame) + if split_payload is None: + return b"" + _, trailer = split_payload + return trailer + + def _extract_frame_timestamp(self, frame: bytes) -> tuple[int, str, str] | None: + trailer = self._extract_frame_tail(frame) + if len(trailer) != VIDEO_TIMESTAMP_TRAILER_BYTES: + return None + + value = int.from_bytes(trailer, VIDEO_TIMESTAMP_ENDIANNESS, signed=False) + if value <= 0: + return None + + timestamp_ns = value * VIDEO_TIMESTAMP_MULTIPLIER_NS + if abs(time.time_ns() - timestamp_ns) > VIDEO_TIMESTAMP_MAX_SKEW_NS: + return None + + return timestamp_ns, VIDEO_TIMESTAMP_UNIT, VIDEO_TIMESTAMP_ENDIANNESS + + def _run(self) -> None: + while True: + try: + session, buffer_bytes = self._connect_session() + self._session = session + self._last_error = "" + buffer = bytearray(buffer_bytes) + + while True: + meta = session.recv_into(buffer, timeout_ms=1000) + if meta is None: + continue + if meta.get("msg_type") != self._binary_msg_type: + continue + + frame = bytes(buffer[: meta["body_len"]]) + jpeg_frame = self._extract_jpeg_frame(frame) + if jpeg_frame is None: + self._last_error = "received non-JPEG binary frame" + continue + + timestamp_meta = self._extract_frame_timestamp(frame) + latency_ms = None + if timestamp_meta is not None: + timestamp_ns, unit, endianness = timestamp_meta + latency_ms = round((time.time_ns() - timestamp_ns) / 1_000_000, 3) + else: + unit = None + endianness = None + + with self._lock: + self._latest_frame = jpeg_frame + self._latest_received_at = time.time() + self._latest_sequence = self._extract_sequence(frame) + self._latest_latency_ms = latency_ms + self._latest_timestamp_unit = unit + self._latest_timestamp_endianness = endianness + if latency_ms is not None: + self._latency_samples_ms.append(latency_ms) + self._frames_received += 1 + except Exception as error: # pragma: no cover - runtime integration path + self._last_error = str(error) + time.sleep(2) + finally: + if self._session is not None: + try: + self._session.close() + except Exception: + pass + self._session = None + + def get_latest_frame(self) -> bytes | None: + self.ensure_started() + with self._lock: + if self._latest_frame is None: + return None + if time.time() - self._latest_received_at > OMNISOCKET_FRAME_FRESH_SECONDS: + return None + return self._latest_frame + + def session_stats(self) -> dict[str, Any]: + self.ensure_started() + with self._lock: + session = self._session + if session is None: + return {"connected": 0} + try: + return dict(session.stats()) + except Exception: + return {"connected": 0} + + def session_kcp_stats(self) -> dict[str, Any]: + self.ensure_started() + with self._lock: + session = self._session + return safe_kcp_stats(session) + + def get_status(self) -> dict[str, Any]: + self.ensure_started() + config = load_omnisocket_config() + transport_cfg = config.get("transport", {}) + video_cfg = config.get("video_receiver", {}) + with self._lock: + has_recent_frame = self._latest_frame is not None and ( + time.time() - self._latest_received_at <= OMNISOCKET_FRAME_FRESH_SECONDS + ) + if has_recent_frame and self._latest_latency_ms is not None: + timing_status = { + "available": True, + "latest_delta_ms": self._latest_latency_ms, + "delta_samples_ms": list(reversed(self._latency_samples_ms)), + "sample_count": len(self._latency_samples_ms), + "sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE, + "timestamp_unit": self._latest_timestamp_unit, + "timestamp_endianness": self._latest_timestamp_endianness, + } + else: + timing_status = { + "available": False, + "latest_delta_ms": None, + "delta_samples_ms": [], + "sample_count": 0, + "sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE, + "timestamp_unit": None, + "timestamp_endianness": None, + } + return { + "backend_ready": self._session_cls is not None, + "mode": VIDEO_SOURCE_MODE, + "connected": self._session is not None, + "has_recent_frame": has_recent_frame, + "frames_received": self._frames_received, + "latest_sequence": self._latest_sequence, + "last_error": self._last_error, + "config_path": str(OMNISOCKET_CONFIG_PATH), + "server_addr": str(transport_cfg.get("server_addr", "")), + "relay_via": str(transport_cfg.get("relay_via", "")), + "peer_id": str(video_cfg.get("peer_id", "")), + "buffer_bytes": int(video_cfg.get("buffer_bytes", 0)), + "timing": timing_status, + } + + +class VideoFrameService: + def __init__(self, receiver: OmniSocketVideoReceiver) -> None: + self._receiver = receiver + + def get_status(self) -> dict[str, Any]: + receiver_status = self._receiver.get_status() + receiver_frame = self._receiver.get_latest_frame() + + if receiver_frame is not None: + return { + "available": True, + "source_mode": "omnisocket-jpeg-live", + "frame_count": receiver_status["frames_received"], + "fps": 30, + "frame_dir": str(JPEG_FRAME_DIR), + "source_detail": f"peer stream active, frames={receiver_status['frames_received']}", + "receiver": receiver_status, + "timing": receiver_status["timing"], + } + + wait_detail = receiver_status["last_error"] or ( + "waiting for live OmniSocket JPEG frames; check the hub, sender, and receiver configuration" + ) + return { + "available": False, + "source_mode": "omnisocket-waiting", + "frame_count": receiver_status["frames_received"], + "fps": 30, + "frame_dir": str(JPEG_FRAME_DIR), + "source_detail": wait_detail, + "receiver": receiver_status, + "timing": receiver_status["timing"], + } + + def get_next_frame(self) -> bytes: + receiver_frame = self._receiver.get_latest_frame() + if receiver_frame is not None: + return receiver_frame + raise RuntimeError("no live OmniSocket JPEG frame is currently available") + + def iter_mjpeg(self, fps: float = 6.0) -> Iterator[bytes]: + frame_interval = 1.0 / max(1.0, min(fps, 30.0)) + while True: + frame = self.get_next_frame() + header = ( + b"--frame\r\n" + b"Content-Type: image/jpeg\r\n" + + f"Content-Length: {len(frame)}\r\n\r\n".encode("ascii") + ) + yield header + frame + b"\r\n" + time.sleep(frame_interval) + diff --git a/config/omnisocket_demo.yaml b/config/omnisocket_demo.yaml index 6199842..7c7d152 100644 --- a/config/omnisocket_demo.yaml +++ b/config/omnisocket_demo.yaml @@ -8,6 +8,16 @@ video_receiver: peer_id: "peer-a-video" buffer_bytes: 1048576 +control_sender: + peer_id: "peer-a-ctrl" + target_peer: "peer-b-ctrl" + +control_ingress: + native_udp_bind: "127.0.0.1:10921" + source_lease_ms: 300 + send_rate_hz: 20.0 + zero_burst_packets: 3 + video_sender: peer_id: "peer-b-video" target_peer: "peer-a-video" diff --git a/frontend/src/components/ControlPanel.vue b/frontend/src/components/ControlPanel.vue new file mode 100644 index 0000000..b8a8951 --- /dev/null +++ b/frontend/src/components/ControlPanel.vue @@ -0,0 +1,298 @@ + + + + + diff --git a/frontend/src/components/NetworkPanel.vue b/frontend/src/components/NetworkPanel.vue index dc98dc2..69a1f39 100644 --- a/frontend/src/components/NetworkPanel.vue +++ b/frontend/src/components/NetworkPanel.vue @@ -9,10 +9,12 @@ const props = defineProps<{ const updatedAt = computed(() => { if (!props.network?.updated_at) { - return '暂无' + return 'unavailable' } return new Date(props.network.updated_at).toLocaleString('zh-CN', { hour12: false }) }) + +const activeSource = computed(() => props.network?.active_control_source ?? 'none') @@ -102,7 +127,9 @@ h2 { gap: 12px; } -.stat-card { +.stat-card, +.summary, +.session-card { padding: 14px; border-radius: 16px; background: rgba(7, 14, 26, 0.78); @@ -120,32 +147,39 @@ h2 { font-size: 22px; } -.summary { - padding: 14px; - border-radius: 16px; - background: rgba(7, 14, 26, 0.78); - border: 1px solid rgba(133, 147, 169, 0.2); +.summary, +.session-card { color: #d5dbee; } -.summary p { +.summary p, +.session-card h3, +.session-card p { margin: 0; } -.summary p + p { +.summary p + p, +.session-card p + p { margin-top: 8px; } +.session-grid { + display: grid; + grid-template-columns: repeat(2, minmax(0, 1fr)); + gap: 12px; +} + @media (max-width: 960px) { - .stats { + .stats, + .session-grid { grid-template-columns: repeat(2, minmax(0, 1fr)); } } @media (max-width: 640px) { - .stats { + .stats, + .session-grid { grid-template-columns: 1fr; } } - diff --git a/frontend/src/composables/useMonitoringData.ts b/frontend/src/composables/useMonitoringData.ts index 6c372e3..38c1dc3 100644 --- a/frontend/src/composables/useMonitoringData.ts +++ b/frontend/src/composables/useMonitoringData.ts @@ -25,7 +25,7 @@ export function useMonitoringData(options: UseMonitoringDataOptions = {}) { video.value = snapshot.video errorMessage.value = '' } catch (error) { - errorMessage.value = error instanceof Error ? error.message : '数据加载失败' + errorMessage.value = error instanceof Error ? error.message : 'Failed to load monitoring data' } finally { loading.value = false } @@ -36,9 +36,9 @@ export function useMonitoringData(options: UseMonitoringDataOptions = {}) { return errorMessage.value } if (loading.value) { - return '正在连接 Django 后端并加载监控数据...' + return 'Connecting to the Django backend and loading live monitoring data...' } - return '页面已连接 Django 后端。GPS 与网络状态按当前页面策略轮询更新,视频区域单独按目标 30FPS 请求单帧 JPEG。' + return 'Dashboard connected. Video, GPS, and live session telemetry refresh continuously from the unified A-side daemon.' }) onMounted(() => { diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 686f310..6b166f9 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -23,3 +23,13 @@ export function fetchVideoStatus() { export function buildVideoFrameUrl(frameKey: number) { return `${API_BASE}/api/video/frame/?frame=${frameKey}&t=${Date.now()}` } + +export function buildControlWebSocketUrl() { + const url = new URL(API_BASE, window.location.origin) + const basePath = url.pathname.replace(/\/$/, '') + url.protocol = url.protocol === 'https:' ? 'wss:' : 'ws:' + url.pathname = `${basePath}/ws/control/` + url.search = '' + url.hash = '' + return url.toString() +} diff --git a/frontend/src/types.ts b/frontend/src/types.ts index 648a169..68d72fe 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -12,17 +12,96 @@ export interface GpsTelemetry { updated_at: string } +export interface SessionAppStats { + connected: number + send_calls?: number + send_bytes?: number + send_errors?: number + recv_calls?: number + recv_bytes?: number + recv_timeouts?: number + recv_errors?: number +} + +export interface SessionKcpStats { + connected?: number + conv?: number + rto_ms?: number + srtt_ms?: number + srttvar_ms?: number + snd_queue?: number + rcv_queue?: number + snd_buffer?: number + xmit_total?: number +} + +export interface SessionTelemetry { + app: SessionAppStats + kcp: SessionKcpStats +} + +export interface NativeUdpIngress { + started: boolean + bind_addr: string + packets_received: number + invalid_packets: number + last_sender: string + last_error: string +} + +export interface ControlArbiterStatus { + active_source: string | null + control_lease_remaining_ms: number + packet_counts: Record + send_rate_hz: number + source_lease_ms: number + zero_burst_packets: number + last_error: string + last_sent_at_monotonic: number +} + +export interface ControlSenderStatus { + backend_ready: boolean + started: boolean + connected: boolean + peer_id: string + target_peer: string + send_count: number + send_errors: number + drain_errors: number + last_error: string +} + export interface NetworkTelemetry { peer_status: string - latency_ms: number - jitter_ms: number - packet_loss_pct: number + latency_ms: number | null + jitter_ms: number | null + packet_loss_pct: number | null tx_kbps: number rx_kbps: number - signal_dbm: number transport: string source_mode: string updated_at: string + active_control_source: string | null + control_lease_remaining_ms: number + combined: { + connected_sessions: number + send_bytes: number + recv_bytes: number + tx_kbps: number + rx_kbps: number + } + sessions: { + video: SessionTelemetry + control: SessionTelemetry + } + ingress: { + native_udp: NativeUdpIngress + } + control: { + arbiter: ControlArbiterStatus + sender: ControlSenderStatus + } } export interface VideoStatus { diff --git a/frontend/src/views/DashboardView.vue b/frontend/src/views/DashboardView.vue index 92cc2aa..6595de0 100644 --- a/frontend/src/views/DashboardView.vue +++ b/frontend/src/views/DashboardView.vue @@ -1,4 +1,5 @@