from __future__ import annotations import json import math import os import sys import threading import time from datetime import UTC, datetime from pathlib import Path from typing import Any, Iterator PROJECT_ROOT = Path(__file__).resolve().parents[2] WORKSPACE_ROOT = PROJECT_ROOT.parent JPEG_FRAME_DIR = WORKSPACE_ROOT / "RobotDataShow" / "jpeg-frames" # GPS 数据 JSON 文件路径 GEOSTREAM_JSON_PATH = WORKSPACE_ROOT / "GeoStream" / "gps_latest.json" GEOSTREAM_STALE_SECONDS = 15 SAMPLE_CDATA_DIR = PROJECT_ROOT / "SampleCData" CONFIG_DIR = PROJECT_ROOT / "config" PRIMARY_OMNISOCKET_CONFIG_PATH = CONFIG_DIR / "omnisocket_demo.yaml" LEGACY_OMNISOCKET_CONFIG_PATH = SAMPLE_CDATA_DIR / "config" / "omnisocket_demo.yaml" VIDEO_SOURCE_MODE = os.getenv("VIDEO_SOURCE_MODE", "auto").strip().lower() OMNISOCKET_FRAME_FRESH_SECONDS = 2.0 def utc_iso_now() -> str: return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z") def resolve_omnisocket_config_path() -> Path: if PRIMARY_OMNISOCKET_CONFIG_PATH.exists(): return PRIMARY_OMNISOCKET_CONFIG_PATH return LEGACY_OMNISOCKET_CONFIG_PATH class OmniSocketVideoReceiver: def __init__(self) -> None: self._lock = threading.Lock() self._thread: threading.Thread | None = None self._started = False self._session = None self._session_cls = None self._binary_msg_type = None self._video_defaults: dict[str, Any] = {} self._latest_frame: bytes | None = None self._latest_received_at = 0.0 self._latest_sequence: int | None = None self._frames_received = 0 self._last_error = "" self._load_backend() def _load_backend(self) -> None: # 服务启动时先尝试导入一次 Python/C 扩展。 try: self._import_backend() except Exception as error: # pragma: no cover - 可选的运行时依赖 self._last_error = f"omnisocket import failed: {error}" def _import_backend(self) -> None: # 优先使用已经安装到当前 Python 环境里的 omnisocket。 # 如果导入失败,再尝试样例目录下的本地 python 路径。 try: from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS # type: ignore except ImportError: python_dir = SAMPLE_CDATA_DIR / "python" if python_dir.exists(): sys.path.insert(0, str(python_dir)) from omnisocket import MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS # type: ignore self._binary_msg_type = MSG_TYPE_BINARY self._session_cls = Session self._video_defaults = dict(VIDEO_DEFAULTS) def ensure_started(self) -> None: # 当第一次请求帧或状态时,再懒启动后台接收线程。 if VIDEO_SOURCE_MODE == "sample": return if self._session_cls is None or self._binary_msg_type is None: return with self._lock: if self._started: return self._started = True self._thread = threading.Thread( target=self._run, name="omnisocket-video-receiver", daemon=True, ) self._thread.start() def _load_config(self) -> dict[str, Any]: # 这里保持和 SampleCData/omnisocket_video_receiver.py 一样的配置结构: # transport + video_receiver。 # 即使配置文件不存在,也允许回退到默认值继续运行。 config: dict[str, Any] = {} config_path = resolve_omnisocket_config_path() if config_path.exists(): try: try: import yaml # type: ignore with config_path.open("r", encoding="utf-8") as file: config = yaml.safe_load(file) or {} except ImportError: # 如果当前环境没有 PyYAML,就用一个足够支撑当前 demo 配置的简化解析器。 config = self._load_simple_yaml_config(config_path) except Exception as error: # pragma: no cover - 可选依赖 self._last_error = f"config load failed: {error}" transport_cfg = dict(config.get("transport", {})) video_cfg = dict(config.get("video_receiver", {})) transport_cfg["server_addr"] = os.getenv( "OMNISOCKET_SERVER_ADDR", str(transport_cfg.get("server_addr", "127.0.0.1:10909")), ) transport_cfg["relay_via"] = os.getenv( "OMNISOCKET_RELAY_VIA", str(transport_cfg.get("relay_via", "")), ) transport_cfg["bind_ip"] = os.getenv( "OMNISOCKET_BIND_IP", str(transport_cfg.get("bind_ip", "")), ) transport_cfg["bind_device"] = os.getenv( "OMNISOCKET_BIND_DEVICE", str(transport_cfg.get("bind_device", "")), ) video_cfg["peer_id"] = os.getenv( "OMNISOCKET_VIDEO_PEER_ID", str(video_cfg.get("peer_id", "peer-a-video")), ) video_cfg["buffer_bytes"] = int( os.getenv( "OMNISOCKET_BUFFER_BYTES", str(video_cfg.get("buffer_bytes", 1024 * 1024)), ) ) return { "transport": transport_cfg, "video_receiver": video_cfg, } def _load_simple_yaml_config(self, path: Path) -> dict[str, Any]: parsed: dict[str, Any] = {} current_section: str | None = None with path.open("r", encoding="utf-8") as file: for raw_line in file: line = raw_line.split("#", 1)[0].rstrip() if not line.strip(): continue if not line.startswith(" "): if not line.endswith(":"): raise ValueError(f"invalid top-level yaml line: {raw_line.strip()}") current_section = line[:-1].strip() parsed[current_section] = {} continue if current_section is None: raise ValueError(f"yaml key outside section: {raw_line.strip()}") stripped = line.strip() if ":" not in stripped: raise ValueError(f"invalid yaml key line: {raw_line.strip()}") key, value = stripped.split(":", 1) parsed[current_section][key.strip()] = self._parse_simple_yaml_scalar(value.strip()) return parsed def _parse_simple_yaml_scalar(self, value: str) -> Any: if value in {'""', "''"}: return "" if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}: return value[1:-1] if value.lower() == "true": return True if value.lower() == "false": return False if value and value.lstrip("-").isdigit(): return int(value) return value def _connect_session(self): # 这里和样例接收器一致:创建 Session(),然后使用 transport/video 配置建立连接。 assert self._session_cls is not None config = self._load_config() transport_cfg = config.get("transport", {}) video_cfg = config.get("video_receiver", {}) session = self._session_cls() session.connect( server_addr=str(transport_cfg.get("server_addr", "127.0.0.1:10909")), peer_id=str(video_cfg.get("peer_id", "peer-a-video")), relay_via=str(transport_cfg.get("relay_via", "")), bind_ip=str(transport_cfg.get("bind_ip", "")), bind_device=str(transport_cfg.get("bind_device", "")), **self._video_defaults, ) return session, int(video_cfg.get("buffer_bytes", 1024 * 1024)) def _extract_jpeg_frame(self, frame: bytes) -> bytes | None: # 同时兼容两种帧格式: # 1. 纯 JPEG 二进制 # 2. 前 8 字节是序号,后面才是真正的 JPEG 数据 if frame.startswith(b"\xff\xd8"): return frame if len(frame) > 8 and frame[8:10] == b"\xff\xd8": return frame[8:] return None def _extract_sequence(self, frame: bytes) -> int | None: if len(frame) >= 8 and not frame.startswith(b"\xff\xd8"): return int.from_bytes(frame[:8], "big") return None def _run(self) -> None: # 后台持续接收循环: # connect -> recv_into(buffer) -> 按 body_len 截出有效内容 -> 把最新 JPEG 帧缓存在内存里 while True: try: session, buffer_bytes = self._connect_session() self._session = session self._last_error = "" buffer = bytearray(buffer_bytes) while True: # 从 OmniSocket / C 侧收一帧原始二进制数据到缓冲区 meta = session.recv_into(buffer, timeout_ms=1000) if meta is None: continue if meta.get("msg_type") != self._binary_msg_type: continue # 真正收到的有效部分切出来,形成当前这一帧 frame = bytes(buffer[: meta["body_len"]]) jpeg_frame = self._extract_jpeg_frame(frame) if jpeg_frame is None: self._last_error = "received non-JPEG binary frame" continue with self._lock: # 这里只保留最新的一张 JPEG 帧,供 Web 接口直接返回给前端。 self._latest_frame = jpeg_frame self._latest_received_at = time.time() self._latest_sequence = self._extract_sequence(frame) self._frames_received += 1 except Exception as error: # pragma: no cover - 运行时集成路径 self._last_error = str(error) time.sleep(2) finally: if self._session is not None: try: self._session.close() except Exception: pass self._session = None def get_latest_frame(self) -> bytes | None: # 把内存里最新的一张真实 JPEG 帧暴露给 Django 视图层。 # 如果这张帧已经过旧,就返回 None,让上层回退到本地模拟帧。 self.ensure_started() with self._lock: if self._latest_frame is None: return None if time.time() - self._latest_received_at > OMNISOCKET_FRAME_FRESH_SECONDS: return None return self._latest_frame def get_status(self) -> dict[str, Any]: self.ensure_started() config = self._load_config() config_path = resolve_omnisocket_config_path() transport_cfg = config.get("transport", {}) video_cfg = config.get("video_receiver", {}) with self._lock: has_recent_frame = self._latest_frame is not None and ( time.time() - self._latest_received_at <= OMNISOCKET_FRAME_FRESH_SECONDS ) return { "backend_ready": self._session_cls is not None, "mode": VIDEO_SOURCE_MODE, "connected": self._session is not None, "has_recent_frame": has_recent_frame, "frames_received": self._frames_received, "latest_sequence": self._latest_sequence, "last_error": self._last_error, "config_path": str(config_path), "server_addr": str(transport_cfg.get("server_addr", "")), "relay_via": str(transport_cfg.get("relay_via", "")), "peer_id": str(video_cfg.get("peer_id", "")), "buffer_bytes": int(video_cfg.get("buffer_bytes", 0)), } class VideoFrameService: def __init__(self) -> None: self._lock = threading.Lock() self._frame_paths = sorted(JPEG_FRAME_DIR.glob("*.jpg")) self._index = 0 self._receiver = OmniSocketVideoReceiver() def get_status(self) -> dict[str, Any]: receiver_status = self._receiver.get_status() receiver_frame = self._receiver.get_latest_frame() # 如果已经收到了真实视频帧,就把当前状态标记为实时模式,给前端显示。 if receiver_frame is not None: return { "available": True, "source_mode": "omnisocket-jpeg-live", "frame_count": receiver_status["frames_received"], "fps": 30, "frame_dir": str(JPEG_FRAME_DIR), "source_detail": f"peer stream active, frames={receiver_status['frames_received']}", "receiver": receiver_status, } # 强制实时模式时,如果还没收到真实帧,就明确告诉前端“正在等待”, # 不再悄悄回退到本地样例图,避免调试时误以为链路已接通。 if VIDEO_SOURCE_MODE == "omnisocket": wait_detail = receiver_status["last_error"] or ( "等待 OmniSocket 实时 JPEG 帧," f"接收端 peer_id={receiver_status['peer_id']}," f"服务器={receiver_status['server_addr']}" ) return { "available": False, "source_mode": "omnisocket-waiting", "frame_count": receiver_status["frames_received"], "fps": 30, "frame_dir": str(JPEG_FRAME_DIR), "source_detail": wait_detail, "receiver": receiver_status, } return { "available": bool(self._frame_paths), "source_mode": "sample-jpeg-frame-loop" if self._frame_paths else "unavailable", "frame_count": len(self._frame_paths), "fps": 30, "frame_dir": str(JPEG_FRAME_DIR), "source_detail": receiver_status["last_error"] or "fallback to local sample frames", "receiver": receiver_status, } def get_next_frame(self) -> bytes: # 优先返回从 Python/C 视频接收器拿到的最新真实 JPEG 帧。 receiver_frame = self._receiver.get_latest_frame() if receiver_frame is not None: return receiver_frame # 强制实时模式下,如果还没收到真实帧,直接报错, # 这样前端就能明确知道实时链路还没接通。 if VIDEO_SOURCE_MODE == "omnisocket": raise RuntimeError( "OmniSocket 实时 JPEG 帧暂未就绪,请检查 server_addr、peer_id、" "target_peer,以及视频发送端是否已经启动。" ) # 如果当前没有真实帧,就回退到本地演示 JPEG 文件,保证页面仍然可用。 if not self._frame_paths: raise FileNotFoundError(f"No JPEG frames found in {JPEG_FRAME_DIR}") with self._lock: frame_path = self._frame_paths[self._index] self._index = (self._index + 1) % len(self._frame_paths) return frame_path.read_bytes() def iter_mjpeg(self, fps: float = 6.0) -> Iterator[bytes]: frame_interval = 1.0 / max(1.0, min(fps, 30.0)) while True: frame = self.get_next_frame() header = ( b"--frame\r\n" b"Content-Type: image/jpeg\r\n" + f"Content-Length: {len(frame)}\r\n\r\n".encode("ascii") ) yield header + frame + b"\r\n" time.sleep(frame_interval) class GpsDataService: def get_latest(self) -> dict[str, Any]: # 优先使用由 GeoStream C 解析器生成的最新数据包。 payload = self._read_geostream_payload() if payload is not None: payload["source_mode"] = "geostream-json" payload["updated_at"] = utc_iso_now() return payload # 当没有最新的 GPS 文件可用时,退回到使用一个移动的演示点位。 return self._build_simulated_payload() def _read_geostream_payload(self) -> dict[str, Any] | None: # Django 后端目前还不直接解析串口数据流。 # 它只读取由 GeoStream/parse_gps.c 生成的 JSON 文件。 if not GEOSTREAM_JSON_PATH.exists(): return None # 忽略过期的文件,以便 UI 可以退回到使用模拟数据, # 而不是把旧位置当作实时位置来显示。 age_seconds = time.time() - GEOSTREAM_JSON_PATH.stat().st_mtime if age_seconds > GEOSTREAM_STALE_SECONDS: return None try: with GEOSTREAM_JSON_PATH.open("r", encoding="utf-8") as file: return json.load(file) except (OSError, json.JSONDecodeError): return None def _build_simulated_payload(self) -> dict[str, Any]: # 为本地 UI 开发构建一个平滑的伪造轨迹。 tick = time.time() / 12.0 latitude = 31.2304 + math.sin(tick) * 0.0014 longitude = 121.4737 + math.cos(tick) * 0.0018 return { "has_fix": True, "utc_time": datetime.now(UTC).strftime("%H:%M:%S"), "latitude": round(latitude, 6), "longitude": round(longitude, 6), "satellites": 14 + int((math.sin(tick * 0.7) + 1.0) * 2), "altitude_m": round(6.5 + math.cos(tick * 0.5) * 1.2, 2), "coordinate_system": "WGS84", "source_sentence": "SIMULATED", "raw_coordinate_format": "decimal degrees", "source_mode": "simulated", "updated_at": utc_iso_now(), } class NetworkTelemetryService: def get_latest(self) -> dict[str, Any]: tick = time.time() latency_ms = 28 + math.sin(tick / 4.0) * 6 jitter_ms = 3 + math.cos(tick / 5.0) * 1.4 tx_kbps = 780 + math.sin(tick / 6.0) * 160 rx_kbps = 720 + math.cos(tick / 7.0) * 140 packet_loss_pct = max(0.0, 0.35 + math.sin(tick / 9.0) * 0.25) signal_dbm = -53 - abs(math.sin(tick / 8.0) * 7) return { "peer_status": "online", "latency_ms": round(latency_ms, 1), "jitter_ms": round(jitter_ms, 1), "packet_loss_pct": round(packet_loss_pct, 2), "tx_kbps": int(tx_kbps), "rx_kbps": int(rx_kbps), "signal_dbm": round(signal_dbm, 1), "transport": "OmniSocket / simulated", "source_mode": "simulated", "updated_at": utc_iso_now(), } video_service = VideoFrameService() gps_service = GpsDataService() network_service = NetworkTelemetryService()