from __future__ import annotations from collections import deque import json import math import os import sys import threading import time from datetime import UTC, datetime from pathlib import Path from typing import Any, Iterator PROJECT_ROOT = Path(__file__).resolve().parents[2] WORKSPACE_ROOT = PROJECT_ROOT.parent JPEG_FRAME_DIR = WORKSPACE_ROOT / "RobotDataShow" / "jpeg-frames" # GPS 数据 JSON 文件路径 GEOSTREAM_JSON_PATH = WORKSPACE_ROOT / "GeoStream" / "gps_latest.json" GEOSTREAM_STALE_SECONDS = 15 OMNISOCKET_CONFIG_PATH = PROJECT_ROOT / "config" / "omnisocket_demo.yaml" VIDEO_SOURCE_MODE = os.getenv("VIDEO_SOURCE_MODE", "auto").strip().lower() OMNISOCKET_FRAME_FRESH_SECONDS = 2.0 VIDEO_TIMESTAMP_SAMPLE_SIZE = 10 VIDEO_TIMESTAMP_MAX_SKEW_NS = 7 * 24 * 60 * 60 * 1_000_000_000 def utc_iso_now() -> str: return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") class OmniSocketVideoReceiver: def __init__(self) -> None: self._lock = threading.Lock() self._thread: threading.Thread | None = None self._started = False self._session = None self._session_cls = None self._binary_msg_type = None self._video_defaults: dict[str, Any] = {} self._latest_frame: bytes | None = None self._latest_received_at = 0.0 self._latest_sequence: int | None = None self._latest_latency_ms: float | None = None self._latest_timestamp_unit: str | None = None self._latest_timestamp_endianness: str | None = None self._latency_samples_ms: deque[float] = deque(maxlen=VIDEO_TIMESTAMP_SAMPLE_SIZE) self._frames_received = 0 self._last_error = "" self._load_backend() def _load_backend(self) -> None: # 服务启动时先尝试导入一次 Python/C 扩展。 try: self._import_backend() except Exception as error: # pragma: no cover - 可选的运行时依赖 self._last_error = f"omnisocket import failed: {error}" def _import_backend(self) -> None: # 优先使用已经安装到当前 Python 环境里的 omnisocket。 # 如果导入失败,再尝试样例目录下的本地 python 路径。 try: from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS # type: ignore except ImportError: python_dir = WORKSPACE_ROOT / "OmniSocketGo" / "python" if python_dir.exists(): sys.path.insert(0, str(python_dir)) from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS # type: ignore self._binary_msg_type = MSG_TYPE_BINARY self._session_cls = Session self._video_defaults = dict(VIDEO_DEFAULTS) def ensure_started(self) -> None: # 当第一次请求帧或状态时,再懒启动后台接收线程。 if self._session_cls is None or self._binary_msg_type is None: return with self._lock: if self._started: return self._started = True self._thread = threading.Thread( target=self._run, name="omnisocket-video-receiver", daemon=True, ) self._thread.start() def _load_config(self) -> dict[str, Any]: # 这里保持和 SampleCData/omnisocket_video_receiver.py 一样的配置结构: # transport + video_receiver。 # 即使配置文件不存在,也允许回退到默认值继续运行。 config: dict[str, Any] = {} if OMNISOCKET_CONFIG_PATH.exists(): try: try: import yaml # type: ignore with OMNISOCKET_CONFIG_PATH.open("r", encoding="utf-8") as file: config = yaml.safe_load(file) or {} except ImportError: # 当前配置文件结构非常简单,缺少 PyYAML 时用简化解析器兜底。 config = self._load_simple_yaml_config(OMNISOCKET_CONFIG_PATH) except Exception as error: # pragma: no cover - 可选依赖 self._last_error = f"config load failed: {error}" transport_cfg = dict(config.get("transport", {})) video_cfg = dict(config.get("video_receiver", {})) transport_cfg["server_addr"] = os.getenv( "OMNISOCKET_SERVER_ADDR", str(transport_cfg.get("server_addr", "127.0.0.1:10909")), ) transport_cfg["relay_via"] = os.getenv( "OMNISOCKET_RELAY_VIA", str(transport_cfg.get("relay_via", "")), ) transport_cfg["bind_ip"] = os.getenv( "OMNISOCKET_BIND_IP", str(transport_cfg.get("bind_ip", "")), ) transport_cfg["bind_device"] = os.getenv( "OMNISOCKET_BIND_DEVICE", str(transport_cfg.get("bind_device", "")), ) video_cfg["peer_id"] = os.getenv( "OMNISOCKET_VIDEO_PEER_ID", str(video_cfg.get("peer_id", "peer-a-video")), ) video_cfg["buffer_bytes"] = int( os.getenv( "OMNISOCKET_BUFFER_BYTES", str(video_cfg.get("buffer_bytes", 1024 * 1024)), ) ) return { "transport": transport_cfg, "video_receiver": video_cfg, } def _load_simple_yaml_config(self, path: Path) -> dict[str, Any]: parsed: dict[str, Any] = {} current_section: str | None = None with path.open("r", encoding="utf-8") as file: for raw_line in file: line = raw_line.split("#", 1)[0].rstrip() if not line.strip(): continue if not line.startswith(" "): if not line.endswith(":"): raise ValueError(f"invalid top-level yaml line: {raw_line.strip()}") current_section = line[:-1].strip() parsed[current_section] = {} continue if current_section is None: raise ValueError(f"yaml key outside section: {raw_line.strip()}") stripped = line.strip() if ":" not in stripped: raise ValueError(f"invalid yaml key line: {raw_line.strip()}") key, value = stripped.split(":", 1) parsed[current_section][key.strip()] = self._parse_simple_yaml_scalar(value.strip()) return parsed def _parse_simple_yaml_scalar(self, value: str) -> Any: if value in {'""', "''"}: return "" if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}: return value[1:-1] if value.lower() == "true": return True if value.lower() == "false": return False if value and value.lstrip("-").isdigit(): return int(value) return value def _connect_session(self): # 这里和样例接收器一致:创建 Session(),然后使用 transport/video 配置建立连接。 assert self._session_cls is not None config = self._load_config() transport_cfg = config.get("transport", {}) video_cfg = config.get("video_receiver", {}) session = self._session_cls() session.connect( server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")), peer_id=str(video_cfg.get("peer_id", "peer-a-video")), relay_via=str(transport_cfg.get("relay_via", "")), bind_ip=str(transport_cfg.get("bind_ip", "")), bind_device=str(transport_cfg.get("bind_device", "")), **self._video_defaults, ) return session, int(video_cfg.get("buffer_bytes", 1024 * 1024)) def _extract_jpeg_payload(self, frame: bytes) -> bytes | None: # 同时兼容两种帧格式: # 1. 纯 JPEG 二进制 # 2. 前 8 字节是序号,后面才是真正的 JPEG 数据 if frame.startswith(b"\xff\xd8"): return frame if len(frame) > 8 and frame[8:10] == b"\xff\xd8": return frame[8:] return None def _extract_jpeg_frame(self, frame: bytes) -> bytes | None: jpeg_payload = self._extract_jpeg_payload(frame) if jpeg_payload is None: return None eoi_index = jpeg_payload.rfind(b"\xff\xd9") if eoi_index < 0: return jpeg_payload return jpeg_payload[: eoi_index + 2] def _extract_sequence(self, frame: bytes) -> int | None: if len(frame) >= 8 and not frame.startswith(b"\xff\xd8"): return int.from_bytes(frame[:8], "big") return None def _extract_frame_tail(self, frame: bytes) -> bytes: jpeg_payload = self._extract_jpeg_payload(frame) if jpeg_payload is None: return b"" eoi_index = jpeg_payload.rfind(b"\xff\xd9") if eoi_index < 0: return b"" trailer_start = eoi_index + 2 if trailer_start >= len(jpeg_payload): return b"" return jpeg_payload[trailer_start:] def _extract_frame_timestamp(self, frame: bytes) -> tuple[int, str, str] | None: trailer = self._extract_frame_tail(frame) if len(trailer) < 8: return None now_ns = time.time_ns() raw_timestamp = trailer[-8:] best_candidate: tuple[int, str, str] | None = None best_distance_ns: int | None = None for endianness in ("big", "little"): value = int.from_bytes(raw_timestamp, endianness, signed=True) if value <= 0: continue for unit, multiplier in ( ("ns", 1), ("us", 1_000), ("ms", 1_000_000), ("s", 1_000_000_000), ): timestamp_ns = value * multiplier distance_ns = abs(now_ns - timestamp_ns) if distance_ns > VIDEO_TIMESTAMP_MAX_SKEW_NS: continue if best_distance_ns is None or distance_ns < best_distance_ns: best_distance_ns = distance_ns best_candidate = (timestamp_ns, unit, endianness) return best_candidate def _run(self) -> None: # 后台持续接收循环: # connect -> recv_into(buffer) -> 按 body_len 截出有效内容 -> 把最新 JPEG 帧缓存在内存里 while True: try: session, buffer_bytes = self._connect_session() self._session = session self._last_error = "" buffer = bytearray(buffer_bytes) while True: # 从 OmniSocket / C 侧收一帧原始二进制数据到缓冲区 meta = session.recv_into(buffer, timeout_ms=1000) if meta is None: continue if meta.get("msg_type") != self._binary_msg_type: continue # 真正收到的有效部分切出来,形成当前这一帧 frame = bytes(buffer[: meta["body_len"]]) jpeg_frame = self._extract_jpeg_frame(frame) if jpeg_frame is None: self._last_error = "received non-JPEG binary frame" continue timestamp_meta = self._extract_frame_timestamp(frame) latency_ms = None if timestamp_meta is not None: timestamp_ns, unit, endianness = timestamp_meta latency_ms = round((time.time_ns() - timestamp_ns) / 1_000_000, 3) else: unit = None endianness = None with self._lock: # 缓存:这里只保留最新的一张 JPEG 帧,供 Web 接口直接返回给前端。 self._latest_frame = jpeg_frame self._latest_received_at = time.time() self._latest_sequence = self._extract_sequence(frame) self._latest_latency_ms = latency_ms self._latest_timestamp_unit = unit self._latest_timestamp_endianness = endianness if latency_ms is not None: self._latency_samples_ms.append(latency_ms) self._frames_received += 1 except Exception as error: # pragma: no cover - 运行时集成路径 self._last_error = str(error) time.sleep(2) finally: if self._session is not None: try: self._session.close() except Exception: pass self._session = None def get_latest_frame(self) -> bytes | None: # 把内存里最新的一张真实 JPEG 帧暴露给 Django 视图层。 # 如果这张帧已经过旧,就返回 None,让上层回退到本地模拟帧。 self.ensure_started() with self._lock: if self._latest_frame is None: return None if time.time() - self._latest_received_at > OMNISOCKET_FRAME_FRESH_SECONDS: return None return self._latest_frame def get_status(self) -> dict[str, Any]: self.ensure_started() config = self._load_config() transport_cfg = config.get("transport", {}) video_cfg = config.get("video_receiver", {}) with self._lock: has_recent_frame = self._latest_frame is not None and ( time.time() - self._latest_received_at <= OMNISOCKET_FRAME_FRESH_SECONDS ) if has_recent_frame and self._latest_latency_ms is not None: timing_status = { "available": True, "latest_delta_ms": self._latest_latency_ms, "delta_samples_ms": list(reversed(self._latency_samples_ms)), "sample_count": len(self._latency_samples_ms), "sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE, "timestamp_unit": self._latest_timestamp_unit, "timestamp_endianness": self._latest_timestamp_endianness, } else: timing_status = { "available": False, "latest_delta_ms": None, "delta_samples_ms": [], "sample_count": 0, "sample_window_size": VIDEO_TIMESTAMP_SAMPLE_SIZE, "timestamp_unit": None, "timestamp_endianness": None, } return { "backend_ready": self._session_cls is not None, "mode": VIDEO_SOURCE_MODE, "connected": self._session is not None, "has_recent_frame": has_recent_frame, "frames_received": self._frames_received, "latest_sequence": self._latest_sequence, "last_error": self._last_error, "config_path": str(OMNISOCKET_CONFIG_PATH), "server_addr": str(transport_cfg.get("server_addr", "")), "relay_via": str(transport_cfg.get("relay_via", "")), "peer_id": str(video_cfg.get("peer_id", "")), "buffer_bytes": int(video_cfg.get("buffer_bytes", 0)), "timing": timing_status, } class VideoFrameService: def __init__(self) -> None: self._receiver = OmniSocketVideoReceiver() def get_status(self) -> dict[str, Any]: receiver_status = self._receiver.get_status() receiver_frame = self._receiver.get_latest_frame() # 如果已经收到了真实视频帧,就把当前状态标记为实时模式,给前端显示。 if receiver_frame is not None: return { "available": True, "source_mode": "omnisocket-jpeg-live", "frame_count": receiver_status["frames_received"], "fps": 30, "frame_dir": str(JPEG_FRAME_DIR), "source_detail": f"peer stream active, frames={receiver_status['frames_received']}", "receiver": receiver_status, "timing": receiver_status["timing"], } wait_detail = receiver_status["last_error"] or ( "未实时获取真实值,请检查 OmniSocket 服务、视频发送端和接收配置。" ) return { "available": False, "source_mode": "omnisocket-waiting", "frame_count": receiver_status["frames_received"], "fps": 30, "frame_dir": str(JPEG_FRAME_DIR), "source_detail": wait_detail, "receiver": receiver_status, "timing": receiver_status["timing"], } def get_next_frame(self) -> bytes: # 优先返回从 Python/C 视频接收器拿到的最新真实 JPEG 帧。 receiver_frame = self._receiver.get_latest_frame() if receiver_frame is not None: return receiver_frame raise RuntimeError("未实时获取真实值,当前没有可用的真实 JPEG 帧。") def iter_mjpeg(self, fps: float = 6.0) -> Iterator[bytes]: frame_interval = 1.0 / max(1.0, min(fps, 30.0)) while True: frame = self.get_next_frame() header = ( b"--frame\r\n" b"Content-Type: image/jpeg\r\n" + f"Content-Length: {len(frame)}\r\n\r\n".encode("ascii") ) yield header + frame + b"\r\n" time.sleep(frame_interval) class GpsDataService: def get_latest(self) -> dict[str, Any]: # 优先使用由 GeoStream C 解析器生成的最新数据包。 payload = self._read_geostream_payload() if payload is not None: payload["source_mode"] = "geostream-json" payload["updated_at"] = utc_iso_now() return payload # 当没有最新的 GPS 文件可用时,退回到使用一个移动的演示点位。 return self._build_simulated_payload() def _read_geostream_payload(self) -> dict[str, Any] | None: # Django 后端目前还不直接解析串口数据流。 # 它只读取由 GeoStream/parse_gps.c 生成的 JSON 文件。 if not GEOSTREAM_JSON_PATH.exists(): return None # 忽略过期的文件,以便 UI 可以退回到使用模拟数据, # 而不是把旧位置当作实时位置来显示。 age_seconds = time.time() - GEOSTREAM_JSON_PATH.stat().st_mtime if age_seconds > GEOSTREAM_STALE_SECONDS: return None try: with GEOSTREAM_JSON_PATH.open("r", encoding="utf-8") as file: return json.load(file) except (OSError, json.JSONDecodeError): return None def _build_simulated_payload(self) -> dict[str, Any]: # 为本地 UI 开发构建一个平滑的伪造轨迹。 tick = time.time() / 12.0 latitude = 31.2304 + math.sin(tick) * 0.0014 longitude = 121.4737 + math.cos(tick) * 0.0018 return { "has_fix": True, "utc_time": datetime.now(UTC).strftime("%H:%M:%S"), "latitude": round(latitude, 6), "longitude": round(longitude, 6), "satellites": 14 + int((math.sin(tick * 0.7) + 1.0) * 2), "altitude_m": round(6.5 + math.cos(tick * 0.5) * 1.2, 2), "coordinate_system": "WGS84", "source_sentence": "SIMULATED", "raw_coordinate_format": "decimal degrees", "source_mode": "simulated", "updated_at": utc_iso_now(), } class NetworkTelemetryService: def get_latest(self) -> dict[str, Any]: tick = time.time() latency_ms = 28 + math.sin(tick / 4.0) * 6 jitter_ms = 3 + math.cos(tick / 5.0) * 1.4 tx_kbps = 780 + math.sin(tick / 6.0) * 160 rx_kbps = 720 + math.cos(tick / 7.0) * 140 packet_loss_pct = max(0.0, 0.35 + math.sin(tick / 9.0) * 0.25) signal_dbm = -53 - abs(math.sin(tick / 8.0) * 7) return { "peer_status": "online", "latency_ms": round(latency_ms, 1), "jitter_ms": round(jitter_ms, 1), "packet_loss_pct": round(packet_loss_pct, 2), "tx_kbps": int(tx_kbps), "rx_kbps": int(rx_kbps), "signal_dbm": round(signal_dbm, 1), "transport": "OmniSocket / simulated", "source_mode": "simulated", "updated_at": utc_iso_now(), } video_service = VideoFrameService() gps_service = GpsDataService() network_service = NetworkTelemetryService()