Compare commits

..

2 Commits

Author SHA1 Message Date
nnbcccscdscdsc
2f507a7546 fix:修改默认网路只有5G模组问题 2026-04-13 17:22:18 +08:00
nnbcccscdscdsc
7dc47d310d feat:5g代码迁移&修改配置文件 2026-04-13 15:55:25 +08:00
7 changed files with 995 additions and 32 deletions

3
.gitignore vendored
View File

@@ -22,3 +22,6 @@ c/bin
/.venv
**/build/
ros-control-py/install
ros-control-py/log

View File

@@ -7,23 +7,64 @@ source "${SCRIPT_DIR}/common.sh"
STEP="5g-dial"
run_dial() {
local rc
append_route_targets() {
local raw_list="$1"
local target
export TERM="${TERM:-xterm}"
export LANG="${LANG:-C.UTF-8}"
export LC_ALL="${LC_ALL:-C.UTF-8}"
blitz_log "${STEP}" "dial-env" "start" "TERM=${TERM} LANG=${LANG} LC_ALL=${LC_ALL} interface=${BLITZ_5G_INTERFACE}" 0
if blitz_run "${STEP}" "dial" python3 rndis_dial.py --serial-port "${BLITZ_5G_SERIAL_PORT}" --interface "${BLITZ_5G_INTERFACE}"; then
if [[ -z "${raw_list}" ]]; then
return 0
fi
rc=$?
blitz_log "${STEP}" "dial-retry" "start" "first dial attempt failed rc=${rc}; retrying after 3s" "${rc}"
sleep 3
blitz_run "${STEP}" "dial-retry" python3 rndis_dial.py --serial-port "${BLITZ_5G_SERIAL_PORT}" --interface "${BLITZ_5G_INTERFACE}"
for target in ${raw_list//,/ }; do
if [[ -z "${target}" ]]; then
continue
fi
dial_cmd+=(--route-target "${target}")
done
}
read_detected_interface() {
local info_json="$1"
if [[ ! -f "${info_json}" ]]; then
return 1
fi
python3 -c 'import json, sys; print((json.load(open(sys.argv[1], encoding="utf-8")).get("interface") or "").strip())' "${info_json}"
}
disable_interfaces() {
local raw_list="$1"
local iface
local nmcli_available=0
if [[ -z "${raw_list}" ]]; then
return 0
fi
if command -v nmcli >/dev/null 2>&1; then
nmcli_available=1
fi
for iface in ${raw_list//,/ }; do
if [[ -z "${iface}" ]]; then
continue
fi
blitz_log "${STEP}" "disable-interface" "start" "iface=${iface}" 0
if [[ "${nmcli_available}" -eq 1 ]]; then
nmcli device disconnect "${iface}" >/dev/null 2>&1 || true
fi
if ip link show dev "${iface}" >/dev/null 2>&1; then
if ip link set dev "${iface}" down; then
blitz_log "${STEP}" "disable-interface" "success" "iface=${iface}" 0
else
rc=$?
blitz_log "${STEP}" "disable-interface" "failure" "iface=${iface}" "${rc}"
return "${rc}"
fi
else
blitz_log "${STEP}" "disable-interface" "success" "iface=${iface} not present, skipping" 0
fi
done
}
wait_for_serial() {
@@ -81,22 +122,59 @@ if [[ -z "${BLITZ_TIME_SERVER_IP}" ]]; then
blitz_log "${STEP}" "precheck" "failure" "BLITZ_TIME_SERVER_IP is empty and no fallback could be derived" 1
exit 1
fi
if [[ -z "${BLITZ_5G_INTERFACE:-}" ]]; then
blitz_log "${STEP}" "precheck" "failure" "BLITZ_5G_INTERFACE must not be empty" 1
exit 1
fi
route_output="$(blitz_route_ready "${BLITZ_TIME_SERVER_IP}" "${BLITZ_5G_INTERFACE}" || true)"
if [[ -n "${route_output}" ]]; then
blitz_log "${STEP}" "dial" "already_up" "target_ip=${BLITZ_TIME_SERVER_IP} interface=${BLITZ_5G_INTERFACE} route=${route_output}" 0
exit 0
disable_interfaces "${BLITZ_5G_DISABLE_INTERFACES:-}"
if [[ -n "${BLITZ_5G_INTERFACE:-}" ]]; then
route_output="$(blitz_route_ready "${BLITZ_TIME_SERVER_IP}" "${BLITZ_5G_INTERFACE}" || true)"
if [[ -n "${route_output}" ]]; then
blitz_log "${STEP}" "dial" "already_up" "target_ip=${BLITZ_TIME_SERVER_IP} interface=${BLITZ_5G_INTERFACE} route=${route_output}" 0
exit 0
fi
else
blitz_log "${STEP}" "route-check" "info" "BLITZ_5G_INTERFACE is empty, skipping pre-dial route shortcut and using auto-detect mode" 0
fi
wait_for_serial "${BLITZ_5G_SERIAL_PORT}" "${BLITZ_5G_SERIAL_WAIT_SEC}"
dial_cmd=(
python3
rndis_dial.py
--serial-port "${BLITZ_5G_SERIAL_PORT}"
--modem-subnet "${BLITZ_5G_MODEM_SUBNET}"
)
if [[ -n "${BLITZ_5G_INTERFACE:-}" ]]; then
dial_cmd+=(--interface "${BLITZ_5G_INTERFACE}")
fi
case "${BLITZ_5G_SKIP_DHCP:-0}" in
1|true|TRUE|yes|YES)
dial_cmd+=(--skip-dhcp)
;;
esac
case "${BLITZ_5G_REMOVE_DEFAULT_ROUTE:-1}" in
1|true|TRUE|yes|YES)
dial_cmd+=(--remove-default-route --gateway "${BLITZ_5G_GATEWAY}" --route-target "${BLITZ_TIME_SERVER_IP}")
append_route_targets "${BLITZ_5G_ROUTE_TARGETS:-}"
;;
esac
pushd "${BLITZ_5G_DIAL_DIR}" >/dev/null
run_dial
blitz_run "${STEP}" "dial" "${dial_cmd[@]}"
popd >/dev/null
wait_for_route "${BLITZ_TIME_SERVER_IP}" "${BLITZ_5G_ROUTE_WAIT_SEC}" "${BLITZ_5G_INTERFACE}"
blitz_log "${STEP}" "complete" "success" "5G dial completed and route is ready on ${BLITZ_5G_INTERFACE}" 0
resolved_interface="${BLITZ_5G_INTERFACE:-}"
if [[ -z "${resolved_interface}" ]]; then
resolved_interface="$(read_detected_interface "${BLITZ_5G_INFO_JSON}" || true)"
if [[ -n "${resolved_interface}" ]]; then
blitz_log "${STEP}" "resolve-interface" "success" "resolved interface from ${BLITZ_5G_INFO_JSON}: ${resolved_interface}" 0
else
blitz_log "${STEP}" "resolve-interface" "failure" "failed to read detected interface from ${BLITZ_5G_INFO_JSON}" 1
fi
fi
if [[ -n "${resolved_interface}" ]]; then
wait_for_route "${BLITZ_TIME_SERVER_IP}" "${BLITZ_5G_ROUTE_WAIT_SEC}" "${resolved_interface}"
blitz_log "${STEP}" "complete" "success" "5G dial completed and route is ready on ${resolved_interface}" 0
else
blitz_log "${STEP}" "complete" "success" "5G dial completed but route wait was skipped because no interface could be resolved; refer to rndis_dial.py logs" 0
fi

View File

@@ -66,7 +66,7 @@ timestamp | step | action | result | details | exit_code
- `robot-boot.env.local`:本机覆盖配置,建议把你自己的配置写这里
- `common.sh`:公共环境加载和统一日志函数
- `boot-gate.sh`:启动闸门,当前逻辑是固定等待 30 秒
- `5g-dial.sh`:等待 5G 串口出现,执行 `rndis_dial.py`检查路由是否真的起来
- `5g-dial.sh`:等待 5G 串口出现,执行 `rndis_dial.py`删除 5G 默认路由并补齐目标主机路由,然后检查路由是否真的起来
- `time-sync.sh`:把 `chrony` 指向白名单服务器 IP 和端口,并执行一次同步
- `start-ros-receiver-service.sh`:开机版 ROS receiver 启动包装
- `wait-for-unix-socket.sh`:等待 ROS receiver 建好本地 unix socket
@@ -178,6 +178,9 @@ BLITZ_LOG_FILE="/var/log/blitz-robot/startup.log"
BLITZ_5G_DIAL_DIR="/home/nvidia/5g-test/5G"
BLITZ_5G_SERIAL_PORT="/dev/ttyUSB7"
BLITZ_5G_GATEWAY="192.168.225.1"
BLITZ_5G_REMOVE_DEFAULT_ROUTE="1"
BLITZ_5G_ROUTE_TARGETS="106.55.173.235"
BLITZ_TIME_SERVER_IP="你的白名单云服务器IP"
BLITZ_TIME_SERVER_PORT="10910"
@@ -187,6 +190,8 @@ BLITZ_ROS_USER="nvidia"
如果 `BLITZ_TIME_SERVER_IP` 留空,脚本会自动回退到 `ROBOT_SIDE_OMNISOCKET_SERVER_ADDR` 的 IP 部分。
`BLITZ_5G_REMOVE_DEFAULT_ROUTE="1"` 时,脚本会在 5G 拨号完成后删除该接口上的默认路由,避免整机默认出口切到 5G。此时 `BLITZ_TIME_SERVER_IP``BLITZ_5G_ROUTE_TARGETS` 中的目标 IP 会显式走 5G其它流量继续走有线或 Wi-Fi 的默认路由。
## 如何安装和使用
下面假设你当前目录就在 `OmniSocketGo` 仓库根目录。
@@ -272,7 +277,8 @@ systemctl status blitz-b-side-omnid.service
```bash
journalctl -u blitz-robot.target -u blitz-boot-gate.service -u blitz-5g-dial.service \
-u blitz-time-sync.service -u blitz-ros-receiver.service -u blitz-b-side-omnid.service -f
-u blitz-time-sync.service -u blitz-ros-receiver.service \
-u blitz-b-side-omnid.service -f
```
## 当前时钟同步会做什么

View File

@@ -52,9 +52,16 @@ blitz_load_boot_env() {
export BLITZ_BOOT_DELAY_SEC="${BLITZ_BOOT_DELAY_SEC:-30}"
export BLITZ_LOG_FILE="${BLITZ_LOG_FILE:-/var/log/blitz-robot/startup.log}"
export BLITZ_5G_DIAL_DIR="${BLITZ_5G_DIAL_DIR:-/home/nvidia/5g-test/5G}"
export BLITZ_5G_DIAL_DIR="${BLITZ_5G_DIAL_DIR:-${BOOT_SCRIPT_DIR}}"
export BLITZ_5G_SERIAL_PORT="${BLITZ_5G_SERIAL_PORT:-/dev/ttyUSB7}"
export BLITZ_5G_INTERFACE="${BLITZ_5G_INTERFACE:-eth0}"
export BLITZ_5G_INTERFACE="${BLITZ_5G_INTERFACE:-}"
export BLITZ_5G_MODEM_SUBNET="${BLITZ_5G_MODEM_SUBNET:-192.168.224.0/22}"
export BLITZ_5G_GATEWAY="${BLITZ_5G_GATEWAY:-192.168.225.1}"
export BLITZ_5G_SKIP_DHCP="${BLITZ_5G_SKIP_DHCP:-0}"
export BLITZ_5G_REMOVE_DEFAULT_ROUTE="${BLITZ_5G_REMOVE_DEFAULT_ROUTE:-1}"
export BLITZ_5G_ROUTE_TARGETS="${BLITZ_5G_ROUTE_TARGETS:-106.55.173.235}"
export BLITZ_5G_INFO_JSON="${BLITZ_5G_INFO_JSON:-${BLITZ_5G_DIAL_DIR}/modem_network_info.json}"
export BLITZ_5G_DISABLE_INTERFACES="${BLITZ_5G_DISABLE_INTERFACES:-}"
export BLITZ_5G_SERIAL_WAIT_SEC="${BLITZ_5G_SERIAL_WAIT_SEC:-60}"
export BLITZ_5G_ROUTE_WAIT_SEC="${BLITZ_5G_ROUTE_WAIT_SEC:-30}"
export BLITZ_TIME_SERVER_IP="${BLITZ_TIME_SERVER_IP:-${default_time_server}}"

View File

@@ -0,0 +1,9 @@
{
"interface": "enx78886c7fbd46",
"ipv4": [
"192.168.225.62/22"
],
"ipv6": [
"fe80::a335:b50d:622d:92e8/64"
]
}

854
scripts/boot/rndis_dial.py Normal file
View File

@@ -0,0 +1,854 @@
#!/usr/bin/env python3
"""RM520N-GL RNDIS 自动拨号脚本。
流程:
1. 检测 USB 设备是否存在
2. 打开 AT 口并检查 SIM 状态
3. 配置 RNDIS 模式: AT+QCFG="usbnet",3
4. 重启模块: AT+CFUN=1,1
5. 等待模块重新枚举并识别 5G 网卡
6. 如果网卡还没有 IPv4, 自动尝试 DHCP
用法:
sudo python3 rndis_dial.py
sudo python3 rndis_dial.py --serial-port /dev/ttyUSB7
sudo python3 rndis_dial.py --interface eth0 #指定网口
"""
from __future__ import annotations
import argparse
import errno
import ipaddress
import json
import os
import select
import shlex
import shutil
import subprocess
import sys
import termios
import time
import tty
USB_ID = "2c7c:0801"
DEFAULT_SERIAL_PORT = "/dev/ttyUSB7" #串口设备节点
DEFAULT_BAUD_RATE = 115200
CHECK_INTERVAL = 2
SERIAL_READ_TIMEOUT = 0.2
SERIAL_POLL_INTERVAL = 0.1
SERIAL_SETTLE_DELAY = 0.3
AT_SYNC_RETRIES = 3
AT_SYNC_TIMEOUT = 2.5
# 示例地址 192.168.225.38/22 所在网段。
# 拨号成功后会用这个网段来最终确认哪个接口是 5G 模组。
DEFAULT_MODEM_SUBNET = "192.168.224.0/22"
DEFAULT_MODEM_GATEWAY = "192.168.225.1"
DEFAULT_PUBLIC_TARGETS = ("81.70.156.140", "106.55.173.235")
DEFAULT_INFO_JSON = "modem_network_info.json"
SKIP_INTERFACES = {"lo", "docker0", "l4tbr0"}
BAUD_RATE_MAP = {
9600: termios.B9600,
19200: termios.B19200,
38400: termios.B38400,
57600: termios.B57600,
115200: termios.B115200,
}
def run_cmd(cmd, timeout=30, check=False):
print(f"[CMD] {format_shell_cmd(cmd)}")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
check=False,
)
output = (result.stdout or "") + (result.stderr or "")
if check and result.returncode != 0:
raise RuntimeError(f"命令执行失败: {' '.join(cmd)}\n{output.strip()}")
return result.returncode, output.strip()
def format_shell_cmd(cmd):
"""把命令参数格式化成可直接阅读的 shell 形式。"""
return " ".join(shlex.quote(part) for part in cmd)
def parse_ipv4_address(value):
try:
return str(ipaddress.IPv4Address(value))
except ipaddress.AddressValueError as exc:
raise argparse.ArgumentTypeError(f"无效的 IPv4 地址: {value}") from exc
def dedupe_keep_order(values):
seen = set()
result = []
for value in values:
if value in seen:
continue
seen.add(value)
result.append(value)
return result
def require_root():
if os.geteuid() != 0:
print("[FAIL] 请使用 sudo 运行此脚本")
sys.exit(1)
def require_commands():
missing = [cmd for cmd in ("lsusb", "ip") if shutil.which(cmd) is None]
if missing:
print(f"[FAIL] 缺少系统命令: {', '.join(missing)}")
sys.exit(1)
def usb_device_present():
# 1. 第一次检测 lsusb确认模块已经被系统识别。
"""通过 lsusb 检查模块是否已经被系统识别。"""
code, output = run_cmd(["lsusb"], timeout=10)
if code != 0:
return False, output
for line in output.splitlines():
if USB_ID in line:
return True, line.strip()
return False, output
def wait_for_usb_device(expected_present, timeout):
"""等待模块 USB 设备下线或重新上线。"""
deadline = time.time() + timeout
last_seen = ""
while time.time() < deadline:
present, detail = usb_device_present()
last_seen = detail
if present == expected_present:
return True, detail
time.sleep(CHECK_INTERVAL)
return False, last_seen
def wait_for_path(path, timeout):
"""等待串口节点或其他路径重新出现。"""
deadline = time.time() + timeout
while time.time() < deadline:
if os.path.exists(path):
return True
time.sleep(1)
return False
def normalize_serial_output(text):
"""整理串口原始输出,便于后续匹配关键字。"""
cleaned = text.replace("\r", "\n")
return "\n".join(line for line in cleaned.splitlines() if line.strip()).strip()
def serial_response_complete(text):
if not text:
return False
for line in reversed(text.splitlines()):
stripped = line.strip()
if stripped == "OK":
return True
if "ERROR" in stripped:
return True
return False
class RawSerialSession:
"""使用 Python 标准库直接控制 Linux 串口,尽量贴近 stty/raw 行为。"""
def __init__(self, port, baudrate):
if baudrate not in BAUD_RATE_MAP:
raise RuntimeError(f"不支持的波特率: {baudrate}")
self.port = port
self.fd = None
self._original_attrs = None
try:
self.fd = os.open(port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
self._original_attrs = termios.tcgetattr(self.fd)
tty.setraw(self.fd, when=termios.TCSANOW)
attrs = termios.tcgetattr(self.fd)
attrs[0] = 0
attrs[1] = 0
attrs[2] &= ~(termios.PARENB | termios.CSTOPB | termios.CSIZE)
attrs[2] |= termios.CS8 | termios.CLOCAL | termios.CREAD
attrs[3] = 0
attrs[4] = BAUD_RATE_MAP[baudrate]
attrs[5] = BAUD_RATE_MAP[baudrate]
attrs[6][termios.VMIN] = 0
attrs[6][termios.VTIME] = 0
termios.tcsetattr(self.fd, termios.TCSANOW, attrs)
termios.tcflush(self.fd, termios.TCIOFLUSH)
except OSError as exc:
self.close()
raise RuntimeError(f"无法打开串口 {port}: {exc}") from exc
@property
def is_open(self):
return self.fd is not None
def reset_input_buffer(self):
if self.fd is not None:
termios.tcflush(self.fd, termios.TCIFLUSH)
def reset_output_buffer(self):
if self.fd is not None:
termios.tcflush(self.fd, termios.TCOFLUSH)
def write(self, data):
if self.fd is None:
raise OSError("串口未打开")
sent = 0
while sent < len(data):
try:
written = os.write(self.fd, data[sent:])
except BlockingIOError:
time.sleep(SERIAL_POLL_INTERVAL)
continue
if written <= 0:
raise OSError("串口写入返回 0 字节")
sent += written
def flush(self):
if self.fd is not None:
termios.tcdrain(self.fd)
def read_chunk(self, timeout, size=4096):
if self.fd is None:
return b""
ready, _, _ = select.select([self.fd], [], [], timeout)
if not ready:
return b""
try:
return os.read(self.fd, size)
except BlockingIOError:
return b""
def close(self):
if self.fd is None:
return
fd = self.fd
self.fd = None
if self._original_attrs is not None:
try:
termios.tcsetattr(fd, termios.TCSANOW, self._original_attrs)
except termios.error:
pass
os.close(fd)
def read_serial_output(session, timeout, allow_disconnect=False):
"""在给定时间窗口内读取 AT 响应,直到出现结束标记或超时。"""
deadline = time.time() + timeout
chunks = []
saw_terminal_line = False
last_data_time = None
while time.time() < deadline:
try:
chunk = session.read_chunk(timeout=min(SERIAL_READ_TIMEOUT, max(deadline - time.time(), 0)))
except OSError as exc:
if allow_disconnect and exc.errno in (errno.EIO, errno.ENODEV, errno.EBADF):
break
raise RuntimeError(f"读取串口响应失败: {exc}") from exc
if chunk:
chunks.append(chunk.decode(errors="ignore"))
last_data_time = time.time()
current_text = normalize_serial_output("".join(chunks))
if serial_response_complete(current_text):
saw_terminal_line = True
continue
if saw_terminal_line and last_data_time is not None and time.time() - last_data_time >= SERIAL_SETTLE_DELAY:
break
time.sleep(SERIAL_POLL_INTERVAL)
return normalize_serial_output("".join(chunks))
def open_serial_session(port):
"""打开 AT 串口会话,后续在同一连接里顺序发送多条命令。"""
ser = RawSerialSession(port=port, baudrate=DEFAULT_BAUD_RATE)
time.sleep(0.2)
ser.reset_input_buffer()
ser.reset_output_buffer()
return ser
def execute_serial_step(ser, command, expect=None, timeout=3, allow_disconnect=False):
"""在当前串口会话里发送一条 AT 命令并校验响应。"""
print(f"[AT] {command}")
try:
ser.reset_input_buffer()
ser.write((command + "\r").encode())
ser.flush()
except OSError as exc:
raise RuntimeError(f"AT 命令 `{command}` 发送失败: {exc}") from exc
response = read_serial_output(ser, timeout=timeout, allow_disconnect=allow_disconnect)
if response:
print(response)
else:
print("(无响应)")
if "ERROR" in response:
raise RuntimeError(f"AT 命令 `{command}` 执行失败: {response}")
if expect and expect not in response and not allow_disconnect:
raise RuntimeError(f"AT 命令 `{command}` 响应异常: {response or '空响应'}")
return response
def synchronize_at_channel(ser):
"""某些模组 AT 口在刚打开时需要先用 AT 做一次预热。"""
last_error = None
for attempt in range(1, AT_SYNC_RETRIES + 1):
try:
print(f"[INFO] 预热 AT 通道,第 {attempt}")
response = execute_serial_step(ser, "AT", expect="OK", timeout=AT_SYNC_TIMEOUT)
if "OK" in response:
return
except RuntimeError as exc:
last_error = exc
time.sleep(0.5)
if last_error is not None:
raise RuntimeError(
"AT 通道预热失败,请确认串口是否是 AT 命令口,例如 /dev/ttyUSB2"
) from last_error
raise RuntimeError("AT 通道预热失败")
def run_serial_steps(port, steps):
"""在同一个串口会话里顺序执行多条 AT 命令。"""
ser = None
try:
ser = open_serial_session(port)
synchronize_at_channel(ser)
for step in steps:
execute_serial_step(
ser,
step["command"],
expect=step.get("expect"),
timeout=step.get("timeout", 3),
allow_disconnect=step.get("allow_disconnect", False),
)
finally:
if ser is not None and ser.is_open:
ser.close()
def configure_rndis(port):
# 2. 用 Python 串口库在同一会话里顺序执行拨号相关 AT 命令。
"""切换到 RNDIS 模式并触发模块重启。"""
if not wait_for_path(port, timeout=30):
raise RuntimeError(f"串口不存在: {port}")
print(f"[OK] 串口已打开: {port}")
run_serial_steps(
port,
[
{"command": "AT+CPIN?", "expect": "READY", "timeout": 4},
{"command": 'AT+QCFG="usbnet",3', "expect": "OK", "timeout": 5},
{"command": "AT+CFUN=1,1", "timeout": 4, "allow_disconnect": True},
],
)
def get_interfaces():
"""列出当前系统中的接口,过滤明显无关的本地接口。"""
interfaces = []
try:
for name in os.listdir("/sys/class/net"):
if name in SKIP_INTERFACES or is_usb_gadget(name):
continue
interfaces.append(name)
except FileNotFoundError:
return []
return sorted(interfaces)
def is_usb_gadget(iface):
"""过滤 Jetson 自己暴露出去的 gadget 网卡。"""
sysfs_path = f"/sys/class/net/{iface}"
if not os.path.exists(sysfs_path):
return False
return "/gadget/" in os.path.realpath(sysfs_path)
def is_usb_network_interface(iface):
"""判断接口是否来自 USB 设备。"""
device_path = f"/sys/class/net/{iface}/device"
if not os.path.exists(device_path):
return False
real_path = os.path.realpath(device_path)
return "/usb" in real_path
def get_ipv4_addrs():
"""返回所有接口的 IPv4/CIDR 信息。"""
code, output = run_cmd(["ip", "-o", "-4", "addr", "show"], timeout=10)
if code != 0:
return {}
ipv4_addrs = {}
for line in output.splitlines():
parts = line.split()
if len(parts) >= 4:
iface = parts[1]
ipv4_addrs.setdefault(iface, []).append(parts[3])
return ipv4_addrs
def get_ipv6_addrs():
"""返回所有接口的 IPv6/CIDR 信息。"""
code, output = run_cmd(["ip", "-o", "-6", "addr", "show"], timeout=10)
if code != 0:
return {}
ipv6_addrs = {}
for line in output.splitlines():
parts = line.split()
if len(parts) >= 4:
iface = parts[1]
ipv6_addrs.setdefault(iface, []).append(parts[3])
return ipv6_addrs
def interface_priority(iface):
if iface.startswith("wwan"):
return 0
if iface.startswith("enx"):
return 1
if iface.startswith("usb"):
return 2
return 10
def list_usb_network_candidates(explicit_iface=None):
"""列出拨号前可尝试的 USB 网卡候选项。
这里不靠固定网口名确认 5G 模组,只是在还没有 IP 的时候先缩小范围。
真正确认模组接口,会在 DHCP 之后根据 IP 网段判断。
"""
candidates = []
for iface in get_interfaces():
if explicit_iface and iface != explicit_iface:
continue
if not is_usb_network_interface(iface):
continue
candidates.append((interface_priority(iface), iface))
if not candidates:
return []
candidates.sort()
return [iface for _, iface in candidates]
def ip_in_subnet(ip_cidr, subnet):
"""判断接口地址是否落在指定网段内。"""
try:
return ipaddress.ip_interface(ip_cidr).ip in ipaddress.ip_network(subnet, strict=False)
except ValueError:
return False
def find_interface_by_subnet(modem_subnet, explicit_iface=None):
"""拨号成功后,通过 IP 网段确认 5G 模组网卡。"""
candidates = []
for iface, addrs in get_ipv4_addrs().items():
if iface in SKIP_INTERFACES or is_usb_gadget(iface):
continue
if not is_usb_network_interface(iface):
continue
if explicit_iface and iface != explicit_iface:
continue
matched_addrs = [addr for addr in addrs if ip_in_subnet(addr, modem_subnet)]
if matched_addrs:
candidates.append((interface_priority(iface), iface, matched_addrs))
if not candidates:
return None, []
candidates.sort()
_, iface, matched_addrs = candidates[0]
return iface, matched_addrs
def wait_for_usb_candidates(explicit_iface=None, timeout=90):
"""等待模块枚举出 USB 网卡候选项。"""
deadline = time.time() + timeout
while time.time() < deadline:
candidates = list_usb_network_candidates(explicit_iface=explicit_iface)
if candidates:
return candidates
time.sleep(CHECK_INTERVAL)
return []
def bring_interface_up(iface):
code, output = run_cmd(["ip", "link", "set", "dev", iface, "up"], timeout=10)
if code != 0:
raise RuntimeError(f"拉起网卡失败: {iface}\n{output}")
def renew_dhcp(iface):
dhclient = shutil.which("dhclient")
udhcpc = shutil.which("udhcpc")
if dhclient:
print(f"[INFO] 使用 dhclient 为 {iface} 获取 IP")
code, output = run_cmd(["dhclient", "-1", "-v", iface], timeout=45)
return code == 0, output
if udhcpc:
print(f"[INFO] 使用 udhcpc 为 {iface} 获取 IP")
code, output = run_cmd(["udhcpc", "-n", "-q", "-i", iface], timeout=45)
return code == 0, output
return False, "系统中未找到 dhclient 或 udhcpc"
def get_default_routes(iface):
code, output = run_cmd(["ip", "-o", "route", "show", "default", "dev", iface], timeout=10)
if code != 0:
return []
return [line.strip() for line in output.splitlines() if line.strip()]
def resolve_gateway(iface, fallback_gateway):
for route in get_default_routes(iface):
tokens = route.split()
for index, token in enumerate(tokens[:-1]):
if token == "via":
gateway = tokens[index + 1]
print(f"[INFO] 从默认路由检测到 {iface} 网关: {gateway}")
return gateway
print(f"[INFO] 未从默认路由检测到 {iface} 网关,回退到 {fallback_gateway}")
return fallback_gateway
def delete_default_routes(iface):
removed = 0
while True:
routes = get_default_routes(iface)
if not routes:
return removed
deleted_this_round = False
for route in routes:
cmd = ["ip", "route", "del", *route.split()]
code, output = run_cmd(cmd, timeout=10)
if code != 0:
code, output = run_cmd(["ip", "route", "del", "default", "dev", iface], timeout=10)
if code != 0:
raise RuntimeError(f"删除默认路由失败: {iface}\n{output}")
removed += 1
deleted_this_round = True
if not deleted_this_round:
raise RuntimeError(f"未能删除 {iface} 的默认路由")
def install_host_routes(iface, gateway, targets):
for target in dedupe_keep_order(targets):
cmd = ["ip", "route", "replace", f"{target}/32", "via", gateway, "dev", iface]
code, output = run_cmd(cmd, timeout=10)
if code != 0:
raise RuntimeError(f"添加主机路由失败: {target} via {gateway} dev {iface}\n{output}")
print(f"[OK] 已添加主机路由: {target}/32 via {gateway} dev {iface}")
def enforce_route_policy(iface, fallback_gateway, route_targets):
gateway = resolve_gateway(iface, fallback_gateway)
removed = delete_default_routes(iface)
print(f"[OK] 已删除 {iface} 上的 {removed} 条默认路由")
if route_targets:
install_host_routes(iface, gateway, route_targets)
else:
print(f"[WARN] {iface} 未配置任何主机路由目标5G 将不再承载公网流量")
def ensure_ipv4(iface):
"""为指定接口申请 IPv4 地址。"""
ipv4_addrs = get_ipv4_addrs().get(iface, [])
if ipv4_addrs:
return ipv4_addrs
bring_interface_up(iface)
ok, output = renew_dhcp(iface)
if output:
print(output)
if not ok:
return []
return get_ipv4_addrs().get(iface, [])
def acquire_modem_interface(modem_subnet, explicit_iface=None):
"""通过 DHCP + IP 网段识别真正的模组接口。"""
iface, matched_addrs = find_interface_by_subnet(
modem_subnet,
explicit_iface=explicit_iface,
)
if iface:
return iface, matched_addrs
candidates = list_usb_network_candidates(explicit_iface=explicit_iface)
if not candidates:
raise RuntimeError("未找到可尝试 DHCP 的 USB 网卡候选项")
print(f"[INFO] 当前 USB 网卡候选项: {', '.join(candidates)}")
for iface in candidates:
print(f"[INFO] 尝试为 {iface} 获取 IPv4")
ensure_ipv4(iface)
matched_iface, matched_addrs = find_interface_by_subnet(
modem_subnet,
explicit_iface=explicit_iface,
)
if matched_iface:
return matched_iface, matched_addrs
return None, []
def print_interface_status(iface):
# 3. 拨号成功后,打印 ip/ifconfig确认模组网口和地址。
print(f"[OK] 检测到 5G 网卡: {iface}")
code, output = run_cmd(["ip", "-4", "addr", "show", "dev", iface], timeout=10)
if code == 0 and output:
print(output)
if shutil.which("ifconfig"):
code, ifconfig_output = run_cmd(["ifconfig", iface], timeout=10)
if code == 0 and ifconfig_output:
print("\n===== ifconfig =====")
print(ifconfig_output)
def save_interface_info(iface, output_file=DEFAULT_INFO_JSON):
"""把网口名称、IPv4、IPv6 保存到 JSON 文件。"""
data = {
"interface": iface,
"ipv4": get_ipv4_addrs().get(iface, []),
"ipv6": get_ipv6_addrs().get(iface, []),
}
with open(output_file, "w", encoding="utf-8") as json_file:
json.dump(data, json_file, ensure_ascii=False, indent=2)
print(f"[OK] 网口信息已保存到 {output_file}")
def ping_target(iface, target, count=3, timeout=15):
"""通过指定网口 ping 一个目标。"""
code, output = run_cmd(
["ping", "-I", iface, "-c", str(count), "-W", "3", target],
timeout=timeout,
)
return code == 0, output
def print_ping_summary(output):
"""只打印 ping 的关键结果。"""
for line in output.splitlines():
if "packets transmitted" in line or "rtt " in line or "Destination " in line:
print(line)
def verify_connectivity(iface, gateway=DEFAULT_MODEM_GATEWAY, targets=DEFAULT_PUBLIC_TARGETS, retry_interval=3, max_wait=45):
# 4. 最后先 ping 模组网关,再重试公网连通性。
"""先测模组网关,再轮询公网目标地址。"""
ok, output = ping_target(iface, gateway, count=3, timeout=15)
if ok:
print(f"[OK] {iface} 可到达模组网关 {gateway}")
print_ping_summary(output)
else:
print(f"[WARN] {iface} 无法到达模组网关 {gateway}")
if output:
print(output)
return False
deadline = time.time() + max_wait
attempt = 1
while True:
for target in targets:
ok, output = ping_target(iface, target, count=3, timeout=15)
if ok:
print(f"[OK] {iface} 可通过 {target}")
print_ping_summary(output)
return True
print(f"[WARN] 第 {attempt} 次 Ping {target} 失败")
if output:
print_ping_summary(output)
if time.time() >= deadline:
print(f"[WARN] {iface}{max_wait} 秒内仍无法连通 {', '.join(targets)}")
return False
attempt += 1
time.sleep(retry_interval)
def ping_via_interface(iface, targets=DEFAULT_PUBLIC_TARGETS):
"""保留原调用点,内部走完整连通性检查。"""
return verify_connectivity(iface, targets=targets)
def parse_args():
parser = argparse.ArgumentParser(description="RM520N-GL RNDIS 自动拨号脚本")
parser.add_argument(
"--serial-port",
default=DEFAULT_SERIAL_PORT,
help=f"AT 串口路径,默认 {DEFAULT_SERIAL_PORT}",
)
parser.add_argument(
"--interface",
help="指定期望的 5G 网卡名,例如 eth0",
)
parser.add_argument(
"--modem-subnet",
default=DEFAULT_MODEM_SUBNET,
help=f"拨号成功后用于识别模组接口的 IPv4 网段,默认 {DEFAULT_MODEM_SUBNET}",
)
parser.add_argument(
"--gateway",
type=parse_ipv4_address,
default=DEFAULT_MODEM_GATEWAY,
help=f"5G 模组网关地址,默认 {DEFAULT_MODEM_GATEWAY}",
)
parser.add_argument(
"--skip-dhcp",
action="store_true",
help="只等待 USB 网卡出现,不主动申请 IPv4",
)
parser.add_argument(
"--remove-default-route",
action="store_true",
help="拨号成功后删除 5G 接口上的默认路由,只保留显式主机路由",
)
parser.add_argument(
"--route-target",
action="append",
default=[],
type=parse_ipv4_address,
help="拨号完成后通过 5G 接口保留的 IPv4 主机路由目标,可重复传入",
)
return parser.parse_args()
def main():
args = parse_args()
require_root()
require_commands()
print("===== RM520N-GL RNDIS 自动拨号 =====")
print(f"[INFO] 目标模组网段: {args.modem_subnet}")
#1.检测 lsusb确认是否识别到模块
present, detail = usb_device_present()
if not present:
print(f"[FAIL] 未检测到模块 USB 设备 {USB_ID}")
if detail:
print(detail)
sys.exit(1)
print(f"[OK] 检测到 USB 设备: {detail}")
print(f"[INFO] 使用 AT 口: {args.serial_port}")
#2.进行 Python 串口拨号
try:
configure_rndis(args.serial_port)
print("[INFO] 已发送 AT+CFUN=1,1等待模块重启")
disappeared, _ = wait_for_usb_device(expected_present=False, timeout=25)
if disappeared:
print("[OK] 模块已下线,继续等待重新枚举")
else:
print("[WARN] 未观察到模块下线,继续等待重新枚举")
reappeared, detail = wait_for_usb_device(expected_present=True, timeout=90)
if not reappeared:
print(f"[FAIL] 模块重启后未重新枚举: {USB_ID}")
sys.exit(1)
print(f"[OK] 模块已重新枚举: {detail}")
candidates = wait_for_usb_candidates(explicit_iface=args.interface, timeout=90)
if not candidates:
print("[FAIL] 未检测到 5G 模组枚举出的 USB 网卡")
sys.exit(1)
if args.skip_dhcp:
print(f"[INFO] 当前 USB 网卡候选项: {', '.join(candidates)}")
iface, ipv4_addrs = find_interface_by_subnet(
args.modem_subnet,
explicit_iface=args.interface,
)
if not iface:
print(f"[WARN] 当前还没有接口拿到目标网段 {args.modem_subnet} 的地址")
sys.exit(1)
else:
iface, ipv4_addrs = acquire_modem_interface(
args.modem_subnet,
explicit_iface=args.interface,
)
if not iface:
print(f"[FAIL] 未找到落在目标网段 {args.modem_subnet} 内的模组接口")
sys.exit(1)
print_interface_status(iface)
if ipv4_addrs:
for addr in ipv4_addrs:
print(f"[OK] {iface} 已获取 IPv4: {addr}")
save_interface_info(iface)
route_targets = dedupe_keep_order(args.route_target)
if args.remove_default_route:
enforce_route_policy(iface, args.gateway, route_targets)
connectivity_targets = route_targets or list(DEFAULT_PUBLIC_TARGETS)
ping_via_interface(iface, targets=connectivity_targets)
print(f"[DONE] RNDIS 拨号完成,可执行: sudo python3 speed_test.py {iface}")
return
print(f"[WARN] {iface} 已出现,但还没有 IPv4 地址")
print(f"[INFO] 可手动检查: ip addr show {iface}")
sys.exit(1)
except (RuntimeError, subprocess.TimeoutExpired) as exc:
print(f"[FAIL] {exc}")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -1,12 +1,18 @@
# Boot-time settings for the robot-side autostart chain.
# Override machine-specific values in robot-boot.env.local.
BLITZ_BOOT_DELAY_SEC="60"
BLITZ_BOOT_DELAY_SEC="30"
BLITZ_LOG_FILE="/var/log/blitz-robot/startup.log"
BLITZ_5G_DIAL_DIR="/home/nvidia/5g-test/5G"
BLITZ_5G_SERIAL_PORT="/dev/ttyUSB7"
BLITZ_5G_INTERFACE="eth0"
BLITZ_5G_DIAL_DIR="${OMNISOCKETGO_ROOT}/scripts/boot"
BLITZ_5G_SERIAL_PORT="/dev/ttyUSB2"
BLITZ_5G_INTERFACE=""
BLITZ_5G_MODEM_SUBNET="192.168.224.0/22"
BLITZ_5G_GATEWAY="192.168.225.1"
BLITZ_5G_SKIP_DHCP="0"
BLITZ_5G_REMOVE_DEFAULT_ROUTE="1"
BLITZ_5G_ROUTE_TARGETS="106.55.173.235"
BLITZ_5G_INFO_JSON="${OMNISOCKETGO_ROOT}/scripts/boot/modem_network_info.json"
BLITZ_5G_SERIAL_WAIT_SEC="60"
BLITZ_5G_ROUTE_WAIT_SEC="30"