From 0bd684c1c990bd7f6229e9cda1d8e4cb4c8b657c Mon Sep 17 00:00:00 2001 From: nnbcccscdscdsc <2709767634@qq.com> Date: Mon, 23 Mar 2026 22:22:00 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=E6=99=AE=E9=80=9A=E4=B8=9A?= =?UTF-8?q?=E5=8A=A1=E6=95=B0=E6=8D=AE=E4=B8=8D=E5=86=8D=E8=B5=B0=20RawCon?= =?UTF-8?q?n.Read/Write?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/internal/transport/tcp_linux.go | 135 +++++++++++----------------- 1 file changed, 50 insertions(+), 85 deletions(-) diff --git a/cmd/internal/transport/tcp_linux.go b/cmd/internal/transport/tcp_linux.go index 3ccbe10..7dc2fe6 100644 --- a/cmd/internal/transport/tcp_linux.go +++ b/cmd/internal/transport/tcp_linux.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "io" - "strings" "syscall" "time" @@ -19,6 +18,7 @@ const ( linuxTimestampControlBufferSize = 256 // 控制消息缓冲区。 linuxTXTimestampWaitTimeout = 250 * time.Millisecond // 等待 TX 时间戳的上限。 linuxTXTimestampPollInterval = time.Millisecond // 轮询 errqueue 的间隔。 + linuxDataPollInterval = time.Millisecond // 轮询普通收发的间隔。 linuxSOTimestampingNew = 0x41 linuxSCMTimestampingNew = linuxSOTimestampingNew @@ -127,43 +127,20 @@ func (c *TCPConn) sendMessageLinux(msg protocol.Message) error { // writeFrameLinux 用 sendmsg 写完整帧。 func (c *TCPConn) writeFrameLinux(frame []byte) error { written := 0 - var opErr error - err := c.raw.Write(func(fd uintptr) bool { - if written >= len(frame) { - return true - } - - n, sendErr := syscall.SendmsgN(int(fd), frame[written:], nil, nil, 0) + for written < len(frame) { + n, err := c.sendmsgDataOnce(frame[written:]) switch { - case sendErr == nil: + case err == nil: if n <= 0 { - opErr = io.ErrShortWrite - return true + return io.ErrShortWrite } written += n - return written >= len(frame) - case errors.Is(sendErr, syscall.EAGAIN), errors.Is(sendErr, syscall.EWOULDBLOCK): - return false + case isWouldBlock(err): + time.Sleep(linuxDataPollInterval) default: - opErr = sendErr - return true + return err } - }) - if err != nil { - if isRawConnNotPollable(err) { - return writeFullFallback(c.conn, frame[written:]) - } - return err - } - if opErr != nil { - if isRawConnNotPollable(opErr) { - return writeFullFallback(c.conn, frame[written:]) - } - return opErr - } - if written != len(frame) { - return io.ErrShortWrite } return nil @@ -374,15 +351,6 @@ func (c *TCPConn) readFullLinux(buf []byte) (int64, error) { firstRXTime = rxTimestamp } if err != nil { - if isRawConnNotPollable(err) { - if fallbackErr := readFullFallback(c.conn, buf[offset:]); fallbackErr != nil { - if errors.Is(fallbackErr, io.EOF) && offset > 0 { - return firstRXTime, io.ErrUnexpectedEOF - } - return firstRXTime, fallbackErr - } - return firstRXTime, nil - } if errors.Is(err, io.EOF) && offset > 0 { return firstRXTime, io.ErrUnexpectedEOF } @@ -397,39 +365,20 @@ func (c *TCPConn) readFullLinux(buf []byte) (int64, error) { // recvmsgLinux 用 recvmsg 同时读取数据和控制消息。 func (c *TCPConn) recvmsgLinux(buf []byte) (int, int64, error) { - var ( - n int - rxTimeNS int64 - opErr error - ) - - err := c.raw.Read(func(fd uintptr) bool { - oob := make([]byte, linuxTimestampControlBufferSize) - readN, oobN, _, _, recvErr := syscall.Recvmsg(int(fd), buf, oob, 0) + for { + n, rxTimeNS, err := c.recvmsgDataOnce(buf) switch { - case recvErr == nil: - if readN == 0 { - opErr = io.EOF - return true + case err == nil: + if n == 0 { + return 0, 0, io.EOF } - n = readN - rxTimeNS = parseRXTimestampControlMessages(oob[:oobN]) - return true - case errors.Is(recvErr, syscall.EAGAIN), errors.Is(recvErr, syscall.EWOULDBLOCK): - return false + return n, rxTimeNS, nil + case isWouldBlock(err): + time.Sleep(linuxDataPollInterval) default: - opErr = recvErr - return true + return 0, 0, err } - }) - if err != nil { - return 0, 0, err } - if opErr != nil { - return 0, 0, opErr - } - - return n, rxTimeNS, nil } // 从控制消息里取 RX_SOFTWARE。 @@ -477,26 +426,42 @@ func isWouldBlock(err error) bool { return errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EWOULDBLOCK) } -func isRawConnNotPollable(err error) bool { - return err != nil && strings.Contains(err.Error(), "not pollable") -} +func (c *TCPConn) sendmsgDataOnce(buf []byte) (int, error) { + var ( + n int + opErr error + ) -func writeFullFallback(w io.Writer, buf []byte) error { - for len(buf) > 0 { - n, err := w.Write(buf) - if err != nil { - return err - } - if n <= 0 { - return io.ErrShortWrite - } - buf = buf[n:] + err := c.raw.Control(func(fd uintptr) { + n, opErr = syscall.SendmsgN(int(fd), buf, nil, nil, 0) + }) + if err != nil { + return 0, err } - return nil + return n, opErr } -func readFullFallback(r io.Reader, buf []byte) error { - _, err := io.ReadFull(r, buf) - return err +func (c *TCPConn) recvmsgDataOnce(buf []byte) (int, int64, error) { + var ( + n int + rxTimeNS int64 + opErr error + ) + + err := c.raw.Control(func(fd uintptr) { + oob := make([]byte, linuxTimestampControlBufferSize) + readN, oobN, _, _, recvErr := syscall.Recvmsg(int(fd), buf, oob, 0) + if recvErr != nil { + opErr = recvErr + return + } + n = readN + rxTimeNS = parseRXTimestampControlMessages(oob[:oobN]) + }) + if err != nil { + return 0, 0, err + } + + return n, rxTimeNS, opErr }