Files
OmniSocketGo/scripts/boot/rndis_dial.py
2026-04-13 17:22:18 +08:00

855 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Automatic RNDIS dial-up script for the RM520N-GL modem.

Workflow:
1. Check that the USB device is present
2. Open the AT port and check the SIM status
3. Configure RNDIS mode: AT+QCFG="usbnet",3
4. Reboot the module: AT+CFUN=1,1
5. Wait for the module to re-enumerate and identify the 5G network interface
6. If the interface has no IPv4 address yet, try DHCP automatically

Usage:
    sudo python3 rndis_dial.py
    sudo python3 rndis_dial.py --serial-port /dev/ttyUSB7
    sudo python3 rndis_dial.py --interface eth0  # pin a specific interface
"""
from __future__ import annotations
import argparse
import errno
import ipaddress
import json
import os
import select
import shlex
import shutil
import subprocess
import sys
import termios
import time
import tty
# Substring searched for in each line of `lsusb` output to detect the module.
USB_ID = "2c7c:0801"
DEFAULT_SERIAL_PORT = "/dev/ttyUSB7"  # serial device node
DEFAULT_BAUD_RATE = 115200
# Seconds slept between polls while waiting for USB devices/interfaces.
CHECK_INTERVAL = 2
# Upper bound (seconds) for a single select() wait while reading the port.
SERIAL_READ_TIMEOUT = 0.2
# Seconds slept between serial read/write retries.
SERIAL_POLL_INTERVAL = 0.1
# Quiet period (seconds) required after a final OK/ERROR line before a
# response is considered complete.
SERIAL_SETTLE_DELAY = 0.3
# Number of plain "AT" probes sent to warm up the channel, and the timeout
# (seconds) applied to each probe.
AT_SYNC_RETRIES = 3
AT_SYNC_TIMEOUT = 2.5
# Subnet that contains the example address 192.168.225.38/22.
# After dialing succeeds, this subnet is used to finally confirm which
# interface belongs to the 5G modem.
DEFAULT_MODEM_SUBNET = "192.168.224.0/22"
DEFAULT_MODEM_GATEWAY = "192.168.225.1"
# Hosts pinged by default when verifying connectivity through the modem.
DEFAULT_PUBLIC_TARGETS = ("81.70.156.140", "106.55.173.235")
# Default output file used by save_interface_info().
DEFAULT_INFO_JSON = "modem_network_info.json"
# Interface names that are never considered as modem candidates.
SKIP_INTERFACES = {"lo", "docker0", "l4tbr0"}
# Numeric baud rate -> termios speed constant.
BAUD_RATE_MAP = {
    9600: termios.B9600,
    19200: termios.B19200,
    38400: termios.B38400,
    57600: termios.B57600,
    115200: termios.B115200,
}
def run_cmd(cmd, timeout=30, check=False):
    """Run an external command and return ``(returncode, output)``.

    Args:
        cmd: Argument list handed to ``subprocess.run`` (no shell involved).
        timeout: Seconds after which ``subprocess.TimeoutExpired`` is raised.
        check: When true, a non-zero exit status raises ``RuntimeError``.

    Returns:
        A tuple of the exit status and stdout followed by stderr, stripped.
        If the executable cannot be found, the status is 127 and the output
        is the text of the underlying error.

    Raises:
        RuntimeError: ``check`` is true and the command did not succeed.
        subprocess.TimeoutExpired: The command ran longer than ``timeout``.
    """
    print(f"[CMD] {format_shell_cmd(cmd)}")
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=False,
        )
    except FileNotFoundError as exc:
        # Only `lsusb` and `ip` are verified up front; other tools such as
        # `ping` may be absent. Report that like a failed command (127 is the
        # shell's "command not found" status) instead of crashing the caller.
        returncode = 127
        output = str(exc)
    else:
        returncode = result.returncode
        output = (result.stdout or "") + (result.stderr or "")
    if check and returncode != 0:
        raise RuntimeError(f"命令执行失败: {' '.join(cmd)}\n{output.strip()}")
    return returncode, output.strip()
def format_shell_cmd(cmd):
    """Render an argument list as a shell-quoted, human-readable command line."""
    quoted_parts = map(shlex.quote, cmd)
    return " ".join(quoted_parts)
def parse_ipv4_address(value):
    """Validate *value* as an IPv4 address for use as an argparse ``type=``.

    Returns the address in its canonical string form and raises
    ``argparse.ArgumentTypeError`` when *value* is not a valid IPv4 address.
    """
    try:
        address = ipaddress.IPv4Address(value)
    except ipaddress.AddressValueError as exc:
        raise argparse.ArgumentTypeError(f"无效的 IPv4 地址: {value}") from exc
    return str(address)
def dedupe_keep_order(values):
    """Return a list of *values* with duplicates dropped, first occurrence wins."""
    # dict keys are unique and preserve insertion order.
    return list(dict.fromkeys(values))
def require_root():
    """Exit with status 1 unless the effective user ID is 0."""
    running_as_root = os.geteuid() == 0
    if running_as_root:
        return
    print("[FAIL] 请使用 sudo 运行此脚本")
    sys.exit(1)
def require_commands():
    """Exit with status 1 if `lsusb` or `ip` cannot be found on PATH."""
    missing = []
    for tool in ("lsusb", "ip"):
        if shutil.which(tool) is None:
            missing.append(tool)
    if not missing:
        return
    print(f"[FAIL] 缺少系统命令: {', '.join(missing)}")
    sys.exit(1)
def usb_device_present():
    """Check via `lsusb` whether the module is visible to the system.

    Step 1 of the workflow. Returns ``(True, matching_line)`` when a line of
    the `lsusb` output contains ``USB_ID``; otherwise ``(False, full_output)``,
    which also covers `lsusb` itself failing.
    """
    status, listing = run_cmd(["lsusb"], timeout=10)
    if status == 0:
        match = next((row for row in listing.splitlines() if USB_ID in row), None)
        if match is not None:
            return True, match.strip()
    return False, listing
def wait_for_usb_device(expected_present, timeout):
    """Wait for the module's USB device to disappear or to come back.

    Args:
        expected_present: ``True`` to wait for the device to be listed,
            ``False`` to wait for it to vanish.
        timeout: Maximum number of seconds to wait.

    Returns:
        ``(True, detail)`` as soon as the presence state matches, otherwise
        ``(False, detail)`` with the detail of the last poll.
    """
    # A monotonic clock keeps the deadline correct even if the wall clock is
    # stepped (e.g. by NTP) while we wait.
    deadline = time.monotonic() + timeout
    while True:
        # Poll before checking the deadline so a zero timeout still probes once.
        present, detail = usb_device_present()
        if present == expected_present:
            return True, detail
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False, detail
        time.sleep(min(CHECK_INTERVAL, remaining))
def wait_for_path(path, timeout):
    """Wait until *path* (e.g. a serial device node) exists.

    Args:
        path: Filesystem path to watch.
        timeout: Maximum number of seconds to wait.

    Returns:
        ``True`` once the path exists, ``False`` if it is still missing when
        the timeout expires. The path is always checked at least once.
    """
    # A monotonic clock keeps the deadline correct even if the wall clock is
    # stepped (e.g. by NTP) while we wait.
    deadline = time.monotonic() + timeout
    while True:
        if os.path.exists(path):
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Poll once per second, but never sleep past the deadline.
        time.sleep(min(1, remaining))
def normalize_serial_output(text):
    """Tidy raw serial output so later keyword matching is straightforward.

    Carriage returns become newlines, whitespace-only lines are dropped and
    the joined result is stripped.
    """
    unified = text.replace("\r", "\n")
    kept = []
    for row in unified.splitlines():
        if row.strip():
            kept.append(row)
    return "\n".join(kept).strip()
def serial_response_complete(text):
    """Return True when *text* contains a final AT result line.

    A line counts as final when, after stripping, it equals ``OK`` or
    contains ``ERROR``. Empty input is never complete.
    """
    if not text:
        return False
    stripped_rows = (row.strip() for row in text.splitlines())
    return any(row == "OK" or "ERROR" in row for row in stripped_rows)
class RawSerialSession:
    """Drive a Linux serial port with the standard library only.

    The port is opened non-blocking and put into raw mode with 8 data bits,
    no parity and one stop bit, mirroring what `stty raw` would configure.
    The original terminal attributes are restored on :meth:`close`.

    Instances can be used as context managers; leaving the ``with`` block
    closes the port.
    """

    def __init__(self, port, baudrate):
        """Open *port* and configure it for raw I/O at *baudrate*.

        Raises:
            RuntimeError: The baud rate is not in ``BAUD_RATE_MAP`` or the
                port could not be opened/configured.
        """
        if baudrate not in BAUD_RATE_MAP:
            raise RuntimeError(f"不支持的波特率: {baudrate}")
        self.port = port
        self.fd = None
        self._original_attrs = None
        try:
            self.fd = os.open(port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
            self._original_attrs = termios.tcgetattr(self.fd)
            tty.setraw(self.fd, when=termios.TCSANOW)
            attrs = termios.tcgetattr(self.fd)
            attrs[0] = 0  # iflag: no input processing
            attrs[1] = 0  # oflag: no output processing
            attrs[2] &= ~(termios.PARENB | termios.CSTOPB | termios.CSIZE)
            attrs[2] |= termios.CS8 | termios.CLOCAL | termios.CREAD
            attrs[3] = 0  # lflag: no echo, no canonical mode, no signals
            attrs[4] = BAUD_RATE_MAP[baudrate]  # input speed
            attrs[5] = BAUD_RATE_MAP[baudrate]  # output speed
            # VMIN=0/VTIME=0: read() returns immediately with what is there.
            attrs[6][termios.VMIN] = 0
            attrs[6][termios.VTIME] = 0
            termios.tcsetattr(self.fd, termios.TCSANOW, attrs)
            termios.tcflush(self.fd, termios.TCIOFLUSH)
        except (OSError, termios.error) as exc:
            # termios.error does not derive from OSError; catching it here
            # keeps the descriptor from leaking and gives callers the same
            # RuntimeError they get for any other open failure.
            self.close()
            raise RuntimeError(f"无法打开串口 {port}: {exc}") from exc

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    @property
    def is_open(self):
        """True while the underlying file descriptor is open."""
        return self.fd is not None

    def reset_input_buffer(self):
        """Discard data received but not yet read."""
        if self.fd is not None:
            termios.tcflush(self.fd, termios.TCIFLUSH)

    def reset_output_buffer(self):
        """Discard data written but not yet transmitted."""
        if self.fd is not None:
            termios.tcflush(self.fd, termios.TCOFLUSH)

    def write(self, data):
        """Write all of *data* (bytes), retrying while the port would block.

        Raises:
            OSError: The port is not open or the write made no progress.
        """
        if self.fd is None:
            raise OSError("串口未打开")
        sent = 0
        while sent < len(data):
            try:
                written = os.write(self.fd, data[sent:])
            except BlockingIOError:
                # Non-blocking descriptor: output queue is full, try again.
                time.sleep(SERIAL_POLL_INTERVAL)
                continue
            if written <= 0:
                raise OSError("串口写入返回 0 字节")
            sent += written

    def flush(self):
        """Block until all written output has been transmitted."""
        if self.fd is not None:
            termios.tcdrain(self.fd)

    def read_chunk(self, timeout, size=4096):
        """Return up to *size* bytes, waiting at most *timeout* seconds.

        Returns ``b""`` when the port is closed or nothing arrived in time.
        """
        if self.fd is None:
            return b""
        ready, _, _ = select.select([self.fd], [], [], timeout)
        if not ready:
            return b""
        try:
            return os.read(self.fd, size)
        except BlockingIOError:
            return b""

    def close(self):
        """Restore the original terminal attributes and close the port.

        Safe to call more than once.
        """
        if self.fd is None:
            return
        fd = self.fd
        self.fd = None
        try:
            if self._original_attrs is not None:
                try:
                    termios.tcsetattr(fd, termios.TCSANOW, self._original_attrs)
                except termios.error:
                    # Best effort: the device may already be gone.
                    pass
        finally:
            # Release the descriptor no matter how the restore went.
            os.close(fd)
def read_serial_output(session, timeout, allow_disconnect=False):
    """Read an AT response until a final result line appears or time runs out.

    Args:
        session: Object exposing ``read_chunk(timeout=...)`` returning bytes.
        timeout: Overall time budget in seconds.
        allow_disconnect: When true, an ``EIO``/``ENODEV``/``EBADF`` read
            error ends the read quietly instead of raising.

    Returns:
        The normalized text collected so far (possibly empty).

    Raises:
        RuntimeError: Reading failed with an error that is not tolerated.
    """
    # A monotonic clock keeps the deadline and the settle timer correct even
    # if the wall clock is stepped while we wait.
    deadline = time.monotonic() + timeout
    # Bytes are accumulated and decoded as a whole so that a multi-byte
    # character split across two reads is not silently dropped.
    received = bytearray()
    saw_terminal_line = False
    last_data_time = None
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            chunk = session.read_chunk(timeout=min(SERIAL_READ_TIMEOUT, remaining))
        except OSError as exc:
            if allow_disconnect and exc.errno in (errno.EIO, errno.ENODEV, errno.EBADF):
                break
            raise RuntimeError(f"读取串口响应失败: {exc}") from exc
        if chunk:
            received.extend(chunk)
            last_data_time = time.monotonic()
            current_text = normalize_serial_output(received.decode(errors="ignore"))
            if serial_response_complete(current_text):
                saw_terminal_line = True
            continue
        # After the final line, keep listening briefly for trailing data and
        # stop once the line has been quiet for SERIAL_SETTLE_DELAY.
        if (
            saw_terminal_line
            and last_data_time is not None
            and time.monotonic() - last_data_time >= SERIAL_SETTLE_DELAY
        ):
            break
        time.sleep(SERIAL_POLL_INTERVAL)
    return normalize_serial_output(received.decode(errors="ignore"))
def open_serial_session(port):
    """Open the AT serial port for a session that carries several commands.

    The port is opened at ``DEFAULT_BAUD_RATE``; after a short pause both
    the input and the output buffers are discarded.
    """
    session = RawSerialSession(port=port, baudrate=DEFAULT_BAUD_RATE)
    time.sleep(0.2)
    for discard in (session.reset_input_buffer, session.reset_output_buffer):
        discard()
    return session
def execute_serial_step(ser, command, expect=None, timeout=3, allow_disconnect=False):
    """Send one AT command on the open session and validate the response.

    Args:
        ser: Open serial session.
        command: AT command text without the trailing carriage return.
        expect: Substring that must appear in the response, if given.
        timeout: Seconds to wait for the response.
        allow_disconnect: Tolerate the port vanishing while reading and skip
            the ``expect`` check (used for commands that reboot the module).

    Returns:
        The normalized response text.

    Raises:
        RuntimeError: Sending failed, the response contains ``ERROR``, or
            the expected substring is missing.
    """
    print(f"[AT] {command}")
    try:
        ser.reset_input_buffer()
        ser.write((command + "\r").encode())
        ser.flush()
    except (OSError, termios.error) as exc:
        # tcflush/tcdrain report failures as termios.error, which is not an
        # OSError subclass, so it has to be listed explicitly.
        raise RuntimeError(f"AT 命令 `{command}` 发送失败: {exc}") from exc
    response = read_serial_output(ser, timeout=timeout, allow_disconnect=allow_disconnect)
    if response:
        print(response)
    else:
        print("(无响应)")
    if "ERROR" in response:
        raise RuntimeError(f"AT 命令 `{command}` 执行失败: {response}")
    if expect and expect not in response and not allow_disconnect:
        raise RuntimeError(f"AT 命令 `{command}` 响应异常: {response or '空响应'}")
    return response
def synchronize_at_channel(ser):
    """Warm up the AT port by sending a plain ``AT`` until it answers ``OK``.

    Up to ``AT_SYNC_RETRIES`` probes are sent, half a second apart.

    Raises:
        RuntimeError: No probe was answered with ``OK``; chained to the last
            probe failure when there was one.
    """
    failure = None
    attempt = 0
    while attempt < AT_SYNC_RETRIES:
        attempt += 1
        print(f"[INFO] 预热 AT 通道,第 {attempt}")
        try:
            reply = execute_serial_step(ser, "AT", expect="OK", timeout=AT_SYNC_TIMEOUT)
        except RuntimeError as exc:
            failure = exc
        else:
            if "OK" in reply:
                return
        time.sleep(0.5)
    if failure is None:
        raise RuntimeError("AT 通道预热失败")
    raise RuntimeError(
        "AT 通道预热失败,请确认串口是否是 AT 命令口,例如 /dev/ttyUSB2"
    ) from failure
def run_serial_steps(port, steps):
    """Run several AT commands in order within a single serial session.

    Each step is a dict with a ``command`` key and optional ``expect``,
    ``timeout`` (default 3) and ``allow_disconnect`` (default False) keys.
    The session is closed afterwards, whether or not a step failed.
    """
    session = open_serial_session(port)
    try:
        synchronize_at_channel(session)
        for spec in steps:
            execute_serial_step(
                session,
                spec["command"],
                expect=spec.get("expect"),
                timeout=spec.get("timeout", 3),
                allow_disconnect=spec.get("allow_disconnect", False),
            )
    finally:
        if session.is_open:
            session.close()
def configure_rndis(port):
    """Switch the module to RNDIS mode and trigger a reboot.

    Step 2 of the workflow: the dial-related AT commands are sent in order
    within one serial session. Waits up to 30 seconds for *port* to exist.

    Raises:
        RuntimeError: The serial port never appeared, or a command failed.
    """
    if not wait_for_path(port, timeout=30):
        raise RuntimeError(f"串口不存在: {port}")
    print(f"[OK] 串口已打开: {port}")
    check_sim = {"command": "AT+CPIN?", "expect": "READY", "timeout": 4}
    enable_rndis = {"command": 'AT+QCFG="usbnet",3', "expect": "OK", "timeout": 5}
    # The module resets on this command, so the port may vanish mid-read.
    reboot_module = {"command": "AT+CFUN=1,1", "timeout": 4, "allow_disconnect": True}
    run_serial_steps(port, [check_sim, enable_rndis, reboot_module])
def get_interfaces():
    """List network interfaces, leaving out obviously unrelated local ones.

    Names in ``SKIP_INTERFACES`` and USB gadget interfaces are excluded.
    Returns a sorted list, or an empty list if ``/sys/class/net`` is missing.
    """
    try:
        names = os.listdir("/sys/class/net")
    except FileNotFoundError:
        return []
    return sorted(
        name
        for name in names
        if name not in SKIP_INTERFACES and not is_usb_gadget(name)
    )
def is_usb_gadget(iface):
    """Return True for gadget network interfaces exposed by the Jetson itself.

    An interface counts as a gadget when the resolved sysfs path of
    ``/sys/class/net/<iface>`` contains ``/gadget/``.
    """
    sysfs_path = f"/sys/class/net/{iface}"
    return os.path.exists(sysfs_path) and "/gadget/" in os.path.realpath(sysfs_path)
def is_usb_network_interface(iface):
    """Return True when the interface is backed by a USB device.

    Decided by whether the resolved ``/sys/class/net/<iface>/device`` path
    contains ``/usb``.
    """
    device_link = f"/sys/class/net/{iface}/device"
    if os.path.exists(device_link):
        return "/usb" in os.path.realpath(device_link)
    return False
def _get_addrs_by_family(family_flag):
    """Collect ``{interface: [address/prefix, ...]}`` for one address family.

    *family_flag* is passed straight to `ip` (``-4`` or ``-6``). Returns an
    empty dict when the command fails.
    """
    code, output = run_cmd(["ip", "-o", family_flag, "addr", "show"], timeout=10)
    if code != 0:
        return {}
    addrs = {}
    for line in output.splitlines():
        parts = line.split()
        # One-line format: "<index>: <iface> <family> <addr>/<prefix> ..."
        if len(parts) >= 4:
            addrs.setdefault(parts[1], []).append(parts[3])
    return addrs


def get_ipv4_addrs():
    """Return the IPv4/CIDR addresses of every interface, keyed by name."""
    return _get_addrs_by_family("-4")


def get_ipv6_addrs():
    """Return the IPv6/CIDR addresses of every interface, keyed by name."""
    return _get_addrs_by_family("-6")
def interface_priority(iface):
    """Rank an interface name; lower values are preferred candidates.

    ``wwan*`` ranks 0, ``enx*`` 1, ``usb*`` 2 and everything else 10.
    """
    ranked_prefixes = (("wwan", 0), ("enx", 1), ("usb", 2))
    for prefix, rank in ranked_prefixes:
        if iface.startswith(prefix):
            return rank
    return 10
def list_usb_network_candidates(explicit_iface=None):
    """List USB network interfaces worth trying before dialing completes.

    Interface names are not used here to identify the 5G modem; this only
    narrows the field while no interface has an IP yet. The modem interface
    is confirmed later, after DHCP, by its IP subnet.

    When *explicit_iface* is given, only that interface is considered.
    Results are ordered by ``interface_priority`` and then by name.
    """
    ranked = sorted(
        (interface_priority(name), name)
        for name in get_interfaces()
        if not (explicit_iface and name != explicit_iface)
        and is_usb_network_interface(name)
    )
    return [name for _, name in ranked]
def ip_in_subnet(ip_cidr, subnet):
    """Return True when the interface address lies inside *subnet*.

    *ip_cidr* is an address with optional prefix length; *subnet* may have
    host bits set. Unparseable input yields False.
    """
    try:
        address = ipaddress.ip_interface(ip_cidr).ip
        network = ipaddress.ip_network(subnet, strict=False)
        return address in network
    except ValueError:
        return False
def find_interface_by_subnet(modem_subnet, explicit_iface=None):
    """Identify the 5G modem interface by IP subnet once dialing succeeded.

    Only USB-backed, non-gadget interfaces outside ``SKIP_INTERFACES`` are
    considered (restricted to *explicit_iface* when given).

    Returns:
        ``(iface, matching_addrs)`` for the best-ranked interface holding an
        address inside *modem_subnet*, or ``(None, [])`` when none does.
    """
    best = None
    for name, cidrs in get_ipv4_addrs().items():
        excluded = (
            name in SKIP_INTERFACES
            or is_usb_gadget(name)
            or not is_usb_network_interface(name)
        )
        if excluded or (explicit_iface and name != explicit_iface):
            continue
        hits = [cidr for cidr in cidrs if ip_in_subnet(cidr, modem_subnet)]
        if not hits:
            continue
        ranked = (interface_priority(name), name, hits)
        # Lowest (priority, name) wins; names are unique so ties cannot occur.
        if best is None or ranked[:2] < best[:2]:
            best = ranked
    if best is None:
        return None, []
    return best[1], best[2]
def wait_for_usb_candidates(explicit_iface=None, timeout=90):
    """Wait for the module to enumerate at least one USB network interface.

    Args:
        explicit_iface: Restrict the search to this interface name, if given.
        timeout: Maximum number of seconds to wait.

    Returns:
        The candidate list as soon as it is non-empty, or ``[]`` on timeout.
        The interfaces are always inspected at least once.
    """
    # A monotonic clock keeps the deadline correct even if the wall clock is
    # stepped (e.g. by NTP) while we wait.
    deadline = time.monotonic() + timeout
    while True:
        candidates = list_usb_network_candidates(explicit_iface=explicit_iface)
        if candidates:
            return candidates
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return []
        time.sleep(min(CHECK_INTERVAL, remaining))
def bring_interface_up(iface):
    """Set the link of *iface* up; raise ``RuntimeError`` if `ip` fails."""
    status, detail = run_cmd(["ip", "link", "set", "dev", iface, "up"], timeout=10)
    if status == 0:
        return
    raise RuntimeError(f"拉起网卡失败: {iface}\n{detail}")
def renew_dhcp(iface):
    """Request an IPv4 lease for *iface* with dhclient or udhcpc.

    `dhclient` is preferred when both are installed.

    Returns:
        ``(ok, output)`` where ``ok`` is true when the client exited with
        status 0. A missing client or a client that exceeds the 45 second
        limit is reported as ``(False, message)``.
    """
    if shutil.which("dhclient"):
        client = "dhclient"
        cmd = ["dhclient", "-1", "-v", iface]
    elif shutil.which("udhcpc"):
        client = "udhcpc"
        cmd = ["udhcpc", "-n", "-q", "-i", iface]
    else:
        return False, "系统中未找到 dhclient 或 udhcpc"
    print(f"[INFO] 使用 {client} 为 {iface} 获取 IP")
    try:
        code, output = run_cmd(cmd, timeout=45)
    except subprocess.TimeoutExpired as exc:
        # A slow or silent DHCP server on one candidate interface must not
        # abort the whole script; report it so the caller can move on.
        return False, f"{client} 在 {iface} 上执行超时 ({exc.timeout}s)"
    return code == 0, output
def get_default_routes(iface):
    """Return the default-route lines `ip` reports for *iface*.

    Lines are stripped and blank ones dropped; a failing `ip` call yields
    an empty list.
    """
    status, listing = run_cmd(
        ["ip", "-o", "route", "show", "default", "dev", iface],
        timeout=10,
    )
    if status != 0:
        return []
    stripped_rows = (row.strip() for row in listing.splitlines())
    return [row for row in stripped_rows if row]
def resolve_gateway(iface, fallback_gateway):
    """Return the gateway of the first default route on *iface* that has one.

    The gateway is the token following ``via``. When no default route names
    a gateway, *fallback_gateway* is returned.
    """
    for route in get_default_routes(iface):
        fields = route.split()
        # "via" must be followed by a value, so the last token is excluded.
        if "via" in fields[:-1]:
            gateway = fields[fields.index("via") + 1]
            print(f"[INFO] 从默认路由检测到 {iface} 网关: {gateway}")
            return gateway
    print(f"[INFO] 未从默认路由检测到 {iface} 网关,回退到 {fallback_gateway}")
    return fallback_gateway
def delete_default_routes(iface):
    """Delete every default route bound to *iface* and return how many.

    Each listed route is first deleted using its full description; if that
    fails, a generic ``ip route del default dev <iface>`` is tried. The
    route list is re-read after every pass until it is empty.

    Raises:
        RuntimeError: A route could not be deleted by either command.
    """
    fallback_cmd = ["ip", "route", "del", "default", "dev", iface]
    total_removed = 0
    pending = get_default_routes(iface)
    while pending:
        progress = False
        for entry in pending:
            status, _ = run_cmd(["ip", "route", "del", *entry.split()], timeout=10)
            if status != 0:
                status, detail = run_cmd(fallback_cmd, timeout=10)
                if status != 0:
                    raise RuntimeError(f"删除默认路由失败: {iface}\n{detail}")
            total_removed += 1
            progress = True
        if not progress:
            raise RuntimeError(f"未能删除 {iface} 的默认路由")
        pending = get_default_routes(iface)
    return total_removed
def install_host_routes(iface, gateway, targets):
    """Install a /32 route via *gateway* on *iface* for each distinct target.

    Raises:
        RuntimeError: `ip route replace` failed for a target.
    """
    for host in dedupe_keep_order(targets):
        status, detail = run_cmd(
            ["ip", "route", "replace", f"{host}/32", "via", gateway, "dev", iface],
            timeout=10,
        )
        if status != 0:
            raise RuntimeError(f"添加主机路由失败: {host} via {gateway} dev {iface}\n{detail}")
        print(f"[OK] 已添加主机路由: {host}/32 via {gateway} dev {iface}")
def enforce_route_policy(iface, fallback_gateway, route_targets):
    """Remove the default routes on *iface* and keep only host routes.

    The gateway is resolved before the default routes are deleted, because
    it is read from those routes.
    """
    gateway = resolve_gateway(iface, fallback_gateway)
    removed = delete_default_routes(iface)
    print(f"[OK] 已删除 {iface} 上的 {removed} 条默认路由")
    if not route_targets:
        print(f"[WARN] {iface} 未配置任何主机路由目标5G 将不再承载公网流量")
        return
    install_host_routes(iface, gateway, route_targets)
def ensure_ipv4(iface):
    """Make sure *iface* has an IPv4 address, requesting one if necessary.

    Returns the interface's IPv4/CIDR list; empty when DHCP failed.
    """
    existing = get_ipv4_addrs().get(iface, [])
    if existing:
        return existing
    bring_interface_up(iface)
    leased, log = renew_dhcp(iface)
    if log:
        print(log)
    return get_ipv4_addrs().get(iface, []) if leased else []
def acquire_modem_interface(modem_subnet, explicit_iface=None):
    """Identify the real modem interface through DHCP plus its IP subnet.

    If no interface already holds an address in *modem_subnet*, each USB
    candidate is given a chance to obtain an IPv4 address and the subnet
    lookup is repeated after every attempt.

    Returns:
        ``(iface, matching_addrs)`` or ``(None, [])``.

    Raises:
        RuntimeError: There is no USB network candidate to try DHCP on.
    """

    def lookup():
        return find_interface_by_subnet(modem_subnet, explicit_iface=explicit_iface)

    found, addrs = lookup()
    if found:
        return found, addrs
    candidates = list_usb_network_candidates(explicit_iface=explicit_iface)
    if not candidates:
        raise RuntimeError("未找到可尝试 DHCP 的 USB 网卡候选项")
    print(f"[INFO] 当前 USB 网卡候选项: {', '.join(candidates)}")
    for candidate in candidates:
        print(f"[INFO] 尝试为 {candidate} 获取 IPv4")
        ensure_ipv4(candidate)
        found, addrs = lookup()
        if found:
            return found, addrs
    return None, []
def print_interface_status(iface):
    """Print the `ip` (and, if installed, `ifconfig`) view of *iface*.

    Step 3 of the workflow: shown after dialing succeeded to confirm the
    modem interface and its address.
    """
    print(f"[OK] 检测到 5G 网卡: {iface}")
    status, ip_view = run_cmd(["ip", "-4", "addr", "show", "dev", iface], timeout=10)
    if status == 0 and ip_view:
        print(ip_view)
    if not shutil.which("ifconfig"):
        return
    status, ifconfig_view = run_cmd(["ifconfig", iface], timeout=10)
    if status == 0 and ifconfig_view:
        print("\n===== ifconfig =====")
        print(ifconfig_view)
def save_interface_info(iface, output_file=DEFAULT_INFO_JSON):
    """Write the interface name with its IPv4 and IPv6 addresses as JSON."""
    ipv4 = get_ipv4_addrs().get(iface, [])
    ipv6 = get_ipv6_addrs().get(iface, [])
    record = {"interface": iface, "ipv4": ipv4, "ipv6": ipv6}
    with open(output_file, "w", encoding="utf-8") as handle:
        json.dump(record, handle, ensure_ascii=False, indent=2)
    print(f"[OK] 网口信息已保存到 {output_file}")
def ping_target(iface, target, count=3, timeout=15):
    """Ping *target* through *iface*; return ``(succeeded, output)``."""
    ping_cmd = ["ping", "-I", iface, "-c", str(count), "-W", "3", target]
    status, transcript = run_cmd(ping_cmd, timeout=timeout)
    return status == 0, transcript
def print_ping_summary(output):
    """Print only the key lines of a ping transcript.

    Kept are lines containing the packet statistics, the rtt summary or a
    "Destination ..." error.
    """
    markers = ("packets transmitted", "rtt ", "Destination ")
    for row in output.splitlines():
        if any(marker in row for marker in markers):
            print(row)
def verify_connectivity(iface, gateway=DEFAULT_MODEM_GATEWAY, targets=DEFAULT_PUBLIC_TARGETS, retry_interval=3, max_wait=45):
    """Ping the modem gateway first, then poll the public targets.

    Step 4 of the workflow.

    Args:
        iface: Interface to send the pings through.
        gateway: Modem gateway address pinged first.
        targets: Public addresses tried in order on every round.
        retry_interval: Seconds to sleep between rounds.
        max_wait: Seconds after which polling the public targets gives up.

    Returns:
        ``True`` as soon as one public target answers; ``False`` when the
        gateway is unreachable or no target answered within *max_wait*.
    """
    ok, output = ping_target(iface, gateway, count=3, timeout=15)
    if ok:
        print(f"[OK] {iface} 可到达模组网关 {gateway}")
        print_ping_summary(output)
    else:
        print(f"[WARN] {iface} 无法到达模组网关 {gateway}")
        if output:
            print(output)
        return False
    # A monotonic clock keeps the deadline correct even if the wall clock is
    # stepped (e.g. by NTP once the link comes up) while we retry.
    deadline = time.monotonic() + max_wait
    attempt = 1
    while True:
        for target in targets:
            ok, output = ping_target(iface, target, count=3, timeout=15)
            if ok:
                print(f"[OK] {iface} 可通过 {target}")
                print_ping_summary(output)
                return True
            print(f"[WARN] 第 {attempt} 次 Ping {target} 失败")
            if output:
                print_ping_summary(output)
        if time.monotonic() >= deadline:
            print(f"[WARN] {iface}{max_wait} 秒内仍无法连通 {', '.join(targets)}")
            return False
        attempt += 1
        time.sleep(retry_interval)
def ping_via_interface(iface, targets=DEFAULT_PUBLIC_TARGETS, gateway=DEFAULT_MODEM_GATEWAY):
    """Run the full connectivity check; kept so existing call sites still work.

    Args:
        iface: Interface to send the pings through.
        targets: Public addresses to try.
        gateway: Modem gateway pinged first. Optional, so callers that use
            a non-default gateway can forward it to the check.

    Returns:
        The result of ``verify_connectivity``.
    """
    return verify_connectivity(iface, gateway=gateway, targets=targets)
def _parse_ipv4_network(value):
    """argparse ``type=`` callback: accept *value* only if it is an IPv4 network.

    Host bits may be set. The original string is returned unchanged.
    """
    try:
        ipaddress.IPv4Network(value, strict=False)
    except ValueError as exc:
        raise argparse.ArgumentTypeError(f"无效的 IPv4 网段: {value}") from exc
    return value


def parse_args(argv=None):
    """Parse the command-line options of the dial-up script.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(description="RM520N-GL RNDIS 自动拨号脚本")
    parser.add_argument(
        "--serial-port",
        default=DEFAULT_SERIAL_PORT,
        help=f"AT 串口路径,默认 {DEFAULT_SERIAL_PORT}",
    )
    parser.add_argument(
        "--interface",
        help="指定期望的 5G 网卡名,例如 eth0",
    )
    parser.add_argument(
        "--modem-subnet",
        # Reject a mistyped subnet up front; otherwise it would only surface
        # as "no interface found" after the modem has already been rebooted.
        type=_parse_ipv4_network,
        default=DEFAULT_MODEM_SUBNET,
        help=f"拨号成功后用于识别模组接口的 IPv4 网段,默认 {DEFAULT_MODEM_SUBNET}",
    )
    parser.add_argument(
        "--gateway",
        type=parse_ipv4_address,
        default=DEFAULT_MODEM_GATEWAY,
        help=f"5G 模组网关地址,默认 {DEFAULT_MODEM_GATEWAY}",
    )
    parser.add_argument(
        "--skip-dhcp",
        action="store_true",
        help="只等待 USB 网卡出现,不主动申请 IPv4",
    )
    parser.add_argument(
        "--remove-default-route",
        action="store_true",
        help="拨号成功后删除 5G 接口上的默认路由,只保留显式主机路由",
    )
    parser.add_argument(
        "--route-target",
        action="append",
        default=[],
        type=parse_ipv4_address,
        help="拨号完成后通过 5G 接口保留的 IPv4 主机路由目标,可重复传入",
    )
    return parser.parse_args(argv)
def main():
    """Run the whole dial-up workflow and exit non-zero on any failure.

    Steps: verify the USB device, switch the module to RNDIS and reboot it,
    wait for it to re-enumerate, identify the modem interface (optionally
    via DHCP), record its addresses, optionally replace the default route
    with host routes, and finally check connectivity.
    """
    args = parse_args()
    require_root()
    require_commands()
    print("===== RM520N-GL RNDIS 自动拨号 =====")
    print(f"[INFO] 目标模组网段: {args.modem_subnet}")
    # 1. Check lsusb to confirm the module has been detected.
    present, detail = usb_device_present()
    if not present:
        print(f"[FAIL] 未检测到模块 USB 设备 {USB_ID}")
        if detail:
            print(detail)
        sys.exit(1)
    print(f"[OK] 检测到 USB 设备: {detail}")
    print(f"[INFO] 使用 AT 口: {args.serial_port}")
    # 2. Dial through the serial port.
    try:
        configure_rndis(args.serial_port)
        print("[INFO] 已发送 AT+CFUN=1,1等待模块重启")
        disappeared, _ = wait_for_usb_device(expected_present=False, timeout=25)
        if disappeared:
            print("[OK] 模块已下线,继续等待重新枚举")
        else:
            print("[WARN] 未观察到模块下线,继续等待重新枚举")
        reappeared, detail = wait_for_usb_device(expected_present=True, timeout=90)
        if not reappeared:
            print(f"[FAIL] 模块重启后未重新枚举: {USB_ID}")
            sys.exit(1)
        print(f"[OK] 模块已重新枚举: {detail}")
        candidates = wait_for_usb_candidates(explicit_iface=args.interface, timeout=90)
        if not candidates:
            print("[FAIL] 未检测到 5G 模组枚举出的 USB 网卡")
            sys.exit(1)
        if args.skip_dhcp:
            print(f"[INFO] 当前 USB 网卡候选项: {', '.join(candidates)}")
            iface, ipv4_addrs = find_interface_by_subnet(
                args.modem_subnet,
                explicit_iface=args.interface,
            )
            if not iface:
                print(f"[WARN] 当前还没有接口拿到目标网段 {args.modem_subnet} 的地址")
                sys.exit(1)
        else:
            iface, ipv4_addrs = acquire_modem_interface(
                args.modem_subnet,
                explicit_iface=args.interface,
            )
            if not iface:
                print(f"[FAIL] 未找到落在目标网段 {args.modem_subnet} 内的模组接口")
                sys.exit(1)
        print_interface_status(iface)
        if ipv4_addrs:
            for addr in ipv4_addrs:
                print(f"[OK] {iface} 已获取 IPv4: {addr}")
            save_interface_info(iface)
            route_targets = dedupe_keep_order(args.route_target)
            if args.remove_default_route:
                enforce_route_policy(iface, args.gateway, route_targets)
            connectivity_targets = route_targets or list(DEFAULT_PUBLIC_TARGETS)
            # Honour --gateway for the connectivity check as well; the
            # result is informational and does not change the exit status.
            verify_connectivity(
                iface,
                gateway=args.gateway,
                targets=connectivity_targets,
            )
            print(f"[DONE] RNDIS 拨号完成,可执行: sudo python3 speed_test.py {iface}")
            return
        print(f"[WARN] {iface} 已出现,但还没有 IPv4 地址")
        print(f"[INFO] 可手动检查: ip addr show {iface}")
        sys.exit(1)
    except (RuntimeError, subprocess.TimeoutExpired, OSError) as exc:
        # OSError covers e.g. a read-only working directory when the JSON
        # info file is written; report it like every other failure.
        print(f"[FAIL] {exc}")
        sys.exit(1)


if __name__ == "__main__":
    main()