fix: tx-debug-udp 日志的写入以及发送日志调试

This commit is contained in:
2026-03-24 18:02:38 +08:00
parent 9503862eda
commit 97eb3163db
6 changed files with 570 additions and 98 deletions

View File

@@ -13,7 +13,7 @@ import (
"omnisocketgo/cmd/internal/protocol"
)
// UDP 接收缓冲区大小,足以容纳 MaxFrameSize 加上协议头。
// UDP 接收缓冲区需要容纳完整 datagram 和协议头。
const udpReceiveBufferSize = protocol.MaxFrameSize + 1024
// initUDPLinuxTimestamping 拿到底层 fd并打开 Linux timestamping。
@@ -26,7 +26,6 @@ func (c *UDPConn) initUDPLinuxTimestamping() error {
return fmt.Errorf("transport: udp missing syscall conn")
}
// UDP 不需要 OPT_ID_TCP使用标准的 OPT_ID 即可。
flagCandidates := []int{
linuxSOFTimestampingTXSched |
linuxSOFTimestampingTXSoftware |
@@ -68,21 +67,7 @@ func (c *UDPConn) sendMessageLinux(msg protocol.Message) error {
return fmt.Errorf("protocol: encode message: %w", err)
}
readIndex := 0
c.drainPendingUDPTXTimestampEvents(msg, linuxTXTimestampPhasePreSendDrain, &readIndex)
if c.peerAddr != nil {
if err := c.udpSendTo(payload, c.peerAddr); err != nil {
return err
}
} else {
if err := c.udpSend(payload); err != nil {
return err
}
}
c.collectAndLogUDPTXTimestampEvents(msg, &readIndex)
return nil
return c.sendUDPPayloadLinux(msg, payload, c.peerAddr)
}
// sendMessageToLinux 编码消息并通过 UDP 发送到指定地址,采集 TX 时间戳。
@@ -92,22 +77,65 @@ func (c *UDPConn) sendMessageToLinux(msg protocol.Message, addr *net.UDPAddr) er
return fmt.Errorf("protocol: encode message: %w", err)
}
readIndex := 0
c.drainPendingUDPTXTimestampEvents(msg, linuxTXTimestampPhasePreSendDrain, &readIndex)
return c.sendUDPPayloadLinux(msg, payload, addr)
}
if err := c.udpSendTo(payload, addr); err != nil {
func (c *UDPConn) sendUDPPayloadLinux(msg protocol.Message, payload []byte, addr *net.UDPAddr) error {
readIndex := 0
// pre-send drain 可能读到上一条消息晚到的 errqueue 事件,
// 这里必须先清掉,并且按 ee_data 归还给原消息,不能污染当前消息。
c.drainPendingUDPTXTimestampEvents(linuxTXTimestampPhasePreSendDrain, &readIndex)
chunk := c.newUDPSendChunk(len(payload))
var err error
if addr != nil {
err = c.udpSendTo(payload, addr)
} else {
err = c.udpSend(payload)
}
if err != nil {
return err
}
c.collectAndLogUDPTXTimestampEvents(msg, &readIndex)
c.commitUDPSend(msg, chunk)
c.collectAndLogUDPTXTimestampEvents(msg, chunk, &readIndex)
return nil
}
func (c *UDPConn) newUDPSendChunk(payloadLen int) txSendChunk {
return txSendChunk{
SendCallIndex: 0,
FrameOffsetStart: 0,
FrameOffsetEnd: payloadLen - 1,
BytesWritten: payloadLen,
ExpectedTXID: c.txPacketSeq,
}
}
func (c *UDPConn) commitUDPSend(msg protocol.Message, chunk txSendChunk) {
// Linux 对 UDP datagram 的 ee_data 是按包递增的 ID。
// 这里把 ID 和原始消息元数据绑定起来,后续 drain 到的晚到事件才能记回原消息。
if c.txTimestampDebugLogger != nil {
c.pendingTX[chunk.ExpectedTXID] = udpTXPendingRecord{
msg: msg,
sendCallIndex: chunk.SendCallIndex,
bytesWritten: chunk.BytesWritten,
expectedTXID: chunk.ExpectedTXID,
observedTimestamps: make(map[string]int64, 2),
}
c.logUDPTXSendChunkDebugRecord(msg, chunk)
}
c.txPacketSeq++
}
// udpSend 通过已连接的 UDP socket 发送数据。
func (c *UDPConn) udpSend(payload []byte) error {
if c.raw != nil {
return c.udpSendmsgRaw(payload, nil)
}
_, err := c.conn.Write(payload)
return err
}
@@ -120,6 +148,7 @@ func (c *UDPConn) udpSendTo(payload []byte, addr *net.UDPAddr) error {
return c.udpSendmsgRaw(payload, sa)
}
}
_, err := c.conn.WriteToUDP(payload, addr)
return err
}
@@ -213,14 +242,13 @@ func (c *UDPConn) udpRecvmsgRaw() ([]byte, *net.UDPAddr, int64, error) {
return nil, nil, 0, opErr
}
addr := sockaddrToUDPAddr(from)
return buf[:n], addr, rxTimeNS, nil
return buf[:n], sockaddrToUDPAddr(from), rxTimeNS, nil
}
}
// collectAndLogUDPTXTimestampEvents 采集并记录 UDP 发送的 TX 时间戳事件。
func (c *UDPConn) collectAndLogUDPTXTimestampEvents(msg protocol.Message, readIndex *int) {
timestamps := c.collectUDPTXTimestampEvents(msg, readIndex)
func (c *UDPConn) collectAndLogUDPTXTimestampEvents(msg protocol.Message, chunk txSendChunk, readIndex *int) {
timestamps := c.collectUDPTXTimestampEvents(msg, chunk, readIndex)
if ts, ok := timestamps[latencylog.EventATXSched]; ok {
latencylog.LogMessageEventAt(c.logger, c.nodeRole, c.nodeID, latencylog.EventATXSched, ts, msg)
@@ -231,13 +259,13 @@ func (c *UDPConn) collectAndLogUDPTXTimestampEvents(msg protocol.Message, readIn
}
// collectUDPTXTimestampEvents 在 errqueue 中等待 TX 时间戳。
func (c *UDPConn) collectUDPTXTimestampEvents(msg protocol.Message, readIndex *int) map[string]int64 {
func (c *UDPConn) collectUDPTXTimestampEvents(msg protocol.Message, chunk txSendChunk, readIndex *int) map[string]int64 {
if c.raw == nil {
return nil
}
deadline := time.Now().Add(linuxTXTimestampWaitTimeout)
timestamps := make(map[string]int64, 2)
observed := make([]observedTXTimestampEvent, 0, 4)
for time.Now().Before(deadline) {
event, err := c.recvUDPTXTimestampOnce()
@@ -251,25 +279,31 @@ func (c *UDPConn) collectUDPTXTimestampEvents(msg protocol.Message, readIndex *i
if event.EventName == "" || event.TSUnixNano <= 0 {
continue
}
*readIndex++
if isBusinessTXTimestampEventName(event.EventName) {
if _, exists := timestamps[event.EventName]; !exists {
timestamps[event.EventName] = event.TSUnixNano
}
}
observed = append(observed, observedTXTimestampEvent{
Phase: linuxTXTimestampPhasePostSendCollect,
ReadIndex: *readIndex,
Event: event,
})
c.recordUDPPendingEvent(event)
*readIndex = *readIndex + 1
if hasCompleteTXTimestampPair(timestamps) {
selection := selectTXTimestampEvents(observed, chunk.ExpectedTXID, true)
if selection.HasEvent && selection.SelectedID == chunk.ExpectedTXID && hasCompleteTXTimestampPair(selection.Timestamps) {
break
}
}
c.drainPendingUDPTXTimestampEvents(msg, linuxTXTimestampPhasePostSelectDrain, readIndex)
return timestamps
selection := selectTXTimestampEvents(observed, chunk.ExpectedTXID, true)
c.logObservedUDPTXTimestampEvents(msg, chunk, observed, selection)
c.releaseCompletedUDPPendingFromObserved(observed)
c.drainPendingUDPTXTimestampEvents(linuxTXTimestampPhasePostSelectDrain, readIndex)
c.releaseCompletedUDPPending(chunk.ExpectedTXID)
return selection.Timestamps
}
// drainPendingUDPTXTimestampEvents 清空 errqueue 中残留的时间戳事件。
func (c *UDPConn) drainPendingUDPTXTimestampEvents(msg protocol.Message, phase string, readIndex *int) {
func (c *UDPConn) drainPendingUDPTXTimestampEvents(phase string, readIndex *int) {
if c.raw == nil {
return
}
@@ -277,12 +311,27 @@ func (c *UDPConn) drainPendingUDPTXTimestampEvents(msg protocol.Message, phase s
for {
event, err := c.recvUDPTXTimestampOnce()
if err != nil {
if isWouldBlock(err) {
return
}
return
}
if event.EventName == "" || event.TSUnixNano <= 0 {
continue
}
*readIndex++
complete := c.recordUDPPendingEvent(event)
if msg, chunks, ok := c.lookupUDPPendingDebugContext(event.EEData); ok {
c.logUDPTXErrqueueDebugRecord(msg, chunks, observedTXTimestampEvent{
Phase: phase,
ReadIndex: *readIndex,
Event: event,
}, false)
if complete {
delete(c.pendingTX, event.EEData)
}
}
*readIndex = *readIndex + 1
}
}
@@ -312,6 +361,155 @@ func (c *UDPConn) recvUDPTXTimestampOnce() (txTimestampEvent, error) {
return event, nil
}
func (c *UDPConn) recordUDPPendingEvent(event txTimestampEvent) bool {
if !isBusinessTXTimestampEventName(event.EventName) {
return false
}
record, ok := c.pendingTX[event.EEData]
if !ok {
return false
}
if record.observedTimestamps == nil {
record.observedTimestamps = make(map[string]int64, 2)
}
if existing, exists := record.observedTimestamps[event.EventName]; !exists || event.TSUnixNano < existing {
record.observedTimestamps[event.EventName] = event.TSUnixNano
}
c.pendingTX[event.EEData] = record
return hasCompleteTXTimestampPair(record.observedTimestamps)
}
func (c *UDPConn) releaseCompletedUDPPending(txID uint32) {
record, ok := c.pendingTX[txID]
if !ok {
return
}
if hasCompleteTXTimestampPair(record.observedTimestamps) {
delete(c.pendingTX, txID)
}
}
func (c *UDPConn) releaseCompletedUDPPendingFromObserved(observed []observedTXTimestampEvent) {
seen := make(map[uint32]struct{}, len(observed))
for _, entry := range observed {
if _, ok := seen[entry.Event.EEData]; ok {
continue
}
c.releaseCompletedUDPPending(entry.Event.EEData)
seen[entry.Event.EEData] = struct{}{}
}
}
func (c *UDPConn) lookupUDPPendingDebugContext(txID uint32) (protocol.Message, []txSendChunk, bool) {
record, ok := c.pendingTX[txID]
if !ok {
return protocol.Message{}, nil, false
}
chunk := txSendChunk{
SendCallIndex: record.sendCallIndex,
FrameOffsetStart: 0,
FrameOffsetEnd: record.bytesWritten - 1,
BytesWritten: record.bytesWritten,
ExpectedTXID: record.expectedTXID,
}
return record.msg, []txSendChunk{chunk}, true
}
func (c *UDPConn) logObservedUDPTXTimestampEvents(msg protocol.Message, chunk txSendChunk, observed []observedTXTimestampEvent, selection txTimestampSelection) {
if len(observed) == 0 {
return
}
currentChunks := []txSendChunk{chunk}
for _, entry := range observed {
recordMsg := msg
recordChunks := currentChunks
if pendingMsg, pendingChunks, ok := c.lookupUDPPendingDebugContext(entry.Event.EEData); ok {
recordMsg = pendingMsg
recordChunks = pendingChunks
}
// 理想情况应命中本次 expectedTXID如果等待窗口里只看到了更高的 ee_data
// 就退回到本轮实际观察到的最新事件,至少保留调试和定位线索。
selected := selection.HasEvent &&
entry.Event.EEData == selection.SelectedID &&
isBusinessTXTimestampEventName(entry.Event.EventName) &&
selection.Timestamps[entry.Event.EventName] == entry.Event.TSUnixNano
c.logUDPTXErrqueueDebugRecord(recordMsg, recordChunks, entry, selected)
}
}
func (c *UDPConn) logUDPTXSendChunkDebugRecord(msg protocol.Message, chunk txSendChunk) {
if c.txTimestampDebugLogger == nil {
return
}
sendCallIndex := chunk.SendCallIndex
frameOffsetStart := chunk.FrameOffsetStart
frameOffsetEnd := chunk.FrameOffsetEnd
bytesWritten := chunk.BytesWritten
expectedTXID := chunk.ExpectedTXID
record := c.newUDPTXTimestampDebugRecord(msg)
record.RecordType = txTimestampDebugRecordTypeSendChunk
record.SendCallIndex = &sendCallIndex
record.FrameOffsetStart = &frameOffsetStart
record.FrameOffsetEnd = &frameOffsetEnd
record.BytesWritten = &bytesWritten
record.ExpectedTXID = &expectedTXID
c.logUDPTXTimestampDebugRecord(record)
}
func (c *UDPConn) logUDPTXErrqueueDebugRecord(msg protocol.Message, chunks []txSendChunk, observed observedTXTimestampEvent, selected bool) {
if c.txTimestampDebugLogger == nil {
return
}
readIndex := observed.ReadIndex
tsUnixNano := observed.Event.TSUnixNano
eeInfo := observed.Event.EEInfo
eeData := observed.Event.EEData
selectedForLatency := selected
record := c.newUDPTXTimestampDebugRecord(msg)
record.RecordType = txTimestampDebugRecordTypeErrqueueEvent
record.Phase = observed.Phase
record.ReadIndex = &readIndex
record.EventName = observed.Event.EventName
record.TSUnixNano = &tsUnixNano
record.EEInfo = &eeInfo
record.EEData = &eeData
record.SelectedForLatency = &selectedForLatency
if matchedSendCallIndex, ok := matchTXTimestampEventToSendChunk(observed.Event.EEData, chunks); ok {
record.MatchedSendCallIndex = &matchedSendCallIndex
}
c.logUDPTXTimestampDebugRecord(record)
}
func (c *UDPConn) newUDPTXTimestampDebugRecord(msg protocol.Message) TXTimestampDebugRecord {
return TXTimestampDebugRecord{
NodeRole: c.nodeRole,
NodeID: c.nodeID,
MessageType: msg.Type,
MessageID: msg.ID,
From: msg.From,
To: msg.To,
FileName: msg.FileName,
BodySize: len(msg.Body),
}
}
func (c *UDPConn) logUDPTXTimestampDebugRecord(record TXTimestampDebugRecord) {
if c.txTimestampDebugLogger == nil {
return
}
_ = c.txTimestampDebugLogger.LogTXTimestampDebugRecord(record)
}
// udpAddrToSockaddr 将 net.UDPAddr 转换为 syscall.Sockaddr。
func udpAddrToSockaddr(addr *net.UDPAddr) syscall.Sockaddr {
if ip4 := addr.IP.To4(); ip4 != nil {