diff --git a/.gitignore b/.gitignore index 275b936..1b82a02 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ bin/* inbox/* +*.jsonl \ No newline at end of file diff --git a/cmd/internal/peer/client.go b/cmd/internal/peer/client.go index 8ab0dd8..da6c488 100644 --- a/cmd/internal/peer/client.go +++ b/cmd/internal/peer/client.go @@ -16,9 +16,10 @@ import ( var dialServer = dialServerWithOptions type clientOptions struct { - logger latencylog.Logger - bindIP string - bindDevice string + logger latencylog.Logger + txTimestampDebugLogger transport.TXTimestampDebugLogger + bindIP string + bindDevice string } // Option 用于配置 Client 的可选行为,例如时延日志。 @@ -31,6 +32,13 @@ func WithLogger(logger latencylog.Logger) Option { } } +// WithTXTimestampDebugLogger 为 client 注入 TX errqueue 调试日志器。 +func WithTXTimestampDebugLogger(logger transport.TXTimestampDebugLogger) Option { + return func(options *clientOptions) { + options.txTimestampDebugLogger = logger + } +} + // WithBindIP 指定拨号时使用的本地源 IP。 func WithBindIP(ip string) Option { return func(options *clientOptions) { @@ -74,6 +82,7 @@ func Dial(serverAddr, peerID string, opts ...Option) (*Client, error) { conn, err := transport.NewTCPConn( rawConn, transport.WithLogger(options.logger, latencylog.NodeRolePeer, peerID), + transport.WithTXTimestampDebugLogger(options.txTimestampDebugLogger), ) if err != nil { _ = rawConn.Close() diff --git a/cmd/internal/transport/tcp.go b/cmd/internal/transport/tcp.go index d0a0f8b..6073ca6 100644 --- a/cmd/internal/transport/tcp.go +++ b/cmd/internal/transport/tcp.go @@ -19,12 +19,14 @@ type TCPConn struct { conn net.Conn raw syscall.RawConn // 连接对应的底层 syscall 句柄,用于 Linux socket timestamping 收发。 - logger latencylog.Logger - nodeRole string // 日志中记录的节点角色,例如 "server" 或 "peer" - nodeID string // 日志中记录的节点 ID,例如 peer 的 ID 或 server 的 "hub" - writeMu sync.Mutex // 保护 Send 方法的互斥锁,确保同一时刻只有一条完整协议消息被写入连接,防止多条消息字节交叉 - closeOnce sync.Once // 保护 Close 方法的 sync.Once,确保连接只被关闭一次 - closeErr error // 连接关闭时的错误,如果连接成功关闭则为 nil,重复调用 Close 时会返回同样的错误 + logger latencylog.Logger + txTimestampDebugLogger TXTimestampDebugLogger + nodeRole string // 日志中记录的节点角色,例如 "server" 或 "peer" + nodeID string // 日志中记录的节点 ID,例如 peer 的 ID 或 server 的 "hub" + writeMu sync.Mutex // 保护 Send 方法的互斥锁,确保同一时刻只有一条完整协议消息被写入连接,防止多条消息字节交叉 + txWriteSeq uint32 // Linux TX timestamp OPT_ID_TCP 的本地镜像,按成功写出的字节推进。 + closeOnce sync.Once // 保护 Close 方法的 sync.Once,确保连接只被关闭一次 + closeErr error // 连接关闭时的错误,如果连接成功关闭则为 nil,重复调用 Close 时会返回同样的错误 } // Option 用于为 TCPConn 注入可选行为,例如时延日志。 @@ -39,6 +41,13 @@ func WithLogger(logger latencylog.Logger, nodeRole, nodeID string) Option { } } +// WithTXTimestampDebugLogger 为连接注入可选的 TX errqueue 调试日志器。 +func WithTXTimestampDebugLogger(logger TXTimestampDebugLogger) Option { + return func(conn *TCPConn) { + conn.txTimestampDebugLogger = logger + } +} + // NewTCPConn 用已有的 net.Conn 创建 transport 连接封装。 func NewTCPConn(conn net.Conn, opts ...Option) (*TCPConn, error) { tcpConn := &TCPConn{ diff --git a/cmd/internal/transport/tcp_linux.go b/cmd/internal/transport/tcp_linux.go index 7dc2fe6..e753791 100644 --- a/cmd/internal/transport/tcp_linux.go +++ b/cmd/internal/transport/tcp_linux.go @@ -33,8 +33,44 @@ const ( linuxSOFTimestampingTXSched = 1 << 8 // 打开 TX sched timestamp。 linuxSOFTimestampingOptTSONLY = 1 << 11 // 只回时间戳。 linuxSOFTimestampingOptIDTCP = 1 << 16 // 让 TCP 也带 timestamp ID。 + + linuxTXTimestampPhasePreSendDrain = "pre_send_drain" + linuxTXTimestampPhasePostSendCollect = "post_send_collect" + linuxTXTimestampPhasePostSelectDrain = "post_select_drain" ) +type txSendChunk struct { + SendCallIndex int + FrameOffsetStart int + FrameOffsetEnd int // 包含当前 sendmsg 成功写出的最后一个 frame 字节偏移。 + BytesWritten int + ExpectedTXID uint32 +} + +type txTimestampEvent struct { + EventName string + TSUnixNano int64 + EEInfo uint32 + EEData uint32 +} + +type observedTXTimestampEvent struct { + Phase string + ReadIndex int + Event txTimestampEvent +} + +type txTimestampSelection struct { + SelectedID uint32 + Timestamps map[string]int64 + HasEvent bool +} + +type socketExtendedErrInfo struct { + Info uint32 + Data uint32 +} + // 拿到底层 fd,并打开 Linux timestamping。 func (c *TCPConn) initLinuxTimestamping() error { sysConn, ok := c.conn.(interface { @@ -116,39 +152,56 @@ func (c *TCPConn) sendMessageLinux(msg protocol.Message) error { binary.BigEndian.PutUint32(frame[:4], uint32(len(payload))) copy(frame[4:], payload) - if err := c.writeFrameLinux(frame); err != nil { + readIndex := 0 + c.drainPendingTXTimestampEvents(msg, nil, linuxTXTimestampPhasePreSendDrain, &readIndex) + + chunks, err := c.writeFrameLinux(frame, msg) + if err != nil { return fmt.Errorf("protocol: write frame: %w", err) } //记录发送延时日志 - c.logTXTimestampEvents(msg) + c.logTXTimestampEvents(msg, chunks, &readIndex) return nil } // writeFrameLinux 用 sendmsg 写完整帧。 -func (c *TCPConn) writeFrameLinux(frame []byte) error { +func (c *TCPConn) writeFrameLinux(frame []byte, msg protocol.Message) ([]txSendChunk, error) { written := 0 + sendCallIndex := 0 + chunks := make([]txSendChunk, 0, 1) for written < len(frame) { n, err := c.sendmsgDataOnce(frame[written:]) switch { case err == nil: if n <= 0 { - return io.ErrShortWrite + return nil, io.ErrShortWrite } + chunk := txSendChunk{ + SendCallIndex: sendCallIndex, + FrameOffsetStart: written, + FrameOffsetEnd: written + n - 1, + BytesWritten: n, + ExpectedTXID: c.txWriteSeq + uint32(n) - 1, + } + c.txWriteSeq += uint32(n) + chunks = append(chunks, chunk) + c.logTXSendChunkDebugRecord(msg, chunk) + sendCallIndex++ written += n case isWouldBlock(err): time.Sleep(linuxDataPollInterval) default: - return err + return nil, err } } - return nil + return chunks, nil } // 把 A_TX_SCHED / A_TX_SOFTWARE 写入日志。(发送过程中) -func (c *TCPConn) logTXTimestampEvents(msg protocol.Message) { - timestamps := c.collectTXTimestampEvents() +func (c *TCPConn) logTXTimestampEvents(msg protocol.Message, chunks []txSendChunk, readIndex *int) { + timestamps := c.collectTXTimestampEvents(msg, chunks, readIndex) if ts, ok := timestamps[latencylog.EventATXSched]; ok { latencylog.LogMessageEventAt(c.logger, c.nodeRole, c.nodeID, latencylog.EventATXSched, ts, msg) @@ -159,14 +212,14 @@ func (c *TCPConn) logTXTimestampEvents(msg protocol.Message) { } // 在 errqueue 里等两类 TX 时间戳。 -func (c *TCPConn) collectTXTimestampEvents() map[string]int64 { - timestamps := make(map[string]int64, 2) - //设置合理等待上限 +func (c *TCPConn) collectTXTimestampEvents(msg protocol.Message, chunks []txSendChunk, readIndex *int) map[string]int64 { deadline := time.Now().Add(linuxTXTimestampWaitTimeout) + observed := make([]observedTXTimestampEvent, 0, 4) + finalChunk, hasFinalChunk := finalTXSendChunk(chunks) - //轮询 errqueue 直到拿到两类时间戳,或超时,或遇到非 EAGAIN 错误。 - for len(timestamps) < 2 && time.Now().Before(deadline) { - eventName, ts, err := c.recvTXTimestampOnce() + // 轮询 errqueue,优先等待本次消息最后一个 sendmsg chunk 的 sched/software 两类时间戳。 + for time.Now().Before(deadline) { + event, err := c.recvTXTimestampOnce() if err != nil { if isWouldBlock(err) { time.Sleep(linuxTXTimestampPollInterval) @@ -174,23 +227,33 @@ func (c *TCPConn) collectTXTimestampEvents() map[string]int64 { } break } - if eventName == "" || ts <= 0 { + if event.EventName == "" || event.TSUnixNano <= 0 { continue } - if _, exists := timestamps[eventName]; !exists { - timestamps[eventName] = ts + observed = append(observed, observedTXTimestampEvent{ + Phase: linuxTXTimestampPhasePostSendCollect, + ReadIndex: *readIndex, + Event: event, + }) + *readIndex = *readIndex + 1 + + selection := selectTXTimestampEvents(observed, finalChunk.ExpectedTXID, hasFinalChunk) + if hasFinalChunk && selection.HasEvent && selection.SelectedID == finalChunk.ExpectedTXID && hasCompleteTXTimestampPair(selection.Timestamps) { + break } } - return timestamps + selection := selectTXTimestampEvents(observed, finalChunk.ExpectedTXID, hasFinalChunk) + c.logObservedTXTimestampEvents(msg, chunks, observed, selection) + c.drainPendingTXTimestampEvents(msg, chunks, linuxTXTimestampPhasePostSelectDrain, readIndex) + return selection.Timestamps } // recvTXTimestampOnce 从 errqueue 读一次时间戳事件。 -func (c *TCPConn) recvTXTimestampOnce() (string, int64, error) { +func (c *TCPConn) recvTXTimestampOnce() (txTimestampEvent, error) { var ( - eventName string // 事件名,例如 A_TX_SCHED 或 A_TX_SOFTWARE。 - tsUnixNS int64 // 时间戳的 UnixNano 表示。 - opErr error + event txTimestampEvent + opErr error ) err := c.raw.Control(func(fd uintptr) { @@ -203,63 +266,61 @@ func (c *TCPConn) recvTXTimestampOnce() (string, int64, error) { return } //解析控制消息,看看是不是我们关心的 TX 时间戳事件,如果是就拿到事件名和时间戳。 - eventName, tsUnixNS = parseTXTimestampControlMessages(oob[:oobn]) + event, _ = parseTXTimestampControlMessages(oob[:oobn]) }) if err != nil { - return "", 0, err + return txTimestampEvent{}, err } if opErr != nil { - return "", 0, opErr + return txTimestampEvent{}, opErr } - return eventName, tsUnixNS, nil //如果成功拿到时间戳事件,eventName 会是 A_TX_SCHED 或 A_TX_SOFTWARE 之一,tsUnixNS 是对应的时间戳;如果没有拿到事件或时间戳无效,eventName 会是空字符串,tsUnixNS 会是 0。 + return event, nil } // 把底层时间戳映射成日志事件名。 -func parseTXTimestampControlMessages(oob []byte) (string, int64) { +func parseTXTimestampControlMessages(oob []byte) (txTimestampEvent, bool) { if len(oob) == 0 { - return "", 0 + return txTimestampEvent{}, false } //解析控制消息,看看是不是我们关心的 TX 时间戳事件,如果是就拿到事件名和时间戳。 controlMessages, err := syscall.ParseSocketControlMessage(oob) if err != nil { - return "", 0 + return txTimestampEvent{}, false } + return parseTXTimestampSocketControlMessages(controlMessages) +} + +func parseTXTimestampSocketControlMessages(controlMessages []syscall.SocketControlMessage) (txTimestampEvent, bool) { var ( - tsUnixNS int64 //时间戳的 UnixNano 表示。 - tsKind uint32 //extended err里,告诉我们这个时间戳是 sched 还是 software。 - hasTS bool // 是否拿到时间戳了。 - hasKind bool // 是否拿到时间戳类型了。 + event txTimestampEvent + hasTS bool + hasKind bool ) - //一个 recvmsg 可能会收到多个控制消息,循环找我们关心的时间戳事件,拿到时间戳和事件类型。 + for _, controlMessage := range controlMessages { switch { case controlMessage.Header.Level == syscall.SOL_SOCKET && controlMessage.Header.Type == linuxSCMTimestampingNew: if ts := parseSCMTimestampingData(controlMessage.Data); ts > 0 { - tsUnixNS = ts + event.TSUnixNano = ts hasTS = true } case isSocketExtendedErr(controlMessage): //判断时间戳是否进入了errqueue, if info, ok := parseSocketExtendedErrInfo(controlMessage.Data); ok { - tsKind = info //时间戳类型被内核放在 extended err 的附加信息里,解析出来。 + event.EEInfo = info.Info + event.EEData = info.Data + event.EventName = mapLinuxTXTimestampEventName(info.Info) hasKind = true } } } - if !hasTS || !hasKind { - return "", 0 + if !hasTS || !hasKind || event.EventName == "" { + return txTimestampEvent{}, false } - switch tsKind { //把内核的时间戳类型映射成日志事件名。(记录时只关心 sched 和 software 两类时间戳) - case linuxSCMTstampSched: - return latencylog.EventATXSched, tsUnixNS - case linuxSCMTstampSnd: - return latencylog.EventATXSoftware, tsUnixNS - default: - return "", 0 - } + return event, true } // 判断控制消息是否来自 socket extended err。 @@ -276,15 +337,217 @@ func isSocketExtendedErr(controlMessage syscall.SocketControlMessage) bool { } // 从 socket extended err 的数据里取 origin timestamping 信息。 -func parseSocketExtendedErrInfo(data []byte) (uint32, bool) { +func parseSocketExtendedErrInfo(data []byte) (socketExtendedErrInfo, bool) { if len(data) < 16 { - return 0, false + return socketExtendedErrInfo{}, false } if data[4] != linuxSOEEOriginTimestamping { - return 0, false + return socketExtendedErrInfo{}, false } - return binary.NativeEndian.Uint32(data[8:12]), true + return socketExtendedErrInfo{ + Info: binary.NativeEndian.Uint32(data[8:12]), + Data: binary.NativeEndian.Uint32(data[12:16]), + }, true +} + +func mapLinuxTXTimestampEventName(tsKind uint32) string { + switch tsKind { + case linuxSCMTstampSched: + return latencylog.EventATXSched + case linuxSCMTstampSnd: + return latencylog.EventATXSoftware + default: + return fmt.Sprintf("TX_TIMESTAMP_KIND_%d", tsKind) + } +} + +func finalTXSendChunk(chunks []txSendChunk) (txSendChunk, bool) { + if len(chunks) == 0 { + return txSendChunk{}, false + } + + return chunks[len(chunks)-1], true +} + +func isBusinessTXTimestampEventName(eventName string) bool { + return eventName == latencylog.EventATXSched || eventName == latencylog.EventATXSoftware +} + +func hasCompleteTXTimestampPair(timestamps map[string]int64) bool { + if len(timestamps) == 0 { + return false + } + + return timestamps[latencylog.EventATXSched] > 0 && timestamps[latencylog.EventATXSoftware] > 0 +} + +func selectTXTimestampEvents(events []observedTXTimestampEvent, expectedFinalTXID uint32, hasExpectedFinalTXID bool) txTimestampSelection { + byID := make(map[uint32]map[string]int64) + selection := txTimestampSelection{ + Timestamps: make(map[string]int64, 2), + } + + var ( + highestID uint32 + haveHighest bool + ) + for _, observed := range events { + event := observed.Event + selection.HasEvent = true + if !haveHighest || event.EEData > highestID { + highestID = event.EEData + haveHighest = true + } + + if !isBusinessTXTimestampEventName(event.EventName) { + continue + } + if _, ok := byID[event.EEData]; !ok { + byID[event.EEData] = make(map[string]int64, 2) + } + if existing, exists := byID[event.EEData][event.EventName]; !exists || event.TSUnixNano < existing { + byID[event.EEData][event.EventName] = event.TSUnixNano + } + } + + if !selection.HasEvent { + return selection + } + + switch { + case hasExpectedFinalTXID && len(byID[expectedFinalTXID]) > 0: + selection.SelectedID = expectedFinalTXID + selection.Timestamps = copyTXTimestampMap(byID[expectedFinalTXID]) + case len(byID[highestID]) > 0: + selection.SelectedID = highestID + selection.Timestamps = copyTXTimestampMap(byID[highestID]) + default: + selection.SelectedID = highestID + } + + return selection +} + +func copyTXTimestampMap(src map[string]int64) map[string]int64 { + dst := make(map[string]int64, len(src)) + for key, value := range src { + dst[key] = value + } + return dst +} + +func (c *TCPConn) drainPendingTXTimestampEvents(msg protocol.Message, chunks []txSendChunk, phase string, readIndex *int) { + for { + event, err := c.recvTXTimestampOnce() + if err != nil { + if isWouldBlock(err) { + return + } + return + } + if event.EventName == "" || event.TSUnixNano <= 0 { + continue + } + c.logTXErrqueueDebugRecord(msg, chunks, observedTXTimestampEvent{ + Phase: phase, + ReadIndex: *readIndex, + Event: event, + }, false) + *readIndex = *readIndex + 1 + } +} + +func (c *TCPConn) logObservedTXTimestampEvents(msg protocol.Message, chunks []txSendChunk, observed []observedTXTimestampEvent, selection txTimestampSelection) { + if len(observed) == 0 { + return + } + + for _, entry := range observed { + selected := selection.HasEvent && + entry.Event.EEData == selection.SelectedID && + isBusinessTXTimestampEventName(entry.Event.EventName) && + selection.Timestamps[entry.Event.EventName] == entry.Event.TSUnixNano + c.logTXErrqueueDebugRecord(msg, chunks, entry, selected) + } +} + +func (c *TCPConn) logTXSendChunkDebugRecord(msg protocol.Message, chunk txSendChunk) { + if c.txTimestampDebugLogger == nil { + return + } + + sendCallIndex := chunk.SendCallIndex + frameOffsetStart := chunk.FrameOffsetStart + frameOffsetEnd := chunk.FrameOffsetEnd + bytesWritten := chunk.BytesWritten + expectedTXID := chunk.ExpectedTXID + + record := c.newTXTimestampDebugRecord(msg) + record.RecordType = txTimestampDebugRecordTypeSendChunk + record.SendCallIndex = &sendCallIndex + record.FrameOffsetStart = &frameOffsetStart + record.FrameOffsetEnd = &frameOffsetEnd + record.BytesWritten = &bytesWritten + record.ExpectedTXID = &expectedTXID + c.logTXTimestampDebugRecord(record) +} + +func (c *TCPConn) logTXErrqueueDebugRecord(msg protocol.Message, chunks []txSendChunk, observed observedTXTimestampEvent, selected bool) { + if c.txTimestampDebugLogger == nil { + return + } + + readIndex := observed.ReadIndex + tsUnixNano := observed.Event.TSUnixNano + eeInfo := observed.Event.EEInfo + eeData := observed.Event.EEData + selectedForLatency := selected + + record := c.newTXTimestampDebugRecord(msg) + record.RecordType = txTimestampDebugRecordTypeErrqueueEvent + record.Phase = observed.Phase + record.ReadIndex = &readIndex + record.EventName = observed.Event.EventName + record.TSUnixNano = &tsUnixNano + record.EEInfo = &eeInfo + record.EEData = &eeData + record.SelectedForLatency = &selectedForLatency + if matchedSendCallIndex, ok := matchTXTimestampEventToSendChunk(observed.Event.EEData, chunks); ok { + record.MatchedSendCallIndex = &matchedSendCallIndex + } + c.logTXTimestampDebugRecord(record) +} + +func matchTXTimestampEventToSendChunk(txID uint32, chunks []txSendChunk) (int, bool) { + for _, chunk := range chunks { + if chunk.ExpectedTXID == txID { + return chunk.SendCallIndex, true + } + } + + return 0, false +} + +func (c *TCPConn) newTXTimestampDebugRecord(msg protocol.Message) TXTimestampDebugRecord { + return TXTimestampDebugRecord{ + NodeRole: c.nodeRole, + NodeID: c.nodeID, + MessageType: msg.Type, + MessageID: msg.ID, + From: msg.From, + To: msg.To, + FileName: msg.FileName, + BodySize: len(msg.Body), + } +} + +func (c *TCPConn) logTXTimestampDebugRecord(record TXTimestampDebugRecord) { + if c.txTimestampDebugLogger == nil { + return + } + + _ = c.txTimestampDebugLogger.LogTXTimestampDebugRecord(record) } // 读一条完整消息,并记录 B_RX_SOFTWARE。 diff --git a/cmd/internal/transport/tcp_linux_test.go b/cmd/internal/transport/tcp_linux_test.go index 5d99c20..ac1e46e 100644 --- a/cmd/internal/transport/tcp_linux_test.go +++ b/cmd/internal/transport/tcp_linux_test.go @@ -3,8 +3,10 @@ package transport import ( + "encoding/binary" "net" "reflect" + "syscall" "testing" "omnisocketgo/cmd/internal/latencylog" @@ -87,6 +89,235 @@ func TestLinuxTimestampingRecordsKernelEvents(t *testing.T) { } } +func TestParseTXTimestampSocketControlMessagesExtractsEventKindAndID(t *testing.T) { + tests := []struct { + name string + kind uint32 + wantEvent string + }{ + { + name: "sched", + kind: linuxSCMTstampSched, + wantEvent: latencylog.EventATXSched, + }, + { + name: "software", + kind: linuxSCMTstampSnd, + wantEvent: latencylog.EventATXSoftware, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + controlMessages := []syscall.SocketControlMessage{ + makeTimestampSocketControlMessage(1_700_000_000_123_456_789), + makeSocketExtendedErrControlMessage(tt.kind, 42), + } + + event, ok := parseTXTimestampSocketControlMessages(controlMessages) + if !ok { + t.Fatal("parseTXTimestampSocketControlMessages() ok = false, want true") + } + if event.EventName != tt.wantEvent { + t.Fatalf("event name = %q, want %q", event.EventName, tt.wantEvent) + } + if event.TSUnixNano != 1_700_000_000_123_456_789 { + t.Fatalf("timestamp = %d, want %d", event.TSUnixNano, int64(1_700_000_000_123_456_789)) + } + if event.EEInfo != tt.kind { + t.Fatalf("ee_info = %d, want %d", event.EEInfo, tt.kind) + } + if event.EEData != 42 { + t.Fatalf("ee_data = %d, want 42", event.EEData) + } + }) + } +} + +func TestSelectTXTimestampEventsPrefersExactFinalChunkID(t *testing.T) { + events := []observedTXTimestampEvent{ + {Event: txTimestampEvent{EventName: latencylog.EventATXSched, TSUnixNano: 100, EEData: 7}}, + {Event: txTimestampEvent{EventName: latencylog.EventATXSoftware, TSUnixNano: 110, EEData: 7}}, + {Event: txTimestampEvent{EventName: latencylog.EventATXSched, TSUnixNano: 200, EEData: 9}}, + {Event: txTimestampEvent{EventName: latencylog.EventATXSoftware, TSUnixNano: 210, EEData: 9}}, + } + + selection := selectTXTimestampEvents(events, 9, true) + if !selection.HasEvent { + t.Fatal("selection.HasEvent = false, want true") + } + if selection.SelectedID != 9 { + t.Fatalf("SelectedID = %d, want 9", selection.SelectedID) + } + if got := selection.Timestamps[latencylog.EventATXSched]; got != 200 { + t.Fatalf("selected sched = %d, want 200", got) + } + if got := selection.Timestamps[latencylog.EventATXSoftware]; got != 210 { + t.Fatalf("selected software = %d, want 210", got) + } +} + +func TestSelectTXTimestampEventsFallsBackToHighestObservedID(t *testing.T) { + events := []observedTXTimestampEvent{ + {Event: txTimestampEvent{EventName: latencylog.EventATXSched, TSUnixNano: 100, EEData: 11}}, + {Event: txTimestampEvent{EventName: latencylog.EventATXSoftware, TSUnixNano: 120, EEData: 11}}, + {Event: txTimestampEvent{EventName: latencylog.EventATXSched, TSUnixNano: 200, EEData: 15}}, + {Event: txTimestampEvent{EventName: latencylog.EventATXSoftware, TSUnixNano: 220, EEData: 15}}, + } + + selection := selectTXTimestampEvents(events, 20, true) + if !selection.HasEvent { + t.Fatal("selection.HasEvent = false, want true") + } + if selection.SelectedID != 15 { + t.Fatalf("SelectedID = %d, want 15", selection.SelectedID) + } + if got := selection.Timestamps[latencylog.EventATXSched]; got != 200 { + t.Fatalf("selected sched = %d, want 200", got) + } + if got := selection.Timestamps[latencylog.EventATXSoftware]; got != 220 { + t.Fatalf("selected software = %d, want 220", got) + } +} + +func TestLinuxTimestampingDebugLoggerCapturesChunkAndErrqueueEvents(t *testing.T) { + clientConn, serverConn := newTCPPair(t) + setTCPWriteBuffer(t, clientConn, 4096) + + debugLogger := &recordingTXTimestampDebugLogger{} + senderLogger := &recordingLogger{} + receiverLogger := &recordingLogger{} + sender, err := NewTCPConn( + clientConn, + WithLogger(senderLogger, latencylog.NodeRolePeer, "peer-a"), + WithTXTimestampDebugLogger(debugLogger), + ) + if err != nil { + t.Fatalf("NewTCPConn(sender) error = %v", err) + } + receiver, err := NewTCPConn( + serverConn, + WithLogger(receiverLogger, latencylog.NodeRolePeer, "peer-b"), + ) + if err != nil { + t.Fatalf("NewTCPConn(receiver) error = %v", err) + } + t.Cleanup(func() { + _ = sender.Close() + _ = receiver.Close() + }) + + body := make([]byte, 1<<20) + for i := range body { + body[i] = byte(i % 251) + } + msg := protocol.Message{ + Type: protocol.MessageTypeFile, + ID: 99, + From: "peer-a", + To: "peer-b", + FileName: "payload.bin", + Body: body, + } + + sendErr := make(chan error, 1) + go func() { + sendErr <- sender.Send(msg) + }() + + got, err := receiver.Receive() + if err != nil { + t.Fatalf("Receive() error = %v", err) + } + if err := <-sendErr; err != nil { + t.Fatalf("Send() error = %v", err) + } + if !reflect.DeepEqual(got, msg) { + t.Fatalf("message mismatch: got %+v want %+v", got, msg) + } + + assertHasEvent(t, senderLogger.Events(), latencylog.EventATXSched, msg.ID) + assertHasEvent(t, senderLogger.Events(), latencylog.EventATXSoftware, msg.ID) + assertHasEvent(t, receiverLogger.Events(), latencylog.EventBRXSoftware, msg.ID) + + sendChunkRecords := debugRecordsByType(debugLogger.Records(), txTimestampDebugRecordTypeSendChunk) + errqueueRecords := debugRecordsByType(debugLogger.Records(), txTimestampDebugRecordTypeErrqueueEvent) + if len(sendChunkRecords) == 0 { + t.Fatal("send_chunk debug records = 0, want at least 1") + } + if len(errqueueRecords) == 0 { + t.Fatal("errqueue_event debug records = 0, want at least 1") + } + + finalChunkRecord := sendChunkRecords[len(sendChunkRecords)-1] + if finalChunkRecord.ExpectedTXID == nil { + t.Fatal("final send_chunk expected_tx_id = nil, want non-nil") + } + finalExpectedTXID := *finalChunkRecord.ExpectedTXID + + selectedRecords := selectedErrqueueRecords(errqueueRecords) + if len(selectedRecords) == 0 { + t.Fatal("selected errqueue debug records = 0, want at least 1") + } + + highestObservedID := uint32(0) + haveHighestObservedID := false + haveExactFinalID := false + for _, record := range errqueueRecords { + if record.EEData == nil { + continue + } + if !haveHighestObservedID || *record.EEData > highestObservedID { + highestObservedID = *record.EEData + haveHighestObservedID = true + } + if *record.EEData == finalExpectedTXID && isBusinessTXTimestampRecord(record) { + haveExactFinalID = true + } + } + if !haveHighestObservedID { + t.Fatal("highestObservedID missing, want at least one ee_data") + } + + wantSelectedID := highestObservedID + if haveExactFinalID { + wantSelectedID = finalExpectedTXID + } + for _, record := range selectedRecords { + if record.EEData == nil { + t.Fatalf("selected record missing ee_data: %+v", record) + } + if *record.EEData != wantSelectedID { + t.Fatalf("selected ee_data = %d, want %d", *record.EEData, wantSelectedID) + } + } + + selectedByEventName := make(map[string]int64, len(selectedRecords)) + for _, record := range selectedRecords { + if record.TSUnixNano == nil { + t.Fatalf("selected record missing timestamp: %+v", record) + } + selectedByEventName[record.EventName] = *record.TSUnixNano + } + + senderEventsByName := make(map[string]int64) + for _, event := range senderLogger.Events() { + if event.MessageID != msg.ID { + continue + } + if !isBusinessTXTimestampEventName(event.Event) { + continue + } + senderEventsByName[event.Event] = event.TsUnixNano + } + + for eventName, selectedTS := range selectedByEventName { + if senderEventsByName[eventName] != selectedTS { + t.Fatalf("sender latency event %s = %d, want %d from selected debug record", eventName, senderEventsByName[eventName], selectedTS) + } + } +} + func newTCPPair(t *testing.T) (net.Conn, net.Conn) { t.Helper() @@ -138,3 +369,74 @@ func assertHasEvent(t *testing.T, events []latencylog.Event, wantEvent string, w t.Fatalf("missing event %s for message %d in %+v", wantEvent, wantMessageID, events) } + +func makeTimestampSocketControlMessage(tsUnixNano int64) syscall.SocketControlMessage { + const timespec64Size = 16 + + data := make([]byte, timespec64Size*3) + sec := uint64(tsUnixNano / int64(1e9)) + nsec := uint64(tsUnixNano % int64(1e9)) + binary.NativeEndian.PutUint64(data[:8], sec) + binary.NativeEndian.PutUint64(data[8:16], nsec) + + return syscall.SocketControlMessage{ + Header: syscall.Cmsghdr{ + Level: syscall.SOL_SOCKET, + Type: linuxSCMTimestampingNew, + }, + Data: data, + } +} + +func makeSocketExtendedErrControlMessage(kind, id uint32) syscall.SocketControlMessage { + data := make([]byte, 16) + data[4] = linuxSOEEOriginTimestamping + binary.NativeEndian.PutUint32(data[8:12], kind) + binary.NativeEndian.PutUint32(data[12:16], id) + + return syscall.SocketControlMessage{ + Header: syscall.Cmsghdr{ + Level: syscall.SOL_IP, + Type: syscall.IP_RECVERR, + }, + Data: data, + } +} + +func setTCPWriteBuffer(t *testing.T, conn net.Conn, size int) { + t.Helper() + + tcpConn, ok := conn.(*net.TCPConn) + if !ok { + t.Fatalf("conn type %T does not support SetWriteBuffer", conn) + } + if err := tcpConn.SetWriteBuffer(size); err != nil { + t.Fatalf("SetWriteBuffer(%d) error = %v", size, err) + } +} + +func debugRecordsByType(records []TXTimestampDebugRecord, recordType string) []TXTimestampDebugRecord { + filtered := make([]TXTimestampDebugRecord, 0, len(records)) + for _, record := range records { + if record.RecordType != recordType { + continue + } + filtered = append(filtered, record) + } + return filtered +} + +func selectedErrqueueRecords(records []TXTimestampDebugRecord) []TXTimestampDebugRecord { + selected := make([]TXTimestampDebugRecord, 0, len(records)) + for _, record := range records { + if record.SelectedForLatency == nil || !*record.SelectedForLatency { + continue + } + selected = append(selected, record) + } + return selected +} + +func isBusinessTXTimestampRecord(record TXTimestampDebugRecord) bool { + return record.EventName == latencylog.EventATXSched || record.EventName == latencylog.EventATXSoftware +} diff --git a/cmd/internal/transport/tcp_test.go b/cmd/internal/transport/tcp_test.go index 8d7d5ca..878c759 100644 --- a/cmd/internal/transport/tcp_test.go +++ b/cmd/internal/transport/tcp_test.go @@ -38,6 +38,26 @@ func (failingLogger) LogEvent(latencylog.Event) error { return errors.New("log failed") } +type recordingTXTimestampDebugLogger struct { + mu sync.Mutex + records []TXTimestampDebugRecord +} + +func (l *recordingTXTimestampDebugLogger) LogTXTimestampDebugRecord(record TXTimestampDebugRecord) error { + l.mu.Lock() + defer l.mu.Unlock() + + l.records = append(l.records, record) + return nil +} + +func (l *recordingTXTimestampDebugLogger) Records() []TXTimestampDebugRecord { + l.mu.Lock() + defer l.mu.Unlock() + + return append([]TXTimestampDebugRecord(nil), l.records...) +} + // TestSendReceiveMessage 验证 transport 可以在单条连接上正常收发 text 和 file 消息。 func TestSendReceiveMessage(t *testing.T) { tests := []struct { diff --git a/cmd/internal/transport/tx_timestamp_debug.go b/cmd/internal/transport/tx_timestamp_debug.go new file mode 100644 index 0000000..ad7712f --- /dev/null +++ b/cmd/internal/transport/tx_timestamp_debug.go @@ -0,0 +1,97 @@ +package transport + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + + "omnisocketgo/cmd/internal/protocol" +) + +const ( + txTimestampDebugRecordTypeSendChunk = "send_chunk" + txTimestampDebugRecordTypeErrqueueEvent = "errqueue_event" +) + +// TXTimestampDebugRecord 是 TX errqueue 调试日志的一条 JSONL 记录。 +type TXTimestampDebugRecord struct { + RecordType string `json:"record_type"` + NodeRole string `json:"node_role,omitempty"` + NodeID string `json:"node_id,omitempty"` + MessageType protocol.MessageType `json:"message_type"` + MessageID uint64 `json:"message_id"` + From string `json:"from"` + To string `json:"to"` + FileName string `json:"file_name,omitempty"` + BodySize int `json:"body_size"` + + Phase string `json:"phase,omitempty"` + SendCallIndex *int `json:"send_call_index,omitempty"` + FrameOffsetStart *int `json:"frame_offset_start,omitempty"` + FrameOffsetEnd *int `json:"frame_offset_end,omitempty"` + BytesWritten *int `json:"bytes_written,omitempty"` + ExpectedTXID *uint32 `json:"expected_tx_id,omitempty"` + ReadIndex *int `json:"read_index,omitempty"` + EventName string `json:"event_name,omitempty"` + TSUnixNano *int64 `json:"ts_unix_nano,omitempty"` + EEInfo *uint32 `json:"ee_info,omitempty"` + EEData *uint32 `json:"ee_data,omitempty"` + MatchedSendCallIndex *int `json:"matched_send_call_index,omitempty"` + SelectedForLatency *bool `json:"selected_for_latency,omitempty"` +} + +// TXTimestampDebugLogger 接收 TX errqueue 调试记录。 +type TXTimestampDebugLogger interface { + LogTXTimestampDebugRecord(record TXTimestampDebugRecord) error +} + +// JSONLTXTimestampDebugLogger 以 JSONL 形式追加写 TX errqueue 调试日志。 +type JSONLTXTimestampDebugLogger struct { + mu sync.Mutex + closeOnce sync.Once + closeErr error + file *os.File +} + +// NewJSONLTXTimestampDebugLogger 创建一个线程安全的 TX errqueue JSONL 日志器。 +func NewJSONLTXTimestampDebugLogger(path string) (*JSONLTXTimestampDebugLogger, error) { + dir := filepath.Dir(path) + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, fmt.Errorf("transport: create tx timestamp debug log dir %s: %w", dir, err) + } + + file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + return nil, fmt.Errorf("transport: open tx timestamp debug log %s: %w", path, err) + } + + return &JSONLTXTimestampDebugLogger{file: file}, nil +} + +// LogTXTimestampDebugRecord 以单行 JSON 的形式追加一条调试记录。 +func (l *JSONLTXTimestampDebugLogger) LogTXTimestampDebugRecord(record TXTimestampDebugRecord) error { + line, err := json.Marshal(record) + if err != nil { + return err + } + + l.mu.Lock() + defer l.mu.Unlock() + + if _, err := l.file.Write(append(line, '\n')); err != nil { + return err + } + + return nil +} + +// Close 关闭底层文件;重复调用是安全的。 +func (l *JSONLTXTimestampDebugLogger) Close() error { + l.closeOnce.Do(func() { + l.closeErr = l.file.Close() + }) + + return l.closeErr +} diff --git a/cmd/peer/main.go b/cmd/peer/main.go index 73e2a0c..845cab8 100644 --- a/cmd/peer/main.go +++ b/cmd/peer/main.go @@ -11,6 +11,7 @@ import ( "omnisocketgo/cmd/internal/latencylog" peerpkg "omnisocketgo/cmd/internal/peer" "omnisocketgo/cmd/internal/protocol" + "omnisocketgo/cmd/internal/transport" ) func main() { @@ -23,10 +24,11 @@ func main() { bindDevice := flag.String("bind-device", "", "optional Linux network device used when dialing the server") inboxDir := flag.String("inbox-dir", "inbox", "directory used to persist received text and file messages") logPath := flag.String("latency-log", "", "optional JSONL file path for latency timestamp logs") + txTimestampDebugLogPath := flag.String("tx-ts-debug-log", "", "optional JSONL file path for TX errqueue debug records") interactive := flag.Bool("interactive", true, "enable interactive REPL for repeated text/file sends on the same connection") flag.Parse() - clientOptions := make([]peerpkg.Option, 0, 3) + clientOptions := make([]peerpkg.Option, 0, 4) if *logPath != "" { logger, err := latencylog.NewJSONLLogger(*logPath) if err != nil { @@ -35,6 +37,14 @@ func main() { defer logger.Close() clientOptions = append(clientOptions, peerpkg.WithLogger(logger)) } + if *txTimestampDebugLogPath != "" { + logger, err := transport.NewJSONLTXTimestampDebugLogger(*txTimestampDebugLogPath) + if err != nil { + log.Fatalf("create tx timestamp debug logger %s: %v", *txTimestampDebugLogPath, err) + } + defer logger.Close() + clientOptions = append(clientOptions, peerpkg.WithTXTimestampDebugLogger(logger)) + } if *bindIP != "" { clientOptions = append(clientOptions, peerpkg.WithBindIP(*bindIP)) }