From 8cec6a0766c4c4b9736d16aabc7437c78da0e614 Mon Sep 17 00:00:00 2001 From: Mock Date: Fri, 27 Mar 2026 01:59:17 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20transport.UDPConn=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E4=BA=86=20WithUDPLinuxTimestamping(false)=20=E5=BC=80?= =?UTF-8?q?=E5=85=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/internal/peer/client.go | 11 +++- cmd/internal/peer/udp_client.go | 4 +- cmd/internal/server/udp_hub.go | 22 +++++-- cmd/internal/transport/udp.go | 63 ++++++++++-------- cmd/internal/transport/udp_linux_test.go | 84 ++++++++++++++++++++++++ cmd/udpping/platform_linux.go | 3 +- cmd/udpserver/main.go | 4 +- 7 files changed, 154 insertions(+), 37 deletions(-) diff --git a/cmd/internal/peer/client.go b/cmd/internal/peer/client.go index 2500667..f31699e 100644 --- a/cmd/internal/peer/client.go +++ b/cmd/internal/peer/client.go @@ -22,6 +22,7 @@ type clientOptions struct { kcpPacketDebugLogger transport.KCPPacketDebugLogger kcpSessionStatsLogger transport.KCPSessionStatsLogger kcpSessionStatsInterval time.Duration + udpLinuxTimestamping bool bindIP string bindDevice string } @@ -72,6 +73,13 @@ func WithBindDevice(device string) Option { } } +// WithUDPLinuxTimestamping controls whether UDP clients enable Linux timestamping. +func WithUDPLinuxTimestamping(enabled bool) Option { + return func(options *clientOptions) { + options.udpLinuxTimestamping = enabled + } +} + // Client 表示一个已经连接到 server 的 peer。 type Client struct { id string @@ -84,7 +92,8 @@ type Client struct { // Dial 连接到 server,并立即发送 register 消息完成身份注册。 func Dial(serverAddr, peerID string, opts ...Option) (*Client, error) { options := clientOptions{ - logger: latencylog.NoopLogger{}, + logger: latencylog.NoopLogger{}, + udpLinuxTimestamping: true, } for _, opt := range opts { opt(&options) diff --git a/cmd/internal/peer/udp_client.go b/cmd/internal/peer/udp_client.go index ca8eccc..70840f3 100644 --- a/cmd/internal/peer/udp_client.go +++ b/cmd/internal/peer/udp_client.go @@ -24,7 +24,8 @@ type UDPClient struct { // DialUDP 通过 UDP 连接到 server,并发送 register 消息完成身份注册。 func DialUDP(serverAddr, peerID string, opts ...Option) (*UDPClient, error) { options := clientOptions{ - logger: latencylog.NoopLogger{}, + logger: latencylog.NoopLogger{}, + udpLinuxTimestamping: true, } for _, opt := range opts { opt(&options) @@ -56,6 +57,7 @@ func DialUDP(serverAddr, peerID string, opts ...Option) (*UDPClient, error) { rawConn, nil, // peer 侧已连接模式,不需要指定 peerAddr transport.WithUDPLogger(options.logger, latencylog.NodeRolePeer, peerID), + transport.WithUDPLinuxTimestamping(options.udpLinuxTimestamping), transport.WithUDPTXTimestampDebugLogger(options.txTimestampDebugLogger), ) if err != nil { diff --git a/cmd/internal/server/udp_hub.go b/cmd/internal/server/udp_hub.go index 680b94d..515e32c 100644 --- a/cmd/internal/server/udp_hub.go +++ b/cmd/internal/server/udp_hub.go @@ -28,23 +28,32 @@ func WithUDPTXTimestampDebugLogger(logger transport.TXTimestampDebugLogger) UDPO } } +// WithUDPLinuxTimestamping controls whether the UDP hub enables Linux timestamping. +func WithUDPLinuxTimestamping(enabled bool) UDPOption { + return func(hub *UDPHub) { + hub.linuxTimestampingEnabled = enabled + } +} + // UDPHub 管理通过 UDP 注册的 peer,并负责在它们之间转发消息。 type UDPHub struct { mu sync.RWMutex peers map[string]*net.UDPAddr addrs map[string]string - conn *transport.UDPConn - logger latencylog.Logger - txTimestampDebugLogger transport.TXTimestampDebugLogger + conn *transport.UDPConn + logger latencylog.Logger + txTimestampDebugLogger transport.TXTimestampDebugLogger + linuxTimestampingEnabled bool } // NewUDPHub 创建一个新的 UDP 连接中心。 func NewUDPHub(conn *net.UDPConn, opts ...UDPOption) (*UDPHub, error) { hub := &UDPHub{ - peers: make(map[string]*net.UDPAddr), - addrs: make(map[string]string), - logger: latencylog.NoopLogger{}, + peers: make(map[string]*net.UDPAddr), + addrs: make(map[string]string), + logger: latencylog.NoopLogger{}, + linuxTimestampingEnabled: true, } for _, opt := range opts { @@ -59,6 +68,7 @@ func NewUDPHub(conn *net.UDPConn, opts ...UDPOption) (*UDPHub, error) { conn, nil, transport.WithUDPLogger(hub.logger, latencylog.NodeRoleServer, "hub"), + transport.WithUDPLinuxTimestamping(hub.linuxTimestampingEnabled), transport.WithUDPTXTimestampDebugLogger(hub.txTimestampDebugLogger), ) if err != nil { diff --git a/cmd/internal/transport/udp.go b/cmd/internal/transport/udp.go index 566d1ed..87d1b31 100644 --- a/cmd/internal/transport/udp.go +++ b/cmd/internal/transport/udp.go @@ -10,23 +10,22 @@ import ( "omnisocketgo/cmd/internal/protocol" ) -// UDPConn 是对 UDP 连接的轻量封装。 -// server 侧共享一个 net.UDPConn,通过 SendTo 指定目标地址; -// peer 侧使用已连接的 net.UDPConn,直接 Send 即可。 +// UDPConn wraps a UDP socket for protocol message send/receive. type UDPConn struct { conn *net.UDPConn peerAddr *net.UDPAddr raw syscall.RawConn - logger latencylog.Logger - txTimestampDebugLogger TXTimestampDebugLogger - txPacketSeq uint32 - pendingTX map[uint32]udpTXPendingRecord - nodeRole string - nodeID string - writeMu sync.Mutex - closeOnce sync.Once - closeErr error + linuxTimestampingEnabled bool + logger latencylog.Logger + txTimestampDebugLogger TXTimestampDebugLogger + txPacketSeq uint32 + pendingTX map[uint32]udpTXPendingRecord + nodeRole string + nodeID string + writeMu sync.Mutex + closeOnce sync.Once + closeErr error } type udpTXPendingRecord struct { @@ -37,10 +36,10 @@ type udpTXPendingRecord struct { observedTimestamps map[string]int64 } -// UDPOption 用于为 UDPConn 注入可选行为。 +// UDPOption configures an optional behavior on UDPConn. type UDPOption func(*UDPConn) -// WithUDPLogger 为 UDP 连接注入业务消息日志上下文。 +// WithUDPLogger attaches latency logging context to a UDP connection. func WithUDPLogger(logger latencylog.Logger, nodeRole, nodeID string) UDPOption { return func(conn *UDPConn) { conn.logger = logger @@ -49,20 +48,28 @@ func WithUDPLogger(logger latencylog.Logger, nodeRole, nodeID string) UDPOption } } -// WithUDPTXTimestampDebugLogger 为 UDP 连接注入 TX errqueue 调试日志器。 +// WithUDPTXTimestampDebugLogger attaches a TX errqueue debug logger. func WithUDPTXTimestampDebugLogger(logger TXTimestampDebugLogger) UDPOption { return func(conn *UDPConn) { conn.txTimestampDebugLogger = logger } } -// NewUDPConn 创建 UDP transport 连接封装。 +// WithUDPLinuxTimestamping controls whether Linux UDP timestamping is enabled. +func WithUDPLinuxTimestamping(enabled bool) UDPOption { + return func(conn *UDPConn) { + conn.linuxTimestampingEnabled = enabled + } +} + +// NewUDPConn creates a UDP transport wrapper. func NewUDPConn(conn *net.UDPConn, peerAddr *net.UDPAddr, opts ...UDPOption) (*UDPConn, error) { udpConn := &UDPConn{ - conn: conn, - peerAddr: peerAddr, - logger: latencylog.NoopLogger{}, - pendingTX: make(map[uint32]udpTXPendingRecord), + conn: conn, + peerAddr: peerAddr, + linuxTimestampingEnabled: true, + logger: latencylog.NoopLogger{}, + pendingTX: make(map[uint32]udpTXPendingRecord), } for _, opt := range opts { @@ -73,14 +80,16 @@ func NewUDPConn(conn *net.UDPConn, peerAddr *net.UDPAddr, opts ...UDPOption) (*U udpConn.logger = latencylog.NoopLogger{} } - if err := udpConn.initUDPLinuxTimestamping(); err != nil { - return nil, err + if udpConn.linuxTimestampingEnabled { + if err := udpConn.initUDPLinuxTimestamping(); err != nil { + return nil, err + } } return udpConn, nil } -// Send 将一条协议消息编码为 UDP 数据报并发送。 +// Send encodes and sends one protocol message over UDP. func (c *UDPConn) Send(msg protocol.Message) error { c.writeMu.Lock() defer c.writeMu.Unlock() @@ -94,7 +103,7 @@ func (c *UDPConn) Send(msg protocol.Message) error { return nil } -// SendTo 将一条协议消息编码为 UDP 数据报并发送到指定地址。 +// SendTo encodes and sends one protocol message to a specific UDP address. func (c *UDPConn) SendTo(msg protocol.Message, addr *net.UDPAddr) error { c.writeMu.Lock() defer c.writeMu.Unlock() @@ -108,7 +117,7 @@ func (c *UDPConn) SendTo(msg protocol.Message, addr *net.UDPAddr) error { return nil } -// Receive 从 UDP 连接读取一条完整协议消息。 +// Receive reads one full protocol message from UDP. func (c *UDPConn) Receive() (protocol.Message, *net.UDPAddr, error) { msg, addr, err := c.receiveMessageLinux() if err != nil { @@ -118,7 +127,7 @@ func (c *UDPConn) Receive() (protocol.Message, *net.UDPAddr, error) { return msg, addr, nil } -// ReceiveLoop 持续从 UDP 连接读取消息并交给 handler 处理。 +// ReceiveLoop continuously receives messages and passes them to handler. func (c *UDPConn) ReceiveLoop(handler func(protocol.Message, *net.UDPAddr) error) error { for { msg, addr, err := c.Receive() @@ -132,7 +141,7 @@ func (c *UDPConn) ReceiveLoop(handler func(protocol.Message, *net.UDPAddr) error } } -// Close 关闭底层 UDP 连接,保证重复调用安全。 +// Close closes the underlying UDP socket. func (c *UDPConn) Close() error { c.closeOnce.Do(func() { c.closeErr = c.conn.Close() diff --git a/cmd/internal/transport/udp_linux_test.go b/cmd/internal/transport/udp_linux_test.go index 1b1d33e..2472e92 100644 --- a/cmd/internal/transport/udp_linux_test.go +++ b/cmd/internal/transport/udp_linux_test.go @@ -246,3 +246,87 @@ func TestUDPLinuxTimestampingDebugLoggerCapturesDatagramAndErrqueueEvents(t *tes } } } + +func TestUDPLinuxTimestampingCanBeDisabled(t *testing.T) { + senderLogger := &recordingLogger{} + receiverLogger := &recordingLogger{} + + serverAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + if err != nil { + t.Fatalf("ResolveUDPAddr() error = %v", err) + } + serverRaw, err := net.ListenUDP("udp", serverAddr) + if err != nil { + t.Fatalf("ListenUDP() error = %v", err) + } + receiver, err := NewUDPConn( + serverRaw, + nil, + WithUDPLogger(receiverLogger, latencylog.NodeRolePeer, "peer-b"), + WithUDPLinuxTimestamping(false), + ) + if err != nil { + _ = serverRaw.Close() + t.Fatalf("NewUDPConn(receiver) error = %v", err) + } + t.Cleanup(func() { _ = receiver.Close() }) + + peerRaw, err := net.DialUDP("udp", nil, serverRaw.LocalAddr().(*net.UDPAddr)) + if err != nil { + t.Fatalf("DialUDP() error = %v", err) + } + sender, err := NewUDPConn( + peerRaw, + nil, + WithUDPLogger(senderLogger, latencylog.NodeRolePeer, "peer-a"), + WithUDPLinuxTimestamping(false), + ) + if err != nil { + _ = peerRaw.Close() + t.Fatalf("NewUDPConn(sender) error = %v", err) + } + t.Cleanup(func() { _ = sender.Close() }) + + msg := protocol.Message{ + Type: protocol.MessageTypeText, + ID: 123, + From: "peer-a", + To: "peer-b", + Body: []byte("hello without udp timestamping"), + } + + sendErr := make(chan error, 1) + go func() { + sendErr <- sender.Send(msg) + }() + + got, _, err := receiver.Receive() + if err != nil { + t.Fatalf("Receive() error = %v", err) + } + if err := <-sendErr; err != nil { + t.Fatalf("Send() error = %v", err) + } + if !reflect.DeepEqual(got, msg) { + t.Fatalf("message mismatch: got %+v want %+v", got, msg) + } + if sender.raw != nil { + t.Fatal("sender.raw != nil, want nil when linux timestamping is disabled") + } + if receiver.raw != nil { + t.Fatal("receiver.raw != nil, want nil when linux timestamping is disabled") + } + assertMissingEvent(t, senderLogger.Events(), latencylog.EventATXSched, msg.ID) + assertMissingEvent(t, senderLogger.Events(), latencylog.EventATXSoftware, msg.ID) + assertMissingEvent(t, receiverLogger.Events(), latencylog.EventBRXSoftware, msg.ID) +} + +func assertMissingEvent(t *testing.T, events []latencylog.Event, wantEvent string, wantMessageID uint64) { + t.Helper() + + for _, event := range events { + if event.Event == wantEvent && event.MessageID == wantMessageID { + t.Fatalf("unexpected event %s for message %d: %+v", wantEvent, wantMessageID, event) + } + } +} diff --git a/cmd/udpping/platform_linux.go b/cmd/udpping/platform_linux.go index 7780b69..150dc2e 100644 --- a/cmd/udpping/platform_linux.go +++ b/cmd/udpping/platform_linux.go @@ -35,8 +35,9 @@ func runPlatform(cfg config, stdout, stderr io.Writer, now func() time.Time) err } func dialUDPClient(cfg config) (*peerpkg.UDPClient, func(), error) { - options := make([]peerpkg.Option, 0, 2) + options := make([]peerpkg.Option, 0, 3) closeLogger := func() {} + options = append(options, peerpkg.WithUDPLinuxTimestamping(false)) if cfg.latencyLog != "" { logger, err := latencylog.NewJSONLLogger(cfg.latencyLog) diff --git a/cmd/udpserver/main.go b/cmd/udpserver/main.go index 01bb347..ecb65c3 100644 --- a/cmd/udpserver/main.go +++ b/cmd/udpserver/main.go @@ -13,10 +13,11 @@ import ( func main() { listenAddr := flag.String("listen", ":9001", "UDP server listen address") logPath := flag.String("latency-log", "", "optional JSONL file path for latency timestamp logs") - txTimestampDebugLogPath := flag.String("tx-ts-debug-log", "", "optional JSONL file path for TX errqueue debug records") + txTimestampDebugLogPath := flag.String("tx-ts-debug-log", "", "optional JSONL file path for TX errqueue debug records; enables Linux UDP timestamping") flag.Parse() hubOptions := make([]server.UDPOption, 0, 2) + hubOptions = append(hubOptions, server.WithUDPLinuxTimestamping(false)) if *logPath != "" { logger, err := latencylog.NewJSONLLogger(*logPath) if err != nil { @@ -31,6 +32,7 @@ func main() { log.Fatalf("create tx timestamp debug logger %s: %v", *txTimestampDebugLogPath, err) } defer logger.Close() + hubOptions = append(hubOptions, server.WithUDPLinuxTimestamping(true)) hubOptions = append(hubOptions, server.WithUDPTXTimestampDebugLogger(logger)) }