From 40cd68db3d5d7e2f0df10e889d672b15d2d56e80 Mon Sep 17 00:00:00 2001 From: Mock Date: Fri, 10 Apr 2026 11:49:38 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BF=AE=E5=A4=8D=E5=90=AF=E5=8A=A8bas?= =?UTF-8?q?h=E8=B7=AF=E5=BE=84=E3=80=81=E5=B0=86=E4=B8=A2=E5=A4=B1?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E6=97=B6=E7=9A=84=E8=A7=86=E9=A2=91=E7=A8=8D?= =?UTF-8?q?=E5=BE=AE=E4=B8=8D=E5=A0=86=E7=A7=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- python/tests/test_sessions.py | 18 +++++++++++++++++- scripts/dev/README.md | 1 + scripts/dev/load-env.sh | 10 ++++++---- scripts/dev/start-backend.sh | 1 + scripts/dev/start-frontend.sh | 1 + src/server_kcp_hub.c | 8 +++++++- src/video_pipeline.c | 30 ++++++++++++++++++++++++++++++ src/video_pipeline_gps.c | 30 ++++++++++++++++++++++++++++++ 8 files changed, 93 insertions(+), 6 deletions(-) diff --git a/python/tests/test_sessions.py b/python/tests/test_sessions.py index 04fca02..ecd02c1 100644 --- a/python/tests/test_sessions.py +++ b/python/tests/test_sessions.py @@ -174,7 +174,7 @@ def test_kcp_idle_video_peers_survive_without_receive_loop() -> None: port = _reserve_port() listen_addr = f'127.0.0.1:{port}' sender_id = 'peer-b-video' - receiver_id = 'peer-a-video' + receiver_id = 'pytest-kcp-video-idle-receiver' with _run_server('kcpserver', listen_addr): sender = _connect_with_retry(Session, transport='kcp', server_addr=listen_addr, peer_id=sender_id) @@ -194,6 +194,22 @@ def test_kcp_idle_video_peers_survive_without_receive_loop() -> None: receiver.close() +def test_kcp_peer_a_video_stale_receiver_is_evicted() -> None: + port = _reserve_port() + listen_addr = f'127.0.0.1:{port}' + receiver_id = 'peer-a-video' + + with _run_server('kcpserver', listen_addr): + receiver = _connect_with_retry(Session, transport='kcp', server_addr=listen_addr, peer_id=receiver_id) + + try: + time.sleep(5.0) + with pytest.raises(OSError): + receiver.recv(timeout_ms=1000) + finally: + receiver.close() + + def test_udp_session_close_interrupts_blocking_recv() -> None: port = _reserve_port() listen_addr = f'127.0.0.1:{port}' diff --git a/scripts/dev/README.md b/scripts/dev/README.md index 803552d..c78a7a4 100644 --- a/scripts/dev/README.md +++ b/scripts/dev/README.md @@ -17,6 +17,7 @@ The scripts assume: - `robot-command-center` is a sibling directory next to it If your `robot-command-center` is elsewhere, set `ROBOT_COMMAND_CENTER_ROOT` in `robot-remote.env.local`. +`start-backend.sh` and `start-frontend.sh` need that repo; `start-ros-receiver.sh` and `start-b-side-omnid.sh` do not. ## Files diff --git a/scripts/dev/load-env.sh b/scripts/dev/load-env.sh index 07eeed4..9401236 100644 --- a/scripts/dev/load-env.sh +++ b/scripts/dev/load-env.sh @@ -19,6 +19,12 @@ is_robot_command_center_root() { [[ -f "${dir}/backend/config/asgi.py" && -f "${dir}/frontend/package.json" ]] } +require_robot_command_center_root() { + if ! is_robot_command_center_root "${ROBOT_COMMAND_CENTER_ROOT}"; then + die "ROBOT_COMMAND_CENTER_ROOT must point to the robot-command-center repo root. Current value: ${ROBOT_COMMAND_CENTER_ROOT}. Set it in ${SCRIPT_DIR}/robot-remote.env.local if needed." + fi +} + export OMNISOCKETGO_ROOT="${OMNISOCKETGO_ROOT:-${DEFAULT_OMNISOCKETGO_ROOT}}" ENV_FILES=( @@ -42,10 +48,6 @@ if ! is_omnisocketgo_root "${OMNISOCKETGO_ROOT}"; then die "OMNISOCKETGO_ROOT must point to the OmniSocketGo repo root. Current value: ${OMNISOCKETGO_ROOT}" fi -if ! is_robot_command_center_root "${ROBOT_COMMAND_CENTER_ROOT}"; then - die "ROBOT_COMMAND_CENTER_ROOT must point to the robot-command-center repo root. Current value: ${ROBOT_COMMAND_CENTER_ROOT}. Set it in ${SCRIPT_DIR}/robot-remote.env.local if needed." -fi - export BACKEND_DIR="${BACKEND_DIR:-${ROBOT_COMMAND_CENTER_ROOT}/backend}" export FRONTEND_DIR="${FRONTEND_DIR:-${ROBOT_COMMAND_CENTER_ROOT}/frontend}" export ROS_CONTROL_PY_DIR="${ROS_CONTROL_PY_DIR:-${OMNISOCKETGO_ROOT}/ros-control-py}" diff --git a/scripts/dev/start-backend.sh b/scripts/dev/start-backend.sh index 2ec10c4..0f1d79a 100755 --- a/scripts/dev/start-backend.sh +++ b/scripts/dev/start-backend.sh @@ -4,6 +4,7 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" # shellcheck disable=SC1091 source "${SCRIPT_DIR}/load-env.sh" +require_robot_command_center_root if [[ ! -d "${PYTHON_VENV_PATH}" ]]; then "${PYTHON3_BIN}" -m venv "${PYTHON_VENV_PATH}" diff --git a/scripts/dev/start-frontend.sh b/scripts/dev/start-frontend.sh index 2c53dc1..b33a87a 100755 --- a/scripts/dev/start-frontend.sh +++ b/scripts/dev/start-frontend.sh @@ -4,6 +4,7 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" # shellcheck disable=SC1091 source "${SCRIPT_DIR}/load-env.sh" +require_robot_command_center_root cd "${FRONTEND_DIR}" exec npm run dev -- --host "${FRONTEND_HOST}" --port "${FRONTEND_PORT}" diff --git a/src/server_kcp_hub.c b/src/server_kcp_hub.c index f422c38..8a0dfb1 100644 --- a/src/server_kcp_hub.c +++ b/src/server_kcp_hub.c @@ -113,11 +113,17 @@ static int kcp_hub_peer_is_telemetry(const char *peer_id) { return kcp_hub_peer_id_has_suffix(peer_id, "-telemetry"); } +static int kcp_hub_peer_is_video_receiver(const char *peer_id) { + return peer_id != NULL && strcmp(peer_id, "peer-a-video") == 0; +} + static int kcp_hub_peer_uses_server_lease(const char *peer_id) { if (peer_id == NULL || peer_id[0] == '\0') { return 0; } - return kcp_hub_peer_id_has_suffix(peer_id, "-ctrl") || kcp_hub_peer_is_telemetry(peer_id); + return kcp_hub_peer_id_has_suffix(peer_id, "-ctrl") + || kcp_hub_peer_is_telemetry(peer_id) + || kcp_hub_peer_is_video_receiver(peer_id); } static const char *kcp_hub_peer_node_id(const char *peer_id) { diff --git a/src/video_pipeline.c b/src/video_pipeline.c index 18b13c7..eadf31f 100644 --- a/src/video_pipeline.c +++ b/src/video_pipeline.c @@ -566,6 +566,32 @@ static int video_sender_init(video_sender_t *sender, const video_pipeline_config return 0; } +static int video_sender_drain_pending_messages(video_sender_t *sender) { + if (sender == NULL || sender->client == NULL) { + errno = EINVAL; + return -1; + } + + for (;;) { + message_t msg; + int rc; + + protocol_message_init(&msg); + rc = kcp_client_receive_timed(sender->client, &msg, 1); + if (rc == 1) { + protocol_message_clear(&msg); + return 0; + } + if (rc != 0) { + protocol_message_clear(&msg); + return -1; + } + + // Drain unread server errors so an offline receiver cannot back up the reverse KCP stream. + protocol_message_clear(&msg); + } +} + static int video_sender_send_packet(video_sender_t *sender, const AVPacket *encoded_pkt, uint64_t timestamp) { uint8_t *payload; size_t payload_len; @@ -585,6 +611,10 @@ static int video_sender_send_packet(video_sender_t *sender, const AVPacket *enco memcpy(payload, encoded_pkt->data, (size_t) encoded_pkt->size); memcpy(payload + encoded_pkt->size, ×tamp, sizeof(timestamp)); rc = kcp_client_send_binary(sender->client, sender->target_peer, payload, payload_len); + if (rc != 0) { + return rc; + } + rc = video_sender_drain_pending_messages(sender); return rc; } diff --git a/src/video_pipeline_gps.c b/src/video_pipeline_gps.c index 18b13c7..eadf31f 100644 --- a/src/video_pipeline_gps.c +++ b/src/video_pipeline_gps.c @@ -566,6 +566,32 @@ static int video_sender_init(video_sender_t *sender, const video_pipeline_config return 0; } +static int video_sender_drain_pending_messages(video_sender_t *sender) { + if (sender == NULL || sender->client == NULL) { + errno = EINVAL; + return -1; + } + + for (;;) { + message_t msg; + int rc; + + protocol_message_init(&msg); + rc = kcp_client_receive_timed(sender->client, &msg, 1); + if (rc == 1) { + protocol_message_clear(&msg); + return 0; + } + if (rc != 0) { + protocol_message_clear(&msg); + return -1; + } + + // Drain unread server errors so an offline receiver cannot back up the reverse KCP stream. + protocol_message_clear(&msg); + } +} + static int video_sender_send_packet(video_sender_t *sender, const AVPacket *encoded_pkt, uint64_t timestamp) { uint8_t *payload; size_t payload_len; @@ -585,6 +611,10 @@ static int video_sender_send_packet(video_sender_t *sender, const AVPacket *enco memcpy(payload, encoded_pkt->data, (size_t) encoded_pkt->size); memcpy(payload + encoded_pkt->size, ×tamp, sizeof(timestamp)); rc = kcp_client_send_binary(sender->client, sender->target_peer, payload, payload_len); + if (rc != 0) { + return rc; + } + rc = video_sender_drain_pending_messages(sender); return rc; }