fix: 错误队列不能只读前两个,增加了清除功能,读完全部错误队列

This commit is contained in:
nnbcccscdscdsc
2026-03-24 11:39:02 +08:00
parent 0bd684c1c9
commit 1889679dbe
8 changed files with 771 additions and 60 deletions

View File

@@ -33,8 +33,44 @@ const (
linuxSOFTimestampingTXSched = 1 << 8 // 打开 TX sched timestamp。
linuxSOFTimestampingOptTSONLY = 1 << 11 // 只回时间戳。
linuxSOFTimestampingOptIDTCP = 1 << 16 // 让 TCP 也带 timestamp ID。
linuxTXTimestampPhasePreSendDrain = "pre_send_drain"
linuxTXTimestampPhasePostSendCollect = "post_send_collect"
linuxTXTimestampPhasePostSelectDrain = "post_select_drain"
)
type txSendChunk struct {
SendCallIndex int
FrameOffsetStart int
FrameOffsetEnd int // 包含当前 sendmsg 成功写出的最后一个 frame 字节偏移。
BytesWritten int
ExpectedTXID uint32
}
type txTimestampEvent struct {
EventName string
TSUnixNano int64
EEInfo uint32
EEData uint32
}
type observedTXTimestampEvent struct {
Phase string
ReadIndex int
Event txTimestampEvent
}
type txTimestampSelection struct {
SelectedID uint32
Timestamps map[string]int64
HasEvent bool
}
type socketExtendedErrInfo struct {
Info uint32
Data uint32
}
// 拿到底层 fd并打开 Linux timestamping。
func (c *TCPConn) initLinuxTimestamping() error {
sysConn, ok := c.conn.(interface {
@@ -116,39 +152,56 @@ func (c *TCPConn) sendMessageLinux(msg protocol.Message) error {
binary.BigEndian.PutUint32(frame[:4], uint32(len(payload)))
copy(frame[4:], payload)
if err := c.writeFrameLinux(frame); err != nil {
readIndex := 0
c.drainPendingTXTimestampEvents(msg, nil, linuxTXTimestampPhasePreSendDrain, &readIndex)
chunks, err := c.writeFrameLinux(frame, msg)
if err != nil {
return fmt.Errorf("protocol: write frame: %w", err)
}
//记录发送延时日志
c.logTXTimestampEvents(msg)
c.logTXTimestampEvents(msg, chunks, &readIndex)
return nil
}
// writeFrameLinux 用 sendmsg 写完整帧。
func (c *TCPConn) writeFrameLinux(frame []byte) error {
func (c *TCPConn) writeFrameLinux(frame []byte, msg protocol.Message) ([]txSendChunk, error) {
written := 0
sendCallIndex := 0
chunks := make([]txSendChunk, 0, 1)
for written < len(frame) {
n, err := c.sendmsgDataOnce(frame[written:])
switch {
case err == nil:
if n <= 0 {
return io.ErrShortWrite
return nil, io.ErrShortWrite
}
chunk := txSendChunk{
SendCallIndex: sendCallIndex,
FrameOffsetStart: written,
FrameOffsetEnd: written + n - 1,
BytesWritten: n,
ExpectedTXID: c.txWriteSeq + uint32(n) - 1,
}
c.txWriteSeq += uint32(n)
chunks = append(chunks, chunk)
c.logTXSendChunkDebugRecord(msg, chunk)
sendCallIndex++
written += n
case isWouldBlock(err):
time.Sleep(linuxDataPollInterval)
default:
return err
return nil, err
}
}
return nil
return chunks, nil
}
// 把 A_TX_SCHED / A_TX_SOFTWARE 写入日志。(发送过程中)
func (c *TCPConn) logTXTimestampEvents(msg protocol.Message) {
timestamps := c.collectTXTimestampEvents()
func (c *TCPConn) logTXTimestampEvents(msg protocol.Message, chunks []txSendChunk, readIndex *int) {
timestamps := c.collectTXTimestampEvents(msg, chunks, readIndex)
if ts, ok := timestamps[latencylog.EventATXSched]; ok {
latencylog.LogMessageEventAt(c.logger, c.nodeRole, c.nodeID, latencylog.EventATXSched, ts, msg)
@@ -159,14 +212,14 @@ func (c *TCPConn) logTXTimestampEvents(msg protocol.Message) {
}
// 在 errqueue 里等两类 TX 时间戳。
func (c *TCPConn) collectTXTimestampEvents() map[string]int64 {
timestamps := make(map[string]int64, 2)
//设置合理等待上限
func (c *TCPConn) collectTXTimestampEvents(msg protocol.Message, chunks []txSendChunk, readIndex *int) map[string]int64 {
deadline := time.Now().Add(linuxTXTimestampWaitTimeout)
observed := make([]observedTXTimestampEvent, 0, 4)
finalChunk, hasFinalChunk := finalTXSendChunk(chunks)
//轮询 errqueue 直到拿到两类时间戳,或超时,或遇到非 EAGAIN 错误
for len(timestamps) < 2 && time.Now().Before(deadline) {
eventName, ts, err := c.recvTXTimestampOnce()
// 轮询 errqueue,优先等待本次消息最后一个 sendmsg chunk 的 sched/software 两类时间戳
for time.Now().Before(deadline) {
event, err := c.recvTXTimestampOnce()
if err != nil {
if isWouldBlock(err) {
time.Sleep(linuxTXTimestampPollInterval)
@@ -174,23 +227,33 @@ func (c *TCPConn) collectTXTimestampEvents() map[string]int64 {
}
break
}
if eventName == "" || ts <= 0 {
if event.EventName == "" || event.TSUnixNano <= 0 {
continue
}
if _, exists := timestamps[eventName]; !exists {
timestamps[eventName] = ts
observed = append(observed, observedTXTimestampEvent{
Phase: linuxTXTimestampPhasePostSendCollect,
ReadIndex: *readIndex,
Event: event,
})
*readIndex = *readIndex + 1
selection := selectTXTimestampEvents(observed, finalChunk.ExpectedTXID, hasFinalChunk)
if hasFinalChunk && selection.HasEvent && selection.SelectedID == finalChunk.ExpectedTXID && hasCompleteTXTimestampPair(selection.Timestamps) {
break
}
}
return timestamps
selection := selectTXTimestampEvents(observed, finalChunk.ExpectedTXID, hasFinalChunk)
c.logObservedTXTimestampEvents(msg, chunks, observed, selection)
c.drainPendingTXTimestampEvents(msg, chunks, linuxTXTimestampPhasePostSelectDrain, readIndex)
return selection.Timestamps
}
// recvTXTimestampOnce 从 errqueue 读一次时间戳事件。
func (c *TCPConn) recvTXTimestampOnce() (string, int64, error) {
func (c *TCPConn) recvTXTimestampOnce() (txTimestampEvent, error) {
var (
eventName string // 事件名,例如 A_TX_SCHED 或 A_TX_SOFTWARE。
tsUnixNS int64 // 时间戳的 UnixNano 表示。
opErr error
event txTimestampEvent
opErr error
)
err := c.raw.Control(func(fd uintptr) {
@@ -203,63 +266,61 @@ func (c *TCPConn) recvTXTimestampOnce() (string, int64, error) {
return
}
//解析控制消息,看看是不是我们关心的 TX 时间戳事件,如果是就拿到事件名和时间戳。
eventName, tsUnixNS = parseTXTimestampControlMessages(oob[:oobn])
event, _ = parseTXTimestampControlMessages(oob[:oobn])
})
if err != nil {
return "", 0, err
return txTimestampEvent{}, err
}
if opErr != nil {
return "", 0, opErr
return txTimestampEvent{}, opErr
}
return eventName, tsUnixNS, nil //如果成功拿到时间戳事件eventName 会是 A_TX_SCHED 或 A_TX_SOFTWARE 之一tsUnixNS 是对应的时间戳如果没有拿到事件或时间戳无效eventName 会是空字符串tsUnixNS 会是 0。
return event, nil
}
// 把底层时间戳映射成日志事件名。
func parseTXTimestampControlMessages(oob []byte) (string, int64) {
func parseTXTimestampControlMessages(oob []byte) (txTimestampEvent, bool) {
if len(oob) == 0 {
return "", 0
return txTimestampEvent{}, false
}
//解析控制消息,看看是不是我们关心的 TX 时间戳事件,如果是就拿到事件名和时间戳。
controlMessages, err := syscall.ParseSocketControlMessage(oob)
if err != nil {
return "", 0
return txTimestampEvent{}, false
}
return parseTXTimestampSocketControlMessages(controlMessages)
}
func parseTXTimestampSocketControlMessages(controlMessages []syscall.SocketControlMessage) (txTimestampEvent, bool) {
var (
tsUnixNS int64 //时间戳的 UnixNano 表示。
tsKind uint32 //extended err里告诉我们这个时间戳是 sched 还是 software。
hasTS bool // 是否拿到时间戳了。
hasKind bool // 是否拿到时间戳类型了。
event txTimestampEvent
hasTS bool
hasKind bool
)
//一个 recvmsg 可能会收到多个控制消息,循环找我们关心的时间戳事件,拿到时间戳和事件类型。
for _, controlMessage := range controlMessages {
switch {
case controlMessage.Header.Level == syscall.SOL_SOCKET && controlMessage.Header.Type == linuxSCMTimestampingNew:
if ts := parseSCMTimestampingData(controlMessage.Data); ts > 0 {
tsUnixNS = ts
event.TSUnixNano = ts
hasTS = true
}
case isSocketExtendedErr(controlMessage): //判断时间戳是否进入了errqueue
if info, ok := parseSocketExtendedErrInfo(controlMessage.Data); ok {
tsKind = info //时间戳类型被内核放在 extended err 的附加信息里,解析出来。
event.EEInfo = info.Info
event.EEData = info.Data
event.EventName = mapLinuxTXTimestampEventName(info.Info)
hasKind = true
}
}
}
if !hasTS || !hasKind {
return "", 0
if !hasTS || !hasKind || event.EventName == "" {
return txTimestampEvent{}, false
}
switch tsKind { //把内核的时间戳类型映射成日志事件名。(记录时只关心 sched 和 software 两类时间戳)
case linuxSCMTstampSched:
return latencylog.EventATXSched, tsUnixNS
case linuxSCMTstampSnd:
return latencylog.EventATXSoftware, tsUnixNS
default:
return "", 0
}
return event, true
}
// 判断控制消息是否来自 socket extended err。
@@ -276,15 +337,217 @@ func isSocketExtendedErr(controlMessage syscall.SocketControlMessage) bool {
}
// 从 socket extended err 的数据里取 origin timestamping 信息。
func parseSocketExtendedErrInfo(data []byte) (uint32, bool) {
func parseSocketExtendedErrInfo(data []byte) (socketExtendedErrInfo, bool) {
if len(data) < 16 {
return 0, false
return socketExtendedErrInfo{}, false
}
if data[4] != linuxSOEEOriginTimestamping {
return 0, false
return socketExtendedErrInfo{}, false
}
return binary.NativeEndian.Uint32(data[8:12]), true
return socketExtendedErrInfo{
Info: binary.NativeEndian.Uint32(data[8:12]),
Data: binary.NativeEndian.Uint32(data[12:16]),
}, true
}
func mapLinuxTXTimestampEventName(tsKind uint32) string {
switch tsKind {
case linuxSCMTstampSched:
return latencylog.EventATXSched
case linuxSCMTstampSnd:
return latencylog.EventATXSoftware
default:
return fmt.Sprintf("TX_TIMESTAMP_KIND_%d", tsKind)
}
}
func finalTXSendChunk(chunks []txSendChunk) (txSendChunk, bool) {
if len(chunks) == 0 {
return txSendChunk{}, false
}
return chunks[len(chunks)-1], true
}
func isBusinessTXTimestampEventName(eventName string) bool {
return eventName == latencylog.EventATXSched || eventName == latencylog.EventATXSoftware
}
func hasCompleteTXTimestampPair(timestamps map[string]int64) bool {
if len(timestamps) == 0 {
return false
}
return timestamps[latencylog.EventATXSched] > 0 && timestamps[latencylog.EventATXSoftware] > 0
}
func selectTXTimestampEvents(events []observedTXTimestampEvent, expectedFinalTXID uint32, hasExpectedFinalTXID bool) txTimestampSelection {
byID := make(map[uint32]map[string]int64)
selection := txTimestampSelection{
Timestamps: make(map[string]int64, 2),
}
var (
highestID uint32
haveHighest bool
)
for _, observed := range events {
event := observed.Event
selection.HasEvent = true
if !haveHighest || event.EEData > highestID {
highestID = event.EEData
haveHighest = true
}
if !isBusinessTXTimestampEventName(event.EventName) {
continue
}
if _, ok := byID[event.EEData]; !ok {
byID[event.EEData] = make(map[string]int64, 2)
}
if existing, exists := byID[event.EEData][event.EventName]; !exists || event.TSUnixNano < existing {
byID[event.EEData][event.EventName] = event.TSUnixNano
}
}
if !selection.HasEvent {
return selection
}
switch {
case hasExpectedFinalTXID && len(byID[expectedFinalTXID]) > 0:
selection.SelectedID = expectedFinalTXID
selection.Timestamps = copyTXTimestampMap(byID[expectedFinalTXID])
case len(byID[highestID]) > 0:
selection.SelectedID = highestID
selection.Timestamps = copyTXTimestampMap(byID[highestID])
default:
selection.SelectedID = highestID
}
return selection
}
func copyTXTimestampMap(src map[string]int64) map[string]int64 {
dst := make(map[string]int64, len(src))
for key, value := range src {
dst[key] = value
}
return dst
}
func (c *TCPConn) drainPendingTXTimestampEvents(msg protocol.Message, chunks []txSendChunk, phase string, readIndex *int) {
for {
event, err := c.recvTXTimestampOnce()
if err != nil {
if isWouldBlock(err) {
return
}
return
}
if event.EventName == "" || event.TSUnixNano <= 0 {
continue
}
c.logTXErrqueueDebugRecord(msg, chunks, observedTXTimestampEvent{
Phase: phase,
ReadIndex: *readIndex,
Event: event,
}, false)
*readIndex = *readIndex + 1
}
}
func (c *TCPConn) logObservedTXTimestampEvents(msg protocol.Message, chunks []txSendChunk, observed []observedTXTimestampEvent, selection txTimestampSelection) {
if len(observed) == 0 {
return
}
for _, entry := range observed {
selected := selection.HasEvent &&
entry.Event.EEData == selection.SelectedID &&
isBusinessTXTimestampEventName(entry.Event.EventName) &&
selection.Timestamps[entry.Event.EventName] == entry.Event.TSUnixNano
c.logTXErrqueueDebugRecord(msg, chunks, entry, selected)
}
}
func (c *TCPConn) logTXSendChunkDebugRecord(msg protocol.Message, chunk txSendChunk) {
if c.txTimestampDebugLogger == nil {
return
}
sendCallIndex := chunk.SendCallIndex
frameOffsetStart := chunk.FrameOffsetStart
frameOffsetEnd := chunk.FrameOffsetEnd
bytesWritten := chunk.BytesWritten
expectedTXID := chunk.ExpectedTXID
record := c.newTXTimestampDebugRecord(msg)
record.RecordType = txTimestampDebugRecordTypeSendChunk
record.SendCallIndex = &sendCallIndex
record.FrameOffsetStart = &frameOffsetStart
record.FrameOffsetEnd = &frameOffsetEnd
record.BytesWritten = &bytesWritten
record.ExpectedTXID = &expectedTXID
c.logTXTimestampDebugRecord(record)
}
func (c *TCPConn) logTXErrqueueDebugRecord(msg protocol.Message, chunks []txSendChunk, observed observedTXTimestampEvent, selected bool) {
if c.txTimestampDebugLogger == nil {
return
}
readIndex := observed.ReadIndex
tsUnixNano := observed.Event.TSUnixNano
eeInfo := observed.Event.EEInfo
eeData := observed.Event.EEData
selectedForLatency := selected
record := c.newTXTimestampDebugRecord(msg)
record.RecordType = txTimestampDebugRecordTypeErrqueueEvent
record.Phase = observed.Phase
record.ReadIndex = &readIndex
record.EventName = observed.Event.EventName
record.TSUnixNano = &tsUnixNano
record.EEInfo = &eeInfo
record.EEData = &eeData
record.SelectedForLatency = &selectedForLatency
if matchedSendCallIndex, ok := matchTXTimestampEventToSendChunk(observed.Event.EEData, chunks); ok {
record.MatchedSendCallIndex = &matchedSendCallIndex
}
c.logTXTimestampDebugRecord(record)
}
func matchTXTimestampEventToSendChunk(txID uint32, chunks []txSendChunk) (int, bool) {
for _, chunk := range chunks {
if chunk.ExpectedTXID == txID {
return chunk.SendCallIndex, true
}
}
return 0, false
}
func (c *TCPConn) newTXTimestampDebugRecord(msg protocol.Message) TXTimestampDebugRecord {
return TXTimestampDebugRecord{
NodeRole: c.nodeRole,
NodeID: c.nodeID,
MessageType: msg.Type,
MessageID: msg.ID,
From: msg.From,
To: msg.To,
FileName: msg.FileName,
BodySize: len(msg.Body),
}
}
func (c *TCPConn) logTXTimestampDebugRecord(record TXTimestampDebugRecord) {
if c.txTimestampDebugLogger == nil {
return
}
_ = c.txTimestampDebugLogger.LogTXTimestampDebugRecord(record)
}
// 读一条完整消息,并记录 B_RX_SOFTWARE。