From 5be3ff670f725b81cc5442ca432d92dee61f4554 Mon Sep 17 00:00:00 2001 From: nnbcccscdscdsc <2709767634@qq.com> Date: Fri, 27 Mar 2026 19:08:22 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=B0=86=E6=9E=B6=E6=9E=84=E6=94=B9?= =?UTF-8?q?=E6=88=90=E6=94=AF=E6=8C=81=E4=B8=AD=E9=97=B4=E4=B8=A4=E4=B8=AA?= =?UTF-8?q?server?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/internal/peer/kcp_client_test.go | 283 +++++++++++++++++++++++++++ cmd/internal/server/kcp_hub.go | 211 +++++++++++++++++++- cmd/kcpserver/main.go | 38 ++++ scripts/run-kcp-batch-test.sh | 85 ++++++++ 4 files changed, 609 insertions(+), 8 deletions(-) diff --git a/cmd/internal/peer/kcp_client_test.go b/cmd/internal/peer/kcp_client_test.go index 3ebc3a7..51392c4 100644 --- a/cmd/internal/peer/kcp_client_test.go +++ b/cmd/internal/peer/kcp_client_test.go @@ -1,11 +1,14 @@ package peer import ( + "bytes" + "net" "os" "path/filepath" "reflect" "strings" "sync" + "sync/atomic" "testing" kcp "github.com/xtaci/kcp-go/v5" @@ -202,6 +205,185 @@ func TestKCPClientsExchangeMessagesWithLatencyLogs(t *testing.T) { }) } +func TestKCPClientsExchangeMessagesAcrossRelayedServers(t *testing.T) { + fixture := startRelayedKCPHubs(t) + defer fixture.cleanup() + + peerA, err := DialKCP(fixture.serverCAddr, "peer-a") + if err != nil { + t.Fatalf("DialKCP(peer-a) error = %v", err) + } + defer func() { _ = peerA.Close() }() + + peerB, err := DialKCP(fixture.serverDAddr, "peer-b") + if err != nil { + t.Fatalf("DialKCP(peer-b) error = %v", err) + } + defer func() { _ = peerB.Close() }() + + waitFor(t, func() bool { return fixture.hubC.HasPeer("peer-a") && fixture.hubD.HasPeer("peer-b") }, "both relayed peers to be registered") + + if err := peerA.SendText("peer-b", "hello via relay"); err != nil { + t.Fatalf("peerA.SendText() error = %v", err) + } + gotAtB, err := peerB.Receive() + if err != nil { + t.Fatalf("peerB.Receive() error = %v", err) + } + wantAtB := protocol.Message{ + 
Type: protocol.MessageTypeText, + ID: 1, + From: "peer-a", + To: "peer-b", + Body: []byte("hello via relay"), + } + if !reflect.DeepEqual(gotAtB, wantAtB) { + t.Fatalf("peerB received %+v, want %+v", gotAtB, wantAtB) + } + + if err := peerB.SendText("peer-a", "hello back"); err != nil { + t.Fatalf("peerB.SendText() error = %v", err) + } + gotAtA, err := peerA.Receive() + if err != nil { + t.Fatalf("peerA.Receive() error = %v", err) + } + wantAtA := protocol.Message{ + Type: protocol.MessageTypeText, + ID: 1, + From: "peer-b", + To: "peer-a", + Body: []byte("hello back"), + } + if !reflect.DeepEqual(gotAtA, wantAtA) { + t.Fatalf("peerA received %+v, want %+v", gotAtA, wantAtA) + } + + if got := fixture.relayC.WriteCount(); got != 1 { + t.Fatalf("relayC write count = %d, want 1", got) + } + if got := fixture.relayD.WriteCount(); got != 1 { + t.Fatalf("relayD write count = %d, want 1", got) + } +} + +func TestKCPHubPrefersLocalPeerBeforeRelay(t *testing.T) { + fixture := startRelayedKCPHubs(t) + defer fixture.cleanup() + + peerA, err := DialKCP(fixture.serverCAddr, "peer-a") + if err != nil { + t.Fatalf("DialKCP(peer-a) error = %v", err) + } + defer func() { _ = peerA.Close() }() + + peerB, err := DialKCP(fixture.serverCAddr, "peer-b") + if err != nil { + t.Fatalf("DialKCP(peer-b) error = %v", err) + } + defer func() { _ = peerB.Close() }() + + waitFor(t, func() bool { return fixture.hubC.HasPeer("peer-a") && fixture.hubC.HasPeer("peer-b") }, "local peers on hubC to be registered") + + if err := peerA.SendText("peer-b", "local delivery"); err != nil { + t.Fatalf("peerA.SendText() error = %v", err) + } + got, err := peerB.Receive() + if err != nil { + t.Fatalf("peerB.Receive() error = %v", err) + } + want := protocol.Message{ + Type: protocol.MessageTypeText, + ID: 1, + From: "peer-a", + To: "peer-b", + Body: []byte("local delivery"), + } + if !reflect.DeepEqual(got, want) { + t.Fatalf("peerB received %+v, want %+v", got, want) + } + + if got := 
fixture.relayC.WriteCount(); got != 0 { + t.Fatalf("relayC write count = %d, want 0 for local delivery", got) + } + if got := fixture.relayD.WriteCount(); got != 0 { + t.Fatalf("relayD write count = %d, want 0 for local delivery", got) + } +} + +func TestKCPRelayedUnknownTargetReturnsErrorToOriginalSender(t *testing.T) { + fixture := startRelayedKCPHubs(t) + defer fixture.cleanup() + + peerA, err := DialKCP(fixture.serverCAddr, "peer-a") + if err != nil { + t.Fatalf("DialKCP(peer-a) error = %v", err) + } + defer func() { _ = peerA.Close() }() + + waitFor(t, func() bool { return fixture.hubC.HasPeer("peer-a") }, "peer-a to be registered on hubC") + + if err := peerA.SendText("remote-missing", "hello"); err != nil { + t.Fatalf("peerA.SendText() error = %v", err) + } + + got, err := peerA.Receive() + if err != nil { + t.Fatalf("peerA.Receive() error = %v", err) + } + if got.Type != protocol.MessageTypeError { + t.Fatalf("got type %s, want %s", got.Type, protocol.MessageTypeError) + } + if got.From != protocol.ServerPeerID { + t.Fatalf("error from = %s, want %s", got.From, protocol.ServerPeerID) + } + if got.To != "peer-a" { + t.Fatalf("error to = %s, want peer-a", got.To) + } + if string(got.Body) != "unknown target: remote-missing" { + t.Fatalf("error body = %q, want unknown target from relayed hub", got.Body) + } + + if got := fixture.relayC.WriteCount(); got != 1 { + t.Fatalf("relayC write count = %d, want 1 for outbound relay", got) + } + if got := fixture.relayD.WriteCount(); got != 1 { + t.Fatalf("relayD write count = %d, want 1 for return error relay", got) + } +} + +func TestKCPHubRejectsOversizeRelayedMessage(t *testing.T) { + fixture := startRelayedKCPHubs(t) + defer fixture.cleanup() + + peerA, err := DialKCP(fixture.serverCAddr, "peer-a") + if err != nil { + t.Fatalf("DialKCP(peer-a) error = %v", err) + } + defer func() { _ = peerA.Close() }() + + waitFor(t, func() bool { return fixture.hubC.HasPeer("peer-a") }, "peer-a to be registered on hubC") + + body 
:= bytes.Repeat([]byte("a"), 70*1024) + if err := peerA.SendFile("remote-peer", "payload.bin", body); err != nil { + t.Fatalf("peerA.SendFile() error = %v", err) + } + + got, err := peerA.Receive() + if err != nil { + t.Fatalf("peerA.Receive() error = %v", err) + } + if got.Type != protocol.MessageTypeError { + t.Fatalf("got type %s, want %s", got.Type, protocol.MessageTypeError) + } + if string(got.Body) != "message too large for relay udp" { + t.Fatalf("error body = %q, want oversize relay error", got.Body) + } + if got := fixture.relayC.WriteCount(); got != 0 { + t.Fatalf("relayC write count = %d, want 0 when relay rejects oversize payload", got) + } +} + func startRealKCPHubServer(t *testing.T, hub *server.KCPHub) (string, func()) { t.Helper() @@ -253,6 +435,98 @@ func startRealKCPHubServer(t *testing.T, hub *server.KCPHub) (string, func()) { return listener.Addr().String(), cleanup } +type relayedKCPHubFixture struct { + hubC *server.KCPHub + hubD *server.KCPHub + serverCAddr string + serverDAddr string + relayC *countingPacketConn + relayD *countingPacketConn + cleanup func() +} + +func startRelayedKCPHubs(t *testing.T) relayedKCPHubFixture { + t.Helper() + + hubC := server.NewKCPHub() + serverCAddr, cleanupC := startRealKCPHubServer(t, hubC) + + hubD := server.NewKCPHub() + serverDAddr, cleanupD := startRealKCPHubServer(t, hubD) + + baseRelayC, err := net.ListenPacket("udp", "127.0.0.1:0") + if err != nil { + cleanupD() + cleanupC() + t.Fatalf("ListenPacket(relayC) error = %v", err) + } + relayC := &countingPacketConn{PacketConn: baseRelayC} + + baseRelayD, err := net.ListenPacket("udp", "127.0.0.1:0") + if err != nil { + _ = relayC.Close() + cleanupD() + cleanupC() + t.Fatalf("ListenPacket(relayD) error = %v", err) + } + relayD := &countingPacketConn{PacketConn: baseRelayD} + + hubC.SetRelaySocket(relayC, relayD.LocalAddr(), false) + hubD.SetRelaySocket(relayD, relayC.LocalAddr(), false) + + stopRelayC := startRelayLoop(t, hubC, relayC) + stopRelayD := 
startRelayLoop(t, hubD, relayD) + + cleanup := func() { + stopRelayC() + stopRelayD() + cleanupD() + cleanupC() + } + + return relayedKCPHubFixture{ + hubC: hubC, + hubD: hubD, + serverCAddr: serverCAddr, + serverDAddr: serverDAddr, + relayC: relayC, + relayD: relayD, + cleanup: cleanup, + } +} + +func startRelayLoop(t *testing.T, hub *server.KCPHub, conn net.PacketConn) func() { + t.Helper() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := hub.ServeRelay(); err != nil && !isExpectedKCPRelayServeExit(err) { + t.Errorf("hub.ServeRelay() error = %v", err) + } + }() + + return func() { + _ = conn.Close() + wg.Wait() + } +} + +type countingPacketConn struct { + net.PacketConn + writeCount int32 +} + +func (c *countingPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) { + atomic.AddInt32(&c.writeCount, 1) + return c.PacketConn.WriteTo(p, addr) +} + +func (c *countingPacketConn) WriteCount() int { + return int(atomic.LoadInt32(&c.writeCount)) +} + func isExpectedKCPHubServeExit(err error) bool { if err == nil { return true @@ -261,3 +535,12 @@ func isExpectedKCPHubServeExit(err error) bool { message := err.Error() return strings.Contains(message, "closed") || strings.Contains(message, "broken pipe") || strings.Contains(message, "io: read/write on closed pipe") } + +func isExpectedKCPRelayServeExit(err error) bool { + if err == nil { + return true + } + + message := err.Error() + return strings.Contains(message, "closed") || strings.Contains(message, "use of closed network connection") +} diff --git a/cmd/internal/server/kcp_hub.go b/cmd/internal/server/kcp_hub.go index 96f7b1d..6e9289e 100644 --- a/cmd/internal/server/kcp_hub.go +++ b/cmd/internal/server/kcp_hub.go @@ -1,7 +1,11 @@ package server import ( + "errors" "fmt" + "log" + "net" + "strings" "sync" "time" @@ -12,6 +16,15 @@ import ( "omnisocketgo/cmd/internal/transport" ) +const kcpRelayMaxDatagramSize = 60 * 1024 + +var ( + errKCPRelayUnavailable = errors.New("server: kcp 
relay socket is not configured") + errKCPRelayPeerUnknown = errors.New("server: kcp relay peer address is unknown") + errKCPRelayTooLarge = errors.New("server: kcp relay message too large") + errKCPUnknownLocalTarget = errors.New("server: unknown local kcp target") +) + // KCPOption 用于配置 KCPHub 的可选行为。 type KCPOption func(*KCPHub) @@ -37,6 +50,9 @@ type KCPHub struct { logger latencylog.Logger sessionStatsLogger transport.KCPSessionStatsLogger sessionStatsInterval time.Duration + relaySocket net.PacketConn + relayPeerAddr net.Addr + relayLearnPeer bool } // NewKCPHub 创建一个空的 KCP 连接中心。 @@ -54,6 +70,16 @@ func NewKCPHub(opts ...KCPOption) *KCPHub { return hub } +// SetRelaySocket 配置 KCPHub 的原始 UDP relay 信道。 +func (h *KCPHub) SetRelaySocket(conn net.PacketConn, peerAddr net.Addr, learnPeer bool) { + h.mu.Lock() + defer h.mu.Unlock() + + h.relaySocket = conn + h.relayPeerAddr = cloneRelayAddr(peerAddr) + h.relayLearnPeer = learnPeer +} + // HasPeer 返回给定 ID 是否已经注册到 hub。 func (h *KCPHub) HasPeer(peerID string) bool { h.mu.RLock() @@ -63,6 +89,48 @@ func (h *KCPHub) HasPeer(peerID string) bool { return ok } +// ServeRelay 持续从 relay UDP socket 读取消息,并尝试本地投递。 +func (h *KCPHub) ServeRelay() error { + h.mu.RLock() + conn := h.relaySocket + h.mu.RUnlock() + + if conn == nil { + return errKCPRelayUnavailable + } + + buffer := make([]byte, kcpRelayMaxDatagramSize) + for { + n, addr, err := conn.ReadFrom(buffer) + if err != nil { + if isExpectedRelayServeExit(err) { + return nil + } + return fmt.Errorf("server: relay receive packet: %w", err) + } + + if !h.acceptRelayPeer(addr) { + log.Printf("kcp relay dropped packet from unexpected peer %s", addr) + continue + } + + msg, err := protocol.DecodeMessage(buffer[:n]) + if err != nil { + log.Printf("kcp relay dropped invalid packet from %s: %v", addr, err) + continue + } + + if !isRelayBusinessOrErrorMessage(msg.Type) { + log.Printf("kcp relay dropped unsupported message type %s from %s", msg.Type, addr) + continue + } + + if err := 
h.deliverRelayedMessage(msg); err != nil { + log.Printf("kcp relay delivery for %s -> %s failed: %v", msg.From, msg.To, err) + } + } +} + // ServeSession 处理一条新接入的 KCP 会话。 func (h *KCPHub) ServeSession(session *kcp.UDPSession) error { conn, err := transport.NewKCPConn( @@ -118,16 +186,24 @@ func (h *KCPHub) handlePeerMessage(peerID string, conn *transport.KCPConn, msg p switch msg.Type { case protocol.MessageTypeText, protocol.MessageTypeFile: msg.From = peerID - targetConn, ok := h.lookup(msg.To) - if !ok { - return sendKCPServerError(conn, peerID, fmt.Sprintf("unknown target: %s", msg.To)) - } - if err := targetConn.Send(msg); err != nil { - h.unregister(msg.To, targetConn) - _ = targetConn.Close() + + if err := h.deliverToLocalPeer(msg); err == nil { + return nil + } else if !errors.Is(err, errKCPUnknownLocalTarget) { return sendKCPServerError(conn, peerID, fmt.Sprintf("failed to forward to %s", msg.To)) } - return nil + + err := h.forwardToRelay(msg) + switch { + case err == nil: + return nil + case errors.Is(err, errKCPRelayUnavailable): + return sendKCPServerError(conn, peerID, fmt.Sprintf("unknown target: %s", msg.To)) + case errors.Is(err, errKCPRelayTooLarge): + return sendKCPServerError(conn, peerID, "message too large for relay udp") + default: + return sendKCPServerError(conn, peerID, "failed to relay to remote peer") + } case protocol.MessageTypeRegister, protocol.MessageTypeError: if err := sendKCPServerError(conn, peerID, "registered peers can only send text or file messages"); err != nil { return fmt.Errorf("server: send kcp protocol error: %w", err) @@ -157,6 +233,88 @@ func (h *KCPHub) receivePeerLoop(peerID string, conn *transport.KCPConn) error { } } +func (h *KCPHub) deliverRelayedMessage(msg protocol.Message) error { + if err := h.deliverToLocalPeer(msg); err == nil { + return nil + } else if !errors.Is(err, errKCPUnknownLocalTarget) { + if msg.Type == protocol.MessageTypeError { + log.Printf("kcp relay dropped undeliverable server error to %s: 
%v", msg.To, err) + return nil + } + return h.forwardRelayServerError(msg.From, fmt.Sprintf("failed to forward to %s", msg.To)) + } + + if msg.Type == protocol.MessageTypeError { + log.Printf("kcp relay dropped server error for unknown local peer %s", msg.To) + return nil + } + + return h.forwardRelayServerError(msg.From, fmt.Sprintf("unknown target: %s", msg.To)) +} + +func (h *KCPHub) deliverToLocalPeer(msg protocol.Message) error { + targetConn, ok := h.lookup(msg.To) + if !ok { + return fmt.Errorf("%w: %s", errKCPUnknownLocalTarget, msg.To) + } + if err := targetConn.Send(msg); err != nil { + h.unregister(msg.To, targetConn) + _ = targetConn.Close() + return fmt.Errorf("server: forward to local peer %s: %w", msg.To, err) + } + return nil +} + +func (h *KCPHub) forwardToRelay(msg protocol.Message) error { + payload, err := protocol.EncodeMessage(msg) + if err != nil { + return fmt.Errorf("server: encode relay message: %w", err) + } + if len(payload) > kcpRelayMaxDatagramSize { + return errKCPRelayTooLarge + } + + h.mu.RLock() + conn := h.relaySocket + peerAddr := cloneRelayAddr(h.relayPeerAddr) + h.mu.RUnlock() + + if conn == nil { + return errKCPRelayUnavailable + } + if peerAddr == nil { + return errKCPRelayPeerUnknown + } + + if _, err := conn.WriteTo(payload, peerAddr); err != nil { + return fmt.Errorf("server: relay write to %s: %w", peerAddr, err) + } + return nil +} + +func (h *KCPHub) forwardRelayServerError(to, message string) error { + return h.forwardToRelay(protocol.Message{ + Type: protocol.MessageTypeError, + From: protocol.ServerPeerID, + To: to, + Body: []byte(message), + }) +} + +func (h *KCPHub) acceptRelayPeer(addr net.Addr) bool { + h.mu.Lock() + defer h.mu.Unlock() + + if h.relayPeerAddr == nil && h.relayLearnPeer { + h.relayPeerAddr = cloneRelayAddr(addr) + return true + } + if h.relayPeerAddr == nil { + return true + } + return sameRelayAddr(h.relayPeerAddr, addr) +} + func (h *KCPHub) lookup(peerID string) (*transport.KCPConn, bool) { 
h.mu.RLock() defer h.mu.RUnlock() @@ -183,3 +341,40 @@ func sendKCPServerError(conn *transport.KCPConn, to, message string) error { Body: []byte(message), }) } + +func isRelayBusinessOrErrorMessage(messageType protocol.MessageType) bool { + switch messageType { + case protocol.MessageTypeText, protocol.MessageTypeFile, protocol.MessageTypeError: + return true + default: + return false + } +} + +func isExpectedRelayServeExit(err error) bool { + return errors.Is(err, net.ErrClosed) || strings.Contains(err.Error(), "use of closed network connection") +} + +func cloneRelayAddr(addr net.Addr) net.Addr { + if addr == nil { + return nil + } + udpAddr, ok := addr.(*net.UDPAddr) + if !ok { + return addr + } + ipCopy := make([]byte, len(udpAddr.IP)) + copy(ipCopy, udpAddr.IP) + return &net.UDPAddr{ + IP: ipCopy, + Port: udpAddr.Port, + Zone: udpAddr.Zone, + } +} + +func sameRelayAddr(left, right net.Addr) bool { + if left == nil || right == nil { + return left == right + } + return left.String() == right.String() +} diff --git a/cmd/kcpserver/main.go b/cmd/kcpserver/main.go index d6c6b62..99d1956 100644 --- a/cmd/kcpserver/main.go +++ b/cmd/kcpserver/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "log" + "net" "strings" kcp "github.com/xtaci/kcp-go/v5" @@ -19,6 +20,9 @@ func main() { kcpTimestampDebugLogPath := flag.String("kcp-ts-debug-log", "", "optional JSONL file path for KCP packet kernel timestamp debug records") kcpSessionStatsLogPath := flag.String("kcp-session-stats-log", "", "optional JSONL file path for KCP session stats records") kcpSessionStatsInterval := flag.String("kcp-session-stats-interval", transport.DefaultKCPSessionStatsInterval.String(), "sampling interval for KCP session stats, for example 100ms") + relayListenAddr := flag.String("relay-listen", "", "optional raw UDP relay listen address") + relayPeerAddr := flag.String("relay-peer", "", "optional fixed raw UDP relay peer address") + relayLearnPeer := flag.Bool("relay-learn-peer", false, "learn 
the relay peer address from the first inbound relay packet") flag.Parse() statsInterval, err := transport.ParseKCPSessionStatsInterval(*kcpSessionStatsInterval) @@ -62,6 +66,40 @@ func main() { defer listener.Close() hub := server.NewKCPHub(hubOptions...) + + if *relayPeerAddr != "" && *relayListenAddr == "" { + log.Fatal("flag -relay-listen is required when -relay-peer is set") + } + if *relayLearnPeer && *relayListenAddr == "" { + log.Fatal("flag -relay-listen is required when -relay-learn-peer is set") + } + if *relayListenAddr != "" { + relayConn, err := net.ListenPacket("udp", *relayListenAddr) + if err != nil { + log.Fatalf("listen relay udp on %s: %v", *relayListenAddr, err) + } + defer relayConn.Close() + + var relayPeer net.Addr + if *relayPeerAddr != "" { + relayPeer, err = net.ResolveUDPAddr("udp", *relayPeerAddr) + if err != nil { + log.Fatalf("resolve relay peer %s: %v", *relayPeerAddr, err) + } + } + + hub.SetRelaySocket(relayConn, relayPeer, *relayLearnPeer) + go func() { + if serveErr := hub.ServeRelay(); serveErr != nil { + log.Printf("kcp relay loop ended: %v", serveErr) + } + }() + log.Printf("kcp relay listening on %s", relayConn.LocalAddr()) + if relayPeer != nil { + log.Printf("kcp relay peer configured as %s", relayPeer) + } + } + log.Printf("kcp server listening on %s", listener.Addr()) for { diff --git a/scripts/run-kcp-batch-test.sh b/scripts/run-kcp-batch-test.sh index 5481699..676169b 100755 --- a/scripts/run-kcp-batch-test.sh +++ b/scripts/run-kcp-batch-test.sh @@ -219,6 +219,46 @@ clean_log_directories() { remove_remote_log_dir "$peerb_ssh" "$peerb_log_dir" "peer-b" } +truncate_local_file() { + local path="$1" + local dir="" + + dir="$(dirname "$path")" + mkdir -p "$dir" + : > "$path" +} + +truncate_remote_file() { + local target="$1" + local path="$2" + local script="" + + script="$(cat <<'EOF' +set -euo pipefail + +mkdir -p "$(dirname "$FILE_PATH")" +: > "$FILE_PATH" +EOF +)" + + run_remote_script "$target" "$script" 
"FILE_PATH=$path" +} + +reset_logs_after_probe() { + log "resetting peer logs after connectivity probe" + + rm -f "$local_peer_a_messages_log" + truncate_local_file "$local_peer_a_stdout_log" + truncate_local_file "$local_peer_a_latency_log" + truncate_local_file "$local_peer_a_ts_debug_log" + truncate_local_file "$local_peer_a_session_stats_log" + + truncate_remote_file "$peerb_ssh" "$peerb_stdout_log" + truncate_remote_file "$peerb_ssh" "$peerb_latency_log" + truncate_remote_file "$peerb_ssh" "$peerb_ts_debug_log" + truncate_remote_file "$peerb_ssh" "$peerb_session_stats_log" +} + fetch_remote_peer_b_logs() { log "copying peer-b logs from $peerb_ssh:$peerb_log_dir to $local_log_dir" copy_remote_file_to_local "$peerb_ssh:$peerb_stdout_log" "$local_peer_b_stdout_log" @@ -562,6 +602,49 @@ EOF return 1 } +probe_peer_b_to_local_peer_a() { + local marker="" + local command_line="" + local quoted_command="" + local script="" + local start_time="$SECONDS" + + marker="probe-$(date +%s)-$$" + printf -v command_line 'text peer-a %s' "$marker" + printf -v quoted_command '%q' "$command_line" + + script="$(cat <<EOF +set -euo pipefail + +printf '%s\n' $quoted_command >> "\$COMMAND_FILE" +EOF +)" + + log "probing peer-b -> peer-a message delivery before batch" + run_remote_script "$peerb_ssh" "$script" "COMMAND_FILE=$peerb_command_file" + + while (( SECONDS - start_time < ready_timeout )); do + if [[ -f "$local_peer_a_messages_log" ]] && grep -Fq -- "$marker" "$local_peer_a_messages_log"; then + log "peer-b -> peer-a probe succeeded" + reset_logs_after_probe + return 0 + fi + + if [[ -n "$peer_a_pid" ]] && ! 
kill -0 "$peer_a_pid" 2>/dev/null; then + log "local peer-a exited during connectivity probe" + dump_local_log_head "$local_peer_a_stdout_log" + return 1 + fi + + sleep 1 + done + + log "timed out waiting for peer-b -> peer-a probe delivery after ${ready_timeout}s" + dump_local_log_head "$local_peer_a_stdout_log" + dump_remote_log_head "$peerb_ssh" "$peerb_stdout_log" "peer-b" + return 1 +} + run_remote_peer_b_batch() { local script="" local batch_commands="" @@ -804,6 +887,7 @@ inbox_dir_name="${log_prefix}inbox" local_log_dir="$(join_path "$local_workdir" "$log_dir_name")" local_peer_a_inbox="$(join_path "$local_workdir" "$inbox_dir_name/peer-a")" +local_peer_a_messages_log="$(join_path "$local_peer_a_inbox" "messages.log")" local_peer_a_stdout_log="$(join_path "$local_log_dir" "peer-a.stdout.log")" local_peer_a_latency_log="$(join_path "$local_log_dir" "peer-a-kcp-latency.jsonl")" local_peer_a_ts_debug_log="$(join_path "$local_log_dir" "peer-a-kcp-packet-debug.jsonl")" @@ -846,6 +930,7 @@ start_local_peer_a start_remote_peer_b wait_for_local_peer_a_ready wait_for_remote_peer_b_ready +probe_peer_b_to_local_peer_a run_remote_peer_b_batch log "batch send completed"