diff --git a/cmd/internal/peer/udp_client_test.go b/cmd/internal/peer/udp_client_test.go index 95aa4c1..d23750c 100644 --- a/cmd/internal/peer/udp_client_test.go +++ b/cmd/internal/peer/udp_client_test.go @@ -4,15 +4,56 @@ import ( "net" "os" "path/filepath" + "sync" "testing" "time" + "omnisocketgo/cmd/internal/latencylog" "omnisocketgo/cmd/internal/protocol" "omnisocketgo/cmd/internal/server" "omnisocketgo/cmd/internal/transport" ) -// TestUDPDialAndSendText 验证 UDP 客户端可以成功连接、注册并发送文本消息。 +type recordingLatencyLogger struct { + mu sync.Mutex + events []latencylog.Event +} + +func (l *recordingLatencyLogger) LogEvent(event latencylog.Event) error { + l.mu.Lock() + defer l.mu.Unlock() + + l.events = append(l.events, event) + return nil +} + +func (l *recordingLatencyLogger) Events() []latencylog.Event { + l.mu.Lock() + defer l.mu.Unlock() + + return append([]latencylog.Event(nil), l.events...) +} + +type recordingUDPTXTimestampDebugLogger struct { + mu sync.Mutex + records []transport.TXTimestampDebugRecord +} + +func (l *recordingUDPTXTimestampDebugLogger) LogTXTimestampDebugRecord(record transport.TXTimestampDebugRecord) error { + l.mu.Lock() + defer l.mu.Unlock() + + l.records = append(l.records, record) + return nil +} + +func (l *recordingUDPTXTimestampDebugLogger) Records() []transport.TXTimestampDebugRecord { + l.mu.Lock() + defer l.mu.Unlock() + + return append([]transport.TXTimestampDebugRecord(nil), l.records...) +} + func TestUDPDialAndSendText(t *testing.T) { hubAddr := startUDPTestHub(t) @@ -28,15 +69,12 @@ func TestUDPDialAndSendText(t *testing.T) { } defer clientB.Close() - // 等待注册被处理 time.Sleep(50 * time.Millisecond) - // peer-a 发送文本给 peer-b if err := clientA.SendText("peer-b", "hello from udp"); err != nil { t.Fatalf("SendText() error = %v", err) } - // peer-b 接收 msg := receiveUDPClientMessage(t, clientB) if msg.Type != protocol.MessageTypeText { t.Fatalf("message type = %s, want text", msg.Type) @@ -46,7 +84,6 @@ func TestUDPDialAndSendText(t *testing.T) { } } -// TestUDPClientID 验证 ID() 返回正确的 peer 标识。 func TestUDPClientID(t *testing.T) { hubAddr := startUDPTestHub(t) @@ -61,7 +98,6 @@ func TestUDPClientID(t *testing.T) { } } -// TestUDPClientPersistMessage 验证 UDP 客户端可以将消息持久化到磁盘。 func TestUDPClientPersistMessage(t *testing.T) { hubAddr := startUDPTestHub(t) @@ -73,7 +109,6 @@ func TestUDPClientPersistMessage(t *testing.T) { inboxDir := t.TempDir() - // 持久化文本消息 textMsg := protocol.Message{ Type: protocol.MessageTypeText, ID: 1, @@ -90,7 +125,6 @@ func TestUDPClientPersistMessage(t *testing.T) { t.Fatalf("PersistMessage(text) returned empty path") } - // 持久化文件消息 fileMsg := protocol.Message{ Type: protocol.MessageTypeFile, ID: 2, @@ -114,7 +148,6 @@ func TestUDPClientPersistMessage(t *testing.T) { } } -// TestUDPClientSendFile 验证 UDP 客户端可以发送文件消息。 func TestUDPClientSendFile(t *testing.T) { hubAddr := startUDPTestHub(t) @@ -149,8 +182,83 @@ func TestUDPClientSendFile(t *testing.T) { } } -// startUDPTestHub 创建并启动一个测试用 UDPHub。 +func TestUDPClientBidirectionalLogsAndServerDiagnostics(t *testing.T) { + serverLogger := &recordingLatencyLogger{} + serverDebugLogger := &recordingUDPTXTimestampDebugLogger{} + hubAddr := startUDPTestHubWithOptions( + t, + server.WithUDPLogger(serverLogger), + server.WithUDPTXTimestampDebugLogger(serverDebugLogger), + ) + + clientALogger := &recordingLatencyLogger{} + clientADebugLogger := &recordingUDPTXTimestampDebugLogger{} + clientA, err := DialUDP( + hubAddr.String(), + "peer-a", + WithLogger(clientALogger), + WithTXTimestampDebugLogger(clientADebugLogger), + ) + if err != nil { + t.Fatalf("DialUDP(peer-a) error = %v", err) + } + defer clientA.Close() + + clientBLogger := &recordingLatencyLogger{} + clientBDebugLogger := &recordingUDPTXTimestampDebugLogger{} + clientB, err := DialUDP( + hubAddr.String(), + "peer-b", + WithLogger(clientBLogger), + WithTXTimestampDebugLogger(clientBDebugLogger), + ) + if err != nil { + t.Fatalf("DialUDP(peer-b) error = %v", err) + } + defer clientB.Close() + + time.Sleep(50 * time.Millisecond) + + if err := clientA.SendText("peer-b", "hello from peer-a"); err != nil { + t.Fatalf("clientA.SendText() error = %v", err) + } + msgFromA := receiveUDPClientMessage(t, clientB) + if string(msgFromA.Body) != "hello from peer-a" { + t.Fatalf("message body = %q, want %q", string(msgFromA.Body), "hello from peer-a") + } + + if err := clientB.SendText("peer-a", "hello from peer-b"); err != nil { + t.Fatalf("clientB.SendText() error = %v", err) + } + msgFromB := receiveUDPClientMessage(t, clientA) + if string(msgFromB.Body) != "hello from peer-b" { + t.Fatalf("message body = %q, want %q", string(msgFromB.Body), "hello from peer-b") + } + + assertPeerEvent(t, clientBLogger.Events(), latencylog.EventBRXSoftware, "peer-a", "peer-b", msgFromA.ID) + assertPeerEvent(t, clientBLogger.Events(), latencylog.EventBAppRecv, "peer-a", "peer-b", msgFromA.ID) + assertPeerEvent(t, clientALogger.Events(), latencylog.EventBRXSoftware, "peer-b", "peer-a", msgFromB.ID) + assertPeerEvent(t, clientALogger.Events(), latencylog.EventBAppRecv, "peer-b", "peer-a", msgFromB.ID) + + if len(clientADebugLogger.Records()) == 0 { + t.Fatal("clientA debug records = 0, want at least 1") + } + if len(clientBDebugLogger.Records()) == 0 { + t.Fatal("clientB debug records = 0, want at least 1") + } + if len(serverLogger.Events()) == 0 { + t.Fatal("server latency events = 0, want at least 1") + } + if len(serverDebugLogger.Records()) == 0 { + t.Fatal("server debug records = 0, want at least 1") + } +} + func startUDPTestHub(t *testing.T) *net.UDPAddr { + return startUDPTestHubWithOptions(t) +} + +func startUDPTestHubWithOptions(t *testing.T, opts ...server.UDPOption) *net.UDPAddr { t.Helper() addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") @@ -163,7 +271,7 @@ func startUDPTestHub(t *testing.T) *net.UDPAddr { t.Fatalf("ListenUDP() error = %v", err) } - hub, err := server.NewUDPHub(conn) + hub, err := server.NewUDPHub(conn, opts...) if err != nil { _ = conn.Close() t.Fatalf("NewUDPHub() error = %v", err) @@ -180,7 +288,6 @@ func startUDPTestHub(t *testing.T) *net.UDPAddr { return conn.LocalAddr().(*net.UDPAddr) } -// receiveUDPClientMessage 从 UDP 客户端接收一条消息,带超时。 func receiveUDPClientMessage(t *testing.T, client *UDPClient) protocol.Message { t.Helper() @@ -207,5 +314,24 @@ func receiveUDPClientMessage(t *testing.T, client *UDPClient) protocol.Message { } } +func assertPeerEvent(t *testing.T, events []latencylog.Event, wantEvent, wantFrom, wantTo string, wantMessageID uint64) { + t.Helper() + + for _, event := range events { + if event.Event != wantEvent { + continue + } + if event.From != wantFrom || event.To != wantTo || event.MessageID != wantMessageID { + continue + } + if event.TsUnixNano <= 0 { + t.Fatalf("event %s timestamp must be positive: %+v", wantEvent, event) + } + return + } + + t.Fatalf("missing event %s from %s to %s for message %d in %+v", wantEvent, wantFrom, wantTo, wantMessageID, events) +} + // Ensure transport package is used (needed for WithTXTimestampDebugLogger option). var _ transport.TXTimestampDebugLogger = nil diff --git a/cmd/internal/server/udp_hub.go b/cmd/internal/server/udp_hub.go index 4275337..680b94d 100644 --- a/cmd/internal/server/udp_hub.go +++ b/cmd/internal/server/udp_hub.go @@ -21,16 +21,22 @@ func WithUDPLogger(logger latencylog.Logger) UDPOption { } } +// WithUDPTXTimestampDebugLogger 为 UDP hub 注入 TX errqueue 调试日志器。 +func WithUDPTXTimestampDebugLogger(logger transport.TXTimestampDebugLogger) UDPOption { + return func(hub *UDPHub) { + hub.txTimestampDebugLogger = logger + } +} + // UDPHub 管理通过 UDP 注册的 peer,并负责在它们之间转发消息。 -// 与 TCP Hub 不同,UDPHub 使用单个 net.UDPConn 与所有 peer 通信, -// 通过维护 peerID -> UDPAddr 映射表来寻址。 type UDPHub struct { mu sync.RWMutex - peers map[string]*net.UDPAddr // peerID -> 对端 UDP 地址 - addrs map[string]string // addr.String() -> peerID,用于反查 + peers map[string]*net.UDPAddr + addrs map[string]string - conn *transport.UDPConn - logger latencylog.Logger + conn *transport.UDPConn + logger latencylog.Logger + txTimestampDebugLogger transport.TXTimestampDebugLogger } // NewUDPHub 创建一个新的 UDP 连接中心。 @@ -53,28 +59,27 @@ func NewUDPHub(conn *net.UDPConn, opts ...UDPOption) (*UDPHub, error) { conn, nil, transport.WithUDPLogger(hub.logger, latencylog.NodeRoleServer, "hub"), + transport.WithUDPTXTimestampDebugLogger(hub.txTimestampDebugLogger), ) if err != nil { return nil, fmt.Errorf("server: create udp transport conn: %w", err) } hub.conn = udpConn - return hub, nil } // Serve 启动 UDP 接收主循环,持续读取消息并处理注册/转发。 -// 此方法会阻塞,直到底层连接关闭或发生不可恢复的错误。 func (h *UDPHub) Serve() error { return h.conn.ReceiveLoop(func(msg protocol.Message, addr *net.UDPAddr) error { if err := h.handleMessage(msg, addr); err != nil { log.Printf("udp hub: handle message from %s: %v", addr, err) } - return nil // 不因为单条消息处理失败而退出主循环 + return nil }) } -// HasPeer 返回给定 ID 是否已注册到 hub。 +// HasPeer 返回给定 ID 是否已经注册到 hub。 func (h *UDPHub) HasPeer(peerID string) bool { h.mu.RLock() defer h.mu.RUnlock() @@ -83,7 +88,6 @@ func (h *UDPHub) HasPeer(peerID string) bool { return ok } -// handleMessage 处理从指定地址收到的消息。 func (h *UDPHub) handleMessage(msg protocol.Message, addr *net.UDPAddr) error { switch msg.Type { case protocol.MessageTypeRegister: @@ -101,7 +105,6 @@ func (h *UDPHub) handleMessage(msg protocol.Message, addr *net.UDPAddr) error { } } -// registerPeer 处理 peer 的注册请求。 func (h *UDPHub) registerPeer(msg protocol.Message, addr *net.UDPAddr) error { peerID := msg.From if peerID == "" { @@ -111,9 +114,7 @@ func (h *UDPHub) registerPeer(msg protocol.Message, addr *net.UDPAddr) error { h.mu.Lock() defer h.mu.Unlock() - // 如果同一个 peerID 从新地址注册,更新地址映射(支持 peer 重启换端口)。 if existingAddr, exists := h.peers[peerID]; exists { - // 清理旧地址的反查映射 delete(h.addrs, existingAddr.String()) } @@ -123,26 +124,20 @@ func (h *UDPHub) registerPeer(msg protocol.Message, addr *net.UDPAddr) error { return nil } -// forwardMessage 转发业务消息到目标 peer。 func (h *UDPHub) forwardMessage(msg protocol.Message, senderAddr *net.UDPAddr) error { - // 通过来源地址反查发送者 peerID senderID := h.lookupPeerID(senderAddr) if senderID == "" { return h.sendErrorTo(senderAddr, msg.From, "not registered; send register first") } - // server 覆盖 From,不信任客户端自报身份 msg.From = senderID - // 查找目标 peer 地址 targetAddr := h.lookupAddr(msg.To) if targetAddr == nil { return h.sendErrorTo(senderAddr, senderID, fmt.Sprintf("unknown target: %s", msg.To)) } - // 转发消息 if err := h.conn.SendTo(msg, targetAddr); err != nil { - // 转发失败,通知发送方 _ = h.sendErrorTo(senderAddr, senderID, fmt.Sprintf("failed to forward to %s", msg.To)) return fmt.Errorf("forward to %s at %s: %w", msg.To, targetAddr, err) } @@ -166,7 +161,6 @@ func (h *UDPHub) lookupAddr(peerID string) *net.UDPAddr { return h.peers[peerID] } -// sendErrorTo 向指定地址发送错误消息。 func (h *UDPHub) sendErrorTo(addr *net.UDPAddr, to, message string) error { if to == "" { to = "unknown" diff --git a/cmd/internal/transport/udp.go b/cmd/internal/transport/udp.go index 07bba36..566d1ed 100644 --- a/cmd/internal/transport/udp.go +++ b/cmd/internal/transport/udp.go @@ -11,22 +11,32 @@ import ( ) // UDPConn 是对 UDP 连接的轻量封装。 -// server 侧:共享同一个 net.UDPConn,Send 时通过 peerAddr 指定对端地址。 -// peer 侧:独立的 net.UDPConn,已通过 Dial 连接到 server,Send 直接写即可。 +// server 侧共享一个 net.UDPConn,通过 SendTo 指定目标地址; +// peer 侧使用已连接的 net.UDPConn,直接 Send 即可。 type UDPConn struct { conn *net.UDPConn - peerAddr *net.UDPAddr // server 侧为对端地址;peer 侧为 nil(连接模式下直接 Write) - raw syscall.RawConn // 底层 syscall 句柄,用于 Linux socket timestamping + peerAddr *net.UDPAddr + raw syscall.RawConn logger latencylog.Logger txTimestampDebugLogger TXTimestampDebugLogger - nodeRole string // 日志中记录的节点角色,例如 "server" 或 "peer" - nodeID string // 日志中记录的节点 ID - writeMu sync.Mutex // 保护 Send 的互斥锁 + txPacketSeq uint32 + pendingTX map[uint32]udpTXPendingRecord + nodeRole string + nodeID string + writeMu sync.Mutex closeOnce sync.Once closeErr error } +type udpTXPendingRecord struct { + msg protocol.Message + sendCallIndex int + bytesWritten int + expectedTXID uint32 + observedTimestamps map[string]int64 +} + // UDPOption 用于为 UDPConn 注入可选行为。 type UDPOption func(*UDPConn) @@ -39,7 +49,7 @@ func WithUDPLogger(logger latencylog.Logger, nodeRole, nodeID string) UDPOption } } -// WithUDPTXTimestampDebugLogger 为 UDP 连接注入可选的 TX errqueue 调试日志器。 +// WithUDPTXTimestampDebugLogger 为 UDP 连接注入 TX errqueue 调试日志器。 func WithUDPTXTimestampDebugLogger(logger TXTimestampDebugLogger) UDPOption { return func(conn *UDPConn) { conn.txTimestampDebugLogger = logger @@ -47,13 +57,12 @@ func WithUDPTXTimestampDebugLogger(logger TXTimestampDebugLogger) UDPOption { } // NewUDPConn 创建 UDP transport 连接封装。 -// peerAddr 为 nil 时表示 peer 侧已连接模式(conn 已 Dial 到 server)。 -// peerAddr 非 nil 时表示 server 侧,Send 时需要指定目标地址。 func NewUDPConn(conn *net.UDPConn, peerAddr *net.UDPAddr, opts ...UDPOption) (*UDPConn, error) { udpConn := &UDPConn{ - conn: conn, - peerAddr: peerAddr, - logger: latencylog.NoopLogger{}, + conn: conn, + peerAddr: peerAddr, + logger: latencylog.NoopLogger{}, + pendingTX: make(map[uint32]udpTXPendingRecord), } for _, opt := range opts { @@ -72,39 +81,34 @@ func NewUDPConn(conn *net.UDPConn, peerAddr *net.UDPAddr, opts ...UDPOption) (*U } // Send 将一条协议消息编码为 UDP 数据报并发送。 -// 多个 goroutine 可以并发调用,内部会串行化写入。 func (c *UDPConn) Send(msg protocol.Message) error { c.writeMu.Lock() defer c.writeMu.Unlock() - latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffBegin, msg) + latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffBegin, msg) if err := c.sendMessageLinux(msg); err != nil { return fmt.Errorf("transport: udp send message: %w", err) } - latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffEnd, msg) return nil } // SendTo 将一条协议消息编码为 UDP 数据报并发送到指定地址。 -// 主要用于 server 侧向特定 peer 发送消息。 func (c *UDPConn) SendTo(msg protocol.Message, addr *net.UDPAddr) error { c.writeMu.Lock() defer c.writeMu.Unlock() - latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffBegin, msg) + latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffBegin, msg) if err := c.sendMessageToLinux(msg, addr); err != nil { return fmt.Errorf("transport: udp send message to %s: %w", addr, err) } - latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffEnd, msg) return nil } // Receive 从 UDP 连接读取一条完整协议消息。 -// 返回解码后的消息和来源地址(peer 侧来源地址始终为 server 地址)。 func (c *UDPConn) Receive() (protocol.Message, *net.UDPAddr, error) { msg, addr, err := c.receiveMessageLinux() if err != nil { @@ -115,7 +119,6 @@ func (c *UDPConn) Receive() (protocol.Message, *net.UDPAddr, error) { } // ReceiveLoop 持续从 UDP 连接读取消息并交给 handler 处理。 -// handler 的第二个参数是消息来源地址。 func (c *UDPConn) ReceiveLoop(handler func(protocol.Message, *net.UDPAddr) error) error { for { msg, addr, err := c.Receive() @@ -130,8 +133,6 @@ func (c *UDPConn) ReceiveLoop(handler func(protocol.Message, *net.UDPAddr) error } // Close 关闭底层 UDP 连接,保证重复调用安全。 -// 注意:server 侧多个 UDPConn 共享同一个 net.UDPConn 时, -// 只应由 UDPHub 负责关闭底层连接,不应通过此方法关闭。 func (c *UDPConn) Close() error { c.closeOnce.Do(func() { c.closeErr = c.conn.Close() diff --git a/cmd/internal/transport/udp_linux.go b/cmd/internal/transport/udp_linux.go index 7271594..0222a61 100644 --- a/cmd/internal/transport/udp_linux.go +++ b/cmd/internal/transport/udp_linux.go @@ -13,7 +13,7 @@ import ( "omnisocketgo/cmd/internal/protocol" ) -// UDP 接收缓冲区大小,足以容纳 MaxFrameSize 加上协议头。 +// UDP 接收缓冲区需要容纳完整 datagram 和协议头。 const udpReceiveBufferSize = protocol.MaxFrameSize + 1024 // initUDPLinuxTimestamping 拿到底层 fd,并打开 Linux timestamping。 @@ -26,7 +26,6 @@ func (c *UDPConn) initUDPLinuxTimestamping() error { return fmt.Errorf("transport: udp missing syscall conn") } - // UDP 不需要 OPT_ID_TCP,使用标准的 OPT_ID 即可。 flagCandidates := []int{ linuxSOFTimestampingTXSched | linuxSOFTimestampingTXSoftware | @@ -68,21 +67,7 @@ func (c *UDPConn) sendMessageLinux(msg protocol.Message) error { return fmt.Errorf("protocol: encode message: %w", err) } - readIndex := 0 - c.drainPendingUDPTXTimestampEvents(msg, linuxTXTimestampPhasePreSendDrain, &readIndex) - - if c.peerAddr != nil { - if err := c.udpSendTo(payload, c.peerAddr); err != nil { - return err - } - } else { - if err := c.udpSend(payload); err != nil { - return err - } - } - - c.collectAndLogUDPTXTimestampEvents(msg, &readIndex) - return nil + return c.sendUDPPayloadLinux(msg, payload, c.peerAddr) } // sendMessageToLinux 编码消息并通过 UDP 发送到指定地址,采集 TX 时间戳。 @@ -92,22 +77,65 @@ func (c *UDPConn) sendMessageToLinux(msg protocol.Message, addr *net.UDPAddr) er return fmt.Errorf("protocol: encode message: %w", err) } - readIndex := 0 - c.drainPendingUDPTXTimestampEvents(msg, linuxTXTimestampPhasePreSendDrain, &readIndex) + return c.sendUDPPayloadLinux(msg, payload, addr) +} - if err := c.udpSendTo(payload, addr); err != nil { +func (c *UDPConn) sendUDPPayloadLinux(msg protocol.Message, payload []byte, addr *net.UDPAddr) error { + readIndex := 0 + + // pre-send drain 可能读到上一条消息晚到的 errqueue 事件, + // 这里必须先清掉,并且按 ee_data 归还给原消息,不能污染当前消息。 + c.drainPendingUDPTXTimestampEvents(linuxTXTimestampPhasePreSendDrain, &readIndex) + + chunk := c.newUDPSendChunk(len(payload)) + var err error + if addr != nil { + err = c.udpSendTo(payload, addr) + } else { + err = c.udpSend(payload) + } + if err != nil { return err } - c.collectAndLogUDPTXTimestampEvents(msg, &readIndex) + c.commitUDPSend(msg, chunk) + c.collectAndLogUDPTXTimestampEvents(msg, chunk, &readIndex) return nil } +func (c *UDPConn) newUDPSendChunk(payloadLen int) txSendChunk { + return txSendChunk{ + SendCallIndex: 0, + FrameOffsetStart: 0, + FrameOffsetEnd: payloadLen - 1, + BytesWritten: payloadLen, + ExpectedTXID: c.txPacketSeq, + } +} + +func (c *UDPConn) commitUDPSend(msg protocol.Message, chunk txSendChunk) { + // Linux 对 UDP datagram 的 ee_data 是按包递增的 ID。 + // 这里把 ID 和原始消息元数据绑定起来,后续 drain 到的晚到事件才能记回原消息。 + if c.txTimestampDebugLogger != nil { + c.pendingTX[chunk.ExpectedTXID] = udpTXPendingRecord{ + msg: msg, + sendCallIndex: chunk.SendCallIndex, + bytesWritten: chunk.BytesWritten, + expectedTXID: chunk.ExpectedTXID, + observedTimestamps: make(map[string]int64, 2), + } + c.logUDPTXSendChunkDebugRecord(msg, chunk) + } + + c.txPacketSeq++ +} + // udpSend 通过已连接的 UDP socket 发送数据。 func (c *UDPConn) udpSend(payload []byte) error { if c.raw != nil { return c.udpSendmsgRaw(payload, nil) } + _, err := c.conn.Write(payload) return err } @@ -120,6 +148,7 @@ func (c *UDPConn) udpSendTo(payload []byte, addr *net.UDPAddr) error { return c.udpSendmsgRaw(payload, sa) } } + _, err := c.conn.WriteToUDP(payload, addr) return err } @@ -213,14 +242,13 @@ func (c *UDPConn) udpRecvmsgRaw() ([]byte, *net.UDPAddr, int64, error) { return nil, nil, 0, opErr } - addr := sockaddrToUDPAddr(from) - return buf[:n], addr, rxTimeNS, nil + return buf[:n], sockaddrToUDPAddr(from), rxTimeNS, nil } } // collectAndLogUDPTXTimestampEvents 采集并记录 UDP 发送的 TX 时间戳事件。 -func (c *UDPConn) collectAndLogUDPTXTimestampEvents(msg protocol.Message, readIndex *int) { - timestamps := c.collectUDPTXTimestampEvents(msg, readIndex) +func (c *UDPConn) collectAndLogUDPTXTimestampEvents(msg protocol.Message, chunk txSendChunk, readIndex *int) { + timestamps := c.collectUDPTXTimestampEvents(msg, chunk, readIndex) if ts, ok := timestamps[latencylog.EventATXSched]; ok { latencylog.LogMessageEventAt(c.logger, c.nodeRole, c.nodeID, latencylog.EventATXSched, ts, msg) @@ -231,13 +259,13 @@ func (c *UDPConn) collectAndLogUDPTXTimestampEvents(msg protocol.Message, readIn } // collectUDPTXTimestampEvents 在 errqueue 中等待 TX 时间戳。 -func (c *UDPConn) collectUDPTXTimestampEvents(msg protocol.Message, readIndex *int) map[string]int64 { +func (c *UDPConn) collectUDPTXTimestampEvents(msg protocol.Message, chunk txSendChunk, readIndex *int) map[string]int64 { if c.raw == nil { return nil } deadline := time.Now().Add(linuxTXTimestampWaitTimeout) - timestamps := make(map[string]int64, 2) + observed := make([]observedTXTimestampEvent, 0, 4) for time.Now().Before(deadline) { event, err := c.recvUDPTXTimestampOnce() @@ -251,25 +279,31 @@ func (c *UDPConn) collectUDPTXTimestampEvents(msg protocol.Message, readIndex *i if event.EventName == "" || event.TSUnixNano <= 0 { continue } - *readIndex++ - if isBusinessTXTimestampEventName(event.EventName) { - if _, exists := timestamps[event.EventName]; !exists { - timestamps[event.EventName] = event.TSUnixNano - } - } + observed = append(observed, observedTXTimestampEvent{ + Phase: linuxTXTimestampPhasePostSendCollect, + ReadIndex: *readIndex, + Event: event, + }) + c.recordUDPPendingEvent(event) + *readIndex = *readIndex + 1 - if hasCompleteTXTimestampPair(timestamps) { + selection := selectTXTimestampEvents(observed, chunk.ExpectedTXID, true) + if selection.HasEvent && selection.SelectedID == chunk.ExpectedTXID && hasCompleteTXTimestampPair(selection.Timestamps) { break } } - c.drainPendingUDPTXTimestampEvents(msg, linuxTXTimestampPhasePostSelectDrain, readIndex) - return timestamps + selection := selectTXTimestampEvents(observed, chunk.ExpectedTXID, true) + c.logObservedUDPTXTimestampEvents(msg, chunk, observed, selection) + c.releaseCompletedUDPPendingFromObserved(observed) + c.drainPendingUDPTXTimestampEvents(linuxTXTimestampPhasePostSelectDrain, readIndex) + c.releaseCompletedUDPPending(chunk.ExpectedTXID) + return selection.Timestamps } // drainPendingUDPTXTimestampEvents 清空 errqueue 中残留的时间戳事件。 -func (c *UDPConn) drainPendingUDPTXTimestampEvents(msg protocol.Message, phase string, readIndex *int) { +func (c *UDPConn) drainPendingUDPTXTimestampEvents(phase string, readIndex *int) { if c.raw == nil { return } @@ -277,12 +311,27 @@ func (c *UDPConn) drainPendingUDPTXTimestampEvents(msg protocol.Message, phase s for { event, err := c.recvUDPTXTimestampOnce() if err != nil { + if isWouldBlock(err) { + return + } return } if event.EventName == "" || event.TSUnixNano <= 0 { continue } - *readIndex++ + + complete := c.recordUDPPendingEvent(event) + if msg, chunks, ok := c.lookupUDPPendingDebugContext(event.EEData); ok { + c.logUDPTXErrqueueDebugRecord(msg, chunks, observedTXTimestampEvent{ + Phase: phase, + ReadIndex: *readIndex, + Event: event, + }, false) + if complete { + delete(c.pendingTX, event.EEData) + } + } + *readIndex = *readIndex + 1 } } @@ -312,6 +361,155 @@ func (c *UDPConn) recvUDPTXTimestampOnce() (txTimestampEvent, error) { return event, nil } +func (c *UDPConn) recordUDPPendingEvent(event txTimestampEvent) bool { + if !isBusinessTXTimestampEventName(event.EventName) { + return false + } + + record, ok := c.pendingTX[event.EEData] + if !ok { + return false + } + if record.observedTimestamps == nil { + record.observedTimestamps = make(map[string]int64, 2) + } + if existing, exists := record.observedTimestamps[event.EventName]; !exists || event.TSUnixNano < existing { + record.observedTimestamps[event.EventName] = event.TSUnixNano + } + c.pendingTX[event.EEData] = record + + return hasCompleteTXTimestampPair(record.observedTimestamps) +} + +func (c *UDPConn) releaseCompletedUDPPending(txID uint32) { + record, ok := c.pendingTX[txID] + if !ok { + return + } + if hasCompleteTXTimestampPair(record.observedTimestamps) { + delete(c.pendingTX, txID) + } +} + +func (c *UDPConn) releaseCompletedUDPPendingFromObserved(observed []observedTXTimestampEvent) { + seen := make(map[uint32]struct{}, len(observed)) + for _, entry := range observed { + if _, ok := seen[entry.Event.EEData]; ok { + continue + } + c.releaseCompletedUDPPending(entry.Event.EEData) + seen[entry.Event.EEData] = struct{}{} + } +} + +func (c *UDPConn) lookupUDPPendingDebugContext(txID uint32) (protocol.Message, []txSendChunk, bool) { + record, ok := c.pendingTX[txID] + if !ok { + return protocol.Message{}, nil, false + } + + chunk := txSendChunk{ + SendCallIndex: record.sendCallIndex, + FrameOffsetStart: 0, + FrameOffsetEnd: record.bytesWritten - 1, + BytesWritten: record.bytesWritten, + ExpectedTXID: record.expectedTXID, + } + return record.msg, []txSendChunk{chunk}, true +} + +func (c *UDPConn) logObservedUDPTXTimestampEvents(msg protocol.Message, chunk txSendChunk, observed []observedTXTimestampEvent, selection txTimestampSelection) { + if len(observed) == 0 { + return + } + + currentChunks := []txSendChunk{chunk} + for _, entry := range observed { + recordMsg := msg + recordChunks := currentChunks + if pendingMsg, pendingChunks, ok := c.lookupUDPPendingDebugContext(entry.Event.EEData); ok { + recordMsg = pendingMsg + recordChunks = pendingChunks + } + + // 理想情况应命中本次 expectedTXID;如果等待窗口里只看到了更高的 ee_data, + // 就退回到本轮实际观察到的最新事件,至少保留调试和定位线索。 + selected := selection.HasEvent && + entry.Event.EEData == selection.SelectedID && + isBusinessTXTimestampEventName(entry.Event.EventName) && + selection.Timestamps[entry.Event.EventName] == entry.Event.TSUnixNano + c.logUDPTXErrqueueDebugRecord(recordMsg, recordChunks, entry, selected) + } +} + +func (c *UDPConn) logUDPTXSendChunkDebugRecord(msg protocol.Message, chunk txSendChunk) { + if c.txTimestampDebugLogger == nil { + return + } + + sendCallIndex := chunk.SendCallIndex + frameOffsetStart := chunk.FrameOffsetStart + frameOffsetEnd := chunk.FrameOffsetEnd + bytesWritten := chunk.BytesWritten + expectedTXID := chunk.ExpectedTXID + + record := c.newUDPTXTimestampDebugRecord(msg) + record.RecordType = txTimestampDebugRecordTypeSendChunk + record.SendCallIndex = &sendCallIndex + record.FrameOffsetStart = &frameOffsetStart + record.FrameOffsetEnd = &frameOffsetEnd + record.BytesWritten = &bytesWritten + record.ExpectedTXID = &expectedTXID + c.logUDPTXTimestampDebugRecord(record) +} + +func (c *UDPConn) logUDPTXErrqueueDebugRecord(msg protocol.Message, chunks []txSendChunk, observed observedTXTimestampEvent, selected bool) { + if c.txTimestampDebugLogger == nil { + return + } + + readIndex := observed.ReadIndex + tsUnixNano := observed.Event.TSUnixNano + eeInfo := observed.Event.EEInfo + eeData := observed.Event.EEData + selectedForLatency := selected + + record := c.newUDPTXTimestampDebugRecord(msg) + record.RecordType = txTimestampDebugRecordTypeErrqueueEvent + record.Phase = observed.Phase + record.ReadIndex = &readIndex + record.EventName = observed.Event.EventName + record.TSUnixNano = &tsUnixNano + record.EEInfo = &eeInfo + record.EEData = &eeData + record.SelectedForLatency = &selectedForLatency + if matchedSendCallIndex, ok := matchTXTimestampEventToSendChunk(observed.Event.EEData, chunks); ok { + record.MatchedSendCallIndex = &matchedSendCallIndex + } + c.logUDPTXTimestampDebugRecord(record) +} + +func (c *UDPConn) newUDPTXTimestampDebugRecord(msg protocol.Message) TXTimestampDebugRecord { + return TXTimestampDebugRecord{ + NodeRole: c.nodeRole, + NodeID: c.nodeID, + MessageType: msg.Type, + MessageID: msg.ID, + From: msg.From, + To: msg.To, + FileName: msg.FileName, + BodySize: len(msg.Body), + } +} + +func (c *UDPConn) logUDPTXTimestampDebugRecord(record TXTimestampDebugRecord) { + if c.txTimestampDebugLogger == nil { + return + } + + _ = c.txTimestampDebugLogger.LogTXTimestampDebugRecord(record) +} + // udpAddrToSockaddr 将 net.UDPAddr 转换为 syscall.Sockaddr。 func udpAddrToSockaddr(addr *net.UDPAddr) syscall.Sockaddr { if ip4 := addr.IP.To4(); ip4 != nil { diff --git a/cmd/internal/transport/udp_linux_test.go b/cmd/internal/transport/udp_linux_test.go index 71c3496..1b1d33e 100644 --- a/cmd/internal/transport/udp_linux_test.go +++ b/cmd/internal/transport/udp_linux_test.go @@ -11,7 +11,6 @@ import ( "omnisocketgo/cmd/internal/protocol" ) -// TestUDPLinuxTimestampingRecordsKernelEvents 验证 UDP 在 Linux 上能正确采集内核时间戳。 func TestUDPLinuxTimestampingRecordsKernelEvents(t *testing.T) { tests := []struct { name string @@ -45,7 +44,6 @@ func TestUDPLinuxTimestampingRecordsKernelEvents(t *testing.T) { senderLogger := &recordingLogger{} receiverLogger := &recordingLogger{} - // 创建 server 侧监听 serverAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") if err != nil { t.Fatalf("ResolveUDPAddr() error = %v", err) @@ -65,7 +63,6 @@ func TestUDPLinuxTimestampingRecordsKernelEvents(t *testing.T) { } t.Cleanup(func() { _ = receiver.Close() }) - // 创建 peer 侧连接 peerRaw, err := net.DialUDP("udp", nil, serverRaw.LocalAddr().(*net.UDPAddr)) if err != nil { t.Fatalf("DialUDP() error = %v", err) @@ -103,3 +100,149 @@ func TestUDPLinuxTimestampingRecordsKernelEvents(t *testing.T) { }) } } + +func TestUDPLinuxTimestampingDebugLoggerCapturesDatagramAndErrqueueEvents(t *testing.T) { + debugLogger := &recordingTXTimestampDebugLogger{} + senderLogger := &recordingLogger{} + receiverLogger := &recordingLogger{} + + serverAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + if err != nil { + t.Fatalf("ResolveUDPAddr() error = %v", err) + } + serverRaw, err := net.ListenUDP("udp", serverAddr) + if err != nil { + t.Fatalf("ListenUDP() error = %v", err) + } + receiver, err := NewUDPConn( + serverRaw, + nil, + WithUDPLogger(receiverLogger, latencylog.NodeRolePeer, "peer-b"), + ) + if err != nil { + _ = serverRaw.Close() + t.Fatalf("NewUDPConn(receiver) error = %v", err) + } + t.Cleanup(func() { _ = receiver.Close() }) + + peerRaw, err := net.DialUDP("udp", nil, serverRaw.LocalAddr().(*net.UDPAddr)) + if err != nil { + t.Fatalf("DialUDP() error = %v", err) + } + sender, err := NewUDPConn( + peerRaw, + nil, + WithUDPLogger(senderLogger, latencylog.NodeRolePeer, "peer-a"), + WithUDPTXTimestampDebugLogger(debugLogger), + ) + if err != nil { + _ = peerRaw.Close() + t.Fatalf("NewUDPConn(sender) error = %v", err) + } + t.Cleanup(func() { _ = sender.Close() }) + + msg := protocol.Message{ + Type: protocol.MessageTypeText, + ID: 99, + From: "peer-a", + To: "peer-b", + Body: []byte("hello udp debug"), + } + + sendErr := make(chan error, 1) + go func() { + sendErr <- sender.Send(msg) + }() + + got, _, err := receiver.Receive() + if err != nil { + t.Fatalf("Receive() error = %v", err) + } + if err := <-sendErr; err != nil { + t.Fatalf("Send() error = %v", err) + } + if !reflect.DeepEqual(got, msg) { + t.Fatalf("message mismatch: got %+v want %+v", got, msg) + } + + assertHasEvent(t, senderLogger.Events(), latencylog.EventATXSched, msg.ID) + assertHasEvent(t, senderLogger.Events(), latencylog.EventATXSoftware, msg.ID) + assertHasEvent(t, receiverLogger.Events(), latencylog.EventBRXSoftware, msg.ID) + + sendChunkRecords := debugRecordsByType(debugLogger.Records(), txTimestampDebugRecordTypeSendChunk) + errqueueRecords := debugRecordsByType(debugLogger.Records(), txTimestampDebugRecordTypeErrqueueEvent) + if len(sendChunkRecords) == 0 { + t.Fatal("send_chunk debug records = 0, want at least 1") + } + if len(errqueueRecords) == 0 { + t.Fatal("errqueue_event debug records = 0, want at least 1") + } + + finalChunkRecord := sendChunkRecords[len(sendChunkRecords)-1] + if finalChunkRecord.ExpectedTXID == nil { + t.Fatal("final send_chunk expected_tx_id = nil, want non-nil") + } + finalExpectedTXID := *finalChunkRecord.ExpectedTXID + + selectedRecords := selectedErrqueueRecords(errqueueRecords) + if len(selectedRecords) == 0 { + t.Fatal("selected errqueue debug records = 0, want at least 1") + } + + highestObservedID := uint32(0) + haveHighestObservedID := false + haveExactFinalID := false + for _, record := range errqueueRecords { + if record.EEData == nil { + continue + } + if !haveHighestObservedID || *record.EEData > highestObservedID { + highestObservedID = *record.EEData + haveHighestObservedID = true + } + if *record.EEData == finalExpectedTXID && isBusinessTXTimestampRecord(record) { + haveExactFinalID = true + } + } + if !haveHighestObservedID { + t.Fatal("highestObservedID missing, want at least one ee_data") + } + + wantSelectedID := highestObservedID + if haveExactFinalID { + wantSelectedID = finalExpectedTXID + } + for _, record := range selectedRecords { + if record.EEData == nil { + t.Fatalf("selected record missing ee_data: %+v", record) + } + if *record.EEData != wantSelectedID { + t.Fatalf("selected ee_data = %d, want %d", *record.EEData, wantSelectedID) + } + } + + selectedByEventName := make(map[string]int64, len(selectedRecords)) + for _, record := range selectedRecords { + if record.TSUnixNano == nil { + t.Fatalf("selected record missing timestamp: %+v", record) + } + selectedByEventName[record.EventName] = *record.TSUnixNano + } + + senderEventsByName := make(map[string]int64) + for _, event := range senderLogger.Events() { + if event.MessageID != msg.ID { + continue + } + if !isBusinessTXTimestampEventName(event.Event) { + continue + } + senderEventsByName[event.Event] = event.TsUnixNano + } + + for eventName, selectedTS := range selectedByEventName { + if senderEventsByName[eventName] != selectedTS { + t.Fatalf("sender latency event %s = %d, want %d from selected debug record", eventName, senderEventsByName[eventName], selectedTS) + } + } +} diff --git a/cmd/udpserver/main.go b/cmd/udpserver/main.go index 5a938e0..01bb347 100644 --- a/cmd/udpserver/main.go +++ b/cmd/udpserver/main.go @@ -7,14 +7,16 @@ import ( "omnisocketgo/cmd/internal/latencylog" "omnisocketgo/cmd/internal/server" + "omnisocketgo/cmd/internal/transport" ) func main() { listenAddr := flag.String("listen", ":9001", "UDP server listen address") logPath := flag.String("latency-log", "", "optional JSONL file path for latency timestamp logs") + txTimestampDebugLogPath := flag.String("tx-ts-debug-log", "", "optional JSONL file path for TX errqueue debug records") flag.Parse() - hubOptions := make([]server.UDPOption, 0, 1) + hubOptions := make([]server.UDPOption, 0, 2) if *logPath != "" { logger, err := latencylog.NewJSONLLogger(*logPath) if err != nil { @@ -23,6 +25,14 @@ func main() { defer logger.Close() hubOptions = append(hubOptions, server.WithUDPLogger(logger)) } + if *txTimestampDebugLogPath != "" { + logger, err := transport.NewJSONLTXTimestampDebugLogger(*txTimestampDebugLogPath) + if err != nil { + log.Fatalf("create tx timestamp debug logger %s: %v", *txTimestampDebugLogPath, err) + } + defer logger.Close() + hubOptions = append(hubOptions, server.WithUDPTXTimestampDebugLogger(logger)) + } udpAddr, err := net.ResolveUDPAddr("udp", *listenAddr) if err != nil {