diff --git a/cmd/internal/transport/tcp_linux.go b/cmd/internal/transport/tcp_linux.go index 7dc2fe6..e0ee67a 100644 --- a/cmd/internal/transport/tcp_linux.go +++ b/cmd/internal/transport/tcp_linux.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "strings" "syscall" "time" @@ -18,7 +19,6 @@ const ( linuxTimestampControlBufferSize = 256 // 控制消息缓冲区。 linuxTXTimestampWaitTimeout = 250 * time.Millisecond // 等待 TX 时间戳的上限。 linuxTXTimestampPollInterval = time.Millisecond // 轮询 errqueue 的间隔。 - linuxDataPollInterval = time.Millisecond // 轮询普通收发的间隔。 linuxSOTimestampingNew = 0x41 linuxSCMTimestampingNew = linuxSOTimestampingNew @@ -36,6 +36,8 @@ const ( ) // 拿到底层 fd,并打开 Linux timestamping。 +const linuxTXTimestampDrainQuietTime = 5 * time.Millisecond + func (c *TCPConn) initLinuxTimestamping() error { sysConn, ok := c.conn.(interface { SyscallConn() (syscall.RawConn, error) @@ -106,6 +108,10 @@ func enableLinuxTimestamping(rawConn syscall.RawConn) error { // sendMessageLinux 编码消息、写完整帧,再记录 TX 时间戳。 func (c *TCPConn) sendMessageLinux(msg protocol.Message) error { + // Clear any stale TX timestamp entries left by previous large writes before + // sending the next message. Otherwise later sends may log old kernel times. + c.drainPendingTXTimestampEvents() + payload, err := protocol.EncodeMessage(msg) if err != nil { return fmt.Errorf("protocol: encode message: %w", err) @@ -127,20 +133,43 @@ func (c *TCPConn) sendMessageLinux(msg protocol.Message) error { // writeFrameLinux 用 sendmsg 写完整帧。 func (c *TCPConn) writeFrameLinux(frame []byte) error { written := 0 + var opErr error - for written < len(frame) { - n, err := c.sendmsgDataOnce(frame[written:]) + err := c.raw.Write(func(fd uintptr) bool { + if written >= len(frame) { + return true + } + + n, sendErr := syscall.SendmsgN(int(fd), frame[written:], nil, nil, 0) switch { - case err == nil: + case sendErr == nil: if n <= 0 { - return io.ErrShortWrite + opErr = io.ErrShortWrite + return true } written += n - case isWouldBlock(err): - time.Sleep(linuxDataPollInterval) + return written >= len(frame) + case errors.Is(sendErr, syscall.EAGAIN), errors.Is(sendErr, syscall.EWOULDBLOCK): + return false default: - return err + opErr = sendErr + return true } + }) + if err != nil { + if isRawConnNotPollable(err) { + return writeFullFallback(c.conn, frame[written:]) + } + return err + } + if opErr != nil { + if isRawConnNotPollable(opErr) { + return writeFullFallback(c.conn, frame[written:]) + } + return opErr + } + if written != len(frame) { + return io.ErrShortWrite } return nil @@ -159,25 +188,51 @@ func (c *TCPConn) logTXTimestampEvents(msg protocol.Message) { } // 在 errqueue 里等两类 TX 时间戳。 +func (c *TCPConn) drainPendingTXTimestampEvents() { + for { + _, _, err := c.recvTXTimestampOnce() + switch { + case err == nil: + continue + case isWouldBlock(err): + return + default: + return + } + } +} + func (c *TCPConn) collectTXTimestampEvents() map[string]int64 { timestamps := make(map[string]int64, 2) //设置合理等待上限 deadline := time.Now().Add(linuxTXTimestampWaitTimeout) + var quietDeadline time.Time //轮询 errqueue 直到拿到两类时间戳,或超时,或遇到非 EAGAIN 错误。 - for len(timestamps) < 2 && time.Now().Before(deadline) { + for time.Now().Before(deadline) { eventName, ts, err := c.recvTXTimestampOnce() if err != nil { if isWouldBlock(err) { + if len(timestamps) == 0 { + time.Sleep(linuxTXTimestampPollInterval) + continue + } + if quietDeadline.IsZero() { + quietDeadline = time.Now().Add(linuxTXTimestampDrainQuietTime) + } + if time.Now().After(quietDeadline) { + return timestamps + } time.Sleep(linuxTXTimestampPollInterval) continue } - break + return timestamps } + quietDeadline = time.Time{} if eventName == "" || ts <= 0 { continue } - if _, exists := timestamps[eventName]; !exists { + if existing, exists := timestamps[eventName]; !exists || ts < existing { timestamps[eventName] = ts } } @@ -351,6 +406,15 @@ func (c *TCPConn) readFullLinux(buf []byte) (int64, error) { firstRXTime = rxTimestamp } if err != nil { + if isRawConnNotPollable(err) { + if fallbackErr := readFullFallback(c.conn, buf[offset:]); fallbackErr != nil { + if errors.Is(fallbackErr, io.EOF) && offset > 0 { + return firstRXTime, io.ErrUnexpectedEOF + } + return firstRXTime, fallbackErr + } + return firstRXTime, nil + } if errors.Is(err, io.EOF) && offset > 0 { return firstRXTime, io.ErrUnexpectedEOF } @@ -365,20 +429,39 @@ func (c *TCPConn) readFullLinux(buf []byte) (int64, error) { // recvmsgLinux 用 recvmsg 同时读取数据和控制消息。 func (c *TCPConn) recvmsgLinux(buf []byte) (int, int64, error) { - for { - n, rxTimeNS, err := c.recvmsgDataOnce(buf) + var ( + n int + rxTimeNS int64 + opErr error + ) + + err := c.raw.Read(func(fd uintptr) bool { + oob := make([]byte, linuxTimestampControlBufferSize) + readN, oobN, _, _, recvErr := syscall.Recvmsg(int(fd), buf, oob, 0) switch { - case err == nil: - if n == 0 { - return 0, 0, io.EOF + case recvErr == nil: + if readN == 0 { + opErr = io.EOF + return true } - return n, rxTimeNS, nil - case isWouldBlock(err): - time.Sleep(linuxDataPollInterval) + n = readN + rxTimeNS = parseRXTimestampControlMessages(oob[:oobN]) + return true + case errors.Is(recvErr, syscall.EAGAIN), errors.Is(recvErr, syscall.EWOULDBLOCK): + return false default: - return 0, 0, err + opErr = recvErr + return true } + }) + if err != nil { + return 0, 0, err } + if opErr != nil { + return 0, 0, opErr + } + + return n, rxTimeNS, nil } // 从控制消息里取 RX_SOFTWARE。 @@ -426,42 +509,26 @@ func isWouldBlock(err error) bool { return errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EWOULDBLOCK) } -func (c *TCPConn) sendmsgDataOnce(buf []byte) (int, error) { - var ( - n int - opErr error - ) - - err := c.raw.Control(func(fd uintptr) { - n, opErr = syscall.SendmsgN(int(fd), buf, nil, nil, 0) - }) - if err != nil { - return 0, err - } - - return n, opErr +func isRawConnNotPollable(err error) bool { + return err != nil && strings.Contains(err.Error(), "not pollable") } -func (c *TCPConn) recvmsgDataOnce(buf []byte) (int, int64, error) { - var ( - n int - rxTimeNS int64 - opErr error - ) - - err := c.raw.Control(func(fd uintptr) { - oob := make([]byte, linuxTimestampControlBufferSize) - readN, oobN, _, _, recvErr := syscall.Recvmsg(int(fd), buf, oob, 0) - if recvErr != nil { - opErr = recvErr - return +func writeFullFallback(w io.Writer, buf []byte) error { + for len(buf) > 0 { + n, err := w.Write(buf) + if err != nil { + return err } - n = readN - rxTimeNS = parseRXTimestampControlMessages(oob[:oobN]) - }) - if err != nil { - return 0, 0, err + if n <= 0 { + return io.ErrShortWrite + } + buf = buf[n:] } - return n, rxTimeNS, opErr + return nil +} + +func readFullFallback(r io.Reader, buf []byte) error { + _, err := io.ReadFull(r, buf) + return err }