fix: 错误队列不能只读前两个,增加了清除功能,读完全部错误队列

This commit is contained in:
nnbcccscdscdsc
2026-03-24 00:15:16 +08:00
parent 0bd684c1c9
commit b1127d1f10

View File

@@ -7,6 +7,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"strings"
"syscall" "syscall"
"time" "time"
@@ -18,7 +19,6 @@ const (
linuxTimestampControlBufferSize = 256 // 控制消息缓冲区。 linuxTimestampControlBufferSize = 256 // 控制消息缓冲区。
linuxTXTimestampWaitTimeout = 250 * time.Millisecond // 等待 TX 时间戳的上限。 linuxTXTimestampWaitTimeout = 250 * time.Millisecond // 等待 TX 时间戳的上限。
linuxTXTimestampPollInterval = time.Millisecond // 轮询 errqueue 的间隔。 linuxTXTimestampPollInterval = time.Millisecond // 轮询 errqueue 的间隔。
linuxDataPollInterval = time.Millisecond // 轮询普通收发的间隔。
linuxSOTimestampingNew = 0x41 linuxSOTimestampingNew = 0x41
linuxSCMTimestampingNew = linuxSOTimestampingNew linuxSCMTimestampingNew = linuxSOTimestampingNew
@@ -36,6 +36,8 @@ const (
) )
// 拿到底层 fd并打开 Linux timestamping。 // 拿到底层 fd并打开 Linux timestamping。
const linuxTXTimestampDrainQuietTime = 5 * time.Millisecond
func (c *TCPConn) initLinuxTimestamping() error { func (c *TCPConn) initLinuxTimestamping() error {
sysConn, ok := c.conn.(interface { sysConn, ok := c.conn.(interface {
SyscallConn() (syscall.RawConn, error) SyscallConn() (syscall.RawConn, error)
@@ -106,6 +108,10 @@ func enableLinuxTimestamping(rawConn syscall.RawConn) error {
// sendMessageLinux 编码消息、写完整帧,再记录 TX 时间戳。 // sendMessageLinux 编码消息、写完整帧,再记录 TX 时间戳。
func (c *TCPConn) sendMessageLinux(msg protocol.Message) error { func (c *TCPConn) sendMessageLinux(msg protocol.Message) error {
// Clear any stale TX timestamp entries left by previous large writes before
// sending the next message. Otherwise later sends may log old kernel times.
c.drainPendingTXTimestampEvents()
payload, err := protocol.EncodeMessage(msg) payload, err := protocol.EncodeMessage(msg)
if err != nil { if err != nil {
return fmt.Errorf("protocol: encode message: %w", err) return fmt.Errorf("protocol: encode message: %w", err)
@@ -127,20 +133,43 @@ func (c *TCPConn) sendMessageLinux(msg protocol.Message) error {
// writeFrameLinux 用 sendmsg 写完整帧。 // writeFrameLinux 用 sendmsg 写完整帧。
func (c *TCPConn) writeFrameLinux(frame []byte) error { func (c *TCPConn) writeFrameLinux(frame []byte) error {
written := 0 written := 0
var opErr error
for written < len(frame) { err := c.raw.Write(func(fd uintptr) bool {
n, err := c.sendmsgDataOnce(frame[written:]) if written >= len(frame) {
return true
}
n, sendErr := syscall.SendmsgN(int(fd), frame[written:], nil, nil, 0)
switch { switch {
case err == nil: case sendErr == nil:
if n <= 0 { if n <= 0 {
return io.ErrShortWrite opErr = io.ErrShortWrite
return true
} }
written += n written += n
case isWouldBlock(err): return written >= len(frame)
time.Sleep(linuxDataPollInterval) case errors.Is(sendErr, syscall.EAGAIN), errors.Is(sendErr, syscall.EWOULDBLOCK):
return false
default: default:
opErr = sendErr
return true
}
})
if err != nil {
if isRawConnNotPollable(err) {
return writeFullFallback(c.conn, frame[written:])
}
return err return err
} }
if opErr != nil {
if isRawConnNotPollable(opErr) {
return writeFullFallback(c.conn, frame[written:])
}
return opErr
}
if written != len(frame) {
return io.ErrShortWrite
} }
return nil return nil
@@ -159,25 +188,51 @@ func (c *TCPConn) logTXTimestampEvents(msg protocol.Message) {
} }
// 在 errqueue 里等两类 TX 时间戳。 // 在 errqueue 里等两类 TX 时间戳。
func (c *TCPConn) drainPendingTXTimestampEvents() {
for {
_, _, err := c.recvTXTimestampOnce()
switch {
case err == nil:
continue
case isWouldBlock(err):
return
default:
return
}
}
}
func (c *TCPConn) collectTXTimestampEvents() map[string]int64 { func (c *TCPConn) collectTXTimestampEvents() map[string]int64 {
timestamps := make(map[string]int64, 2) timestamps := make(map[string]int64, 2)
//设置合理等待上限 //设置合理等待上限
deadline := time.Now().Add(linuxTXTimestampWaitTimeout) deadline := time.Now().Add(linuxTXTimestampWaitTimeout)
var quietDeadline time.Time
//轮询 errqueue 直到拿到两类时间戳,或超时,或遇到非 EAGAIN 错误。 //轮询 errqueue 直到拿到两类时间戳,或超时,或遇到非 EAGAIN 错误。
for len(timestamps) < 2 && time.Now().Before(deadline) { for time.Now().Before(deadline) {
eventName, ts, err := c.recvTXTimestampOnce() eventName, ts, err := c.recvTXTimestampOnce()
if err != nil { if err != nil {
if isWouldBlock(err) { if isWouldBlock(err) {
if len(timestamps) == 0 {
time.Sleep(linuxTXTimestampPollInterval) time.Sleep(linuxTXTimestampPollInterval)
continue continue
} }
break if quietDeadline.IsZero() {
quietDeadline = time.Now().Add(linuxTXTimestampDrainQuietTime)
} }
if time.Now().After(quietDeadline) {
return timestamps
}
time.Sleep(linuxTXTimestampPollInterval)
continue
}
return timestamps
}
quietDeadline = time.Time{}
if eventName == "" || ts <= 0 { if eventName == "" || ts <= 0 {
continue continue
} }
if _, exists := timestamps[eventName]; !exists { if existing, exists := timestamps[eventName]; !exists || ts < existing {
timestamps[eventName] = ts timestamps[eventName] = ts
} }
} }
@@ -351,6 +406,15 @@ func (c *TCPConn) readFullLinux(buf []byte) (int64, error) {
firstRXTime = rxTimestamp firstRXTime = rxTimestamp
} }
if err != nil { if err != nil {
if isRawConnNotPollable(err) {
if fallbackErr := readFullFallback(c.conn, buf[offset:]); fallbackErr != nil {
if errors.Is(fallbackErr, io.EOF) && offset > 0 {
return firstRXTime, io.ErrUnexpectedEOF
}
return firstRXTime, fallbackErr
}
return firstRXTime, nil
}
if errors.Is(err, io.EOF) && offset > 0 { if errors.Is(err, io.EOF) && offset > 0 {
return firstRXTime, io.ErrUnexpectedEOF return firstRXTime, io.ErrUnexpectedEOF
} }
@@ -365,20 +429,39 @@ func (c *TCPConn) readFullLinux(buf []byte) (int64, error) {
// recvmsgLinux 用 recvmsg 同时读取数据和控制消息。 // recvmsgLinux 用 recvmsg 同时读取数据和控制消息。
func (c *TCPConn) recvmsgLinux(buf []byte) (int, int64, error) { func (c *TCPConn) recvmsgLinux(buf []byte) (int, int64, error) {
for { var (
n, rxTimeNS, err := c.recvmsgDataOnce(buf) n int
rxTimeNS int64
opErr error
)
err := c.raw.Read(func(fd uintptr) bool {
oob := make([]byte, linuxTimestampControlBufferSize)
readN, oobN, _, _, recvErr := syscall.Recvmsg(int(fd), buf, oob, 0)
switch { switch {
case err == nil: case recvErr == nil:
if n == 0 { if readN == 0 {
return 0, 0, io.EOF opErr = io.EOF
return true
} }
return n, rxTimeNS, nil n = readN
case isWouldBlock(err): rxTimeNS = parseRXTimestampControlMessages(oob[:oobN])
time.Sleep(linuxDataPollInterval) return true
case errors.Is(recvErr, syscall.EAGAIN), errors.Is(recvErr, syscall.EWOULDBLOCK):
return false
default: default:
opErr = recvErr
return true
}
})
if err != nil {
return 0, 0, err return 0, 0, err
} }
if opErr != nil {
return 0, 0, opErr
} }
return n, rxTimeNS, nil
} }
// 从控制消息里取 RX_SOFTWARE。 // 从控制消息里取 RX_SOFTWARE。
@@ -426,42 +509,26 @@ func isWouldBlock(err error) bool {
return errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EWOULDBLOCK) return errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EWOULDBLOCK)
} }
func (c *TCPConn) sendmsgDataOnce(buf []byte) (int, error) { func isRawConnNotPollable(err error) bool {
var ( return err != nil && strings.Contains(err.Error(), "not pollable")
n int
opErr error
)
err := c.raw.Control(func(fd uintptr) {
n, opErr = syscall.SendmsgN(int(fd), buf, nil, nil, 0)
})
if err != nil {
return 0, err
}
return n, opErr
} }
func (c *TCPConn) recvmsgDataOnce(buf []byte) (int, int64, error) { func writeFullFallback(w io.Writer, buf []byte) error {
var ( for len(buf) > 0 {
n int n, err := w.Write(buf)
rxTimeNS int64
opErr error
)
err := c.raw.Control(func(fd uintptr) {
oob := make([]byte, linuxTimestampControlBufferSize)
readN, oobN, _, _, recvErr := syscall.Recvmsg(int(fd), buf, oob, 0)
if recvErr != nil {
opErr = recvErr
return
}
n = readN
rxTimeNS = parseRXTimestampControlMessages(oob[:oobN])
})
if err != nil { if err != nil {
return 0, 0, err return err
}
if n <= 0 {
return io.ErrShortWrite
}
buf = buf[n:]
} }
return n, rxTimeNS, opErr return nil
}
func readFullFallback(r io.Reader, buf []byte) error {
_, err := io.ReadFull(r, buf)
return err
} }