package transport import ( "fmt" "net" "sync" "syscall" "omnisocketgo/cmd/internal/latencylog" "omnisocketgo/cmd/internal/protocol" ) // UDPConn wraps a UDP socket for protocol message send/receive. type UDPConn struct { conn *net.UDPConn peerAddr *net.UDPAddr raw syscall.RawConn linuxTimestampingEnabled bool logger latencylog.Logger txTimestampDebugLogger TXTimestampDebugLogger txPacketSeq uint32 pendingTX map[uint32]udpTXPendingRecord nodeRole string nodeID string writeMu sync.Mutex closeOnce sync.Once closeErr error } type udpTXPendingRecord struct { msg protocol.Message sendCallIndex int bytesWritten int expectedTXID uint32 observedTimestamps map[string]int64 } // UDPOption configures an optional behavior on UDPConn. type UDPOption func(*UDPConn) // WithUDPLogger attaches latency logging context to a UDP connection. func WithUDPLogger(logger latencylog.Logger, nodeRole, nodeID string) UDPOption { return func(conn *UDPConn) { conn.logger = logger conn.nodeRole = nodeRole conn.nodeID = nodeID } } // WithUDPTXTimestampDebugLogger attaches a TX errqueue debug logger. func WithUDPTXTimestampDebugLogger(logger TXTimestampDebugLogger) UDPOption { return func(conn *UDPConn) { conn.txTimestampDebugLogger = logger } } // WithUDPLinuxTimestamping controls whether Linux UDP timestamping is enabled. func WithUDPLinuxTimestamping(enabled bool) UDPOption { return func(conn *UDPConn) { conn.linuxTimestampingEnabled = enabled } } // NewUDPConn creates a UDP transport wrapper. func NewUDPConn(conn *net.UDPConn, peerAddr *net.UDPAddr, opts ...UDPOption) (*UDPConn, error) { udpConn := &UDPConn{ conn: conn, peerAddr: peerAddr, linuxTimestampingEnabled: true, logger: latencylog.NoopLogger{}, pendingTX: make(map[uint32]udpTXPendingRecord), } for _, opt := range opts { opt(udpConn) } if udpConn.logger == nil { udpConn.logger = latencylog.NoopLogger{} } if udpConn.linuxTimestampingEnabled { if err := udpConn.initUDPLinuxTimestamping(); err != nil { return nil, err } } return udpConn, nil } // Send encodes and sends one protocol message over UDP. func (c *UDPConn) Send(msg protocol.Message) error { c.writeMu.Lock() defer c.writeMu.Unlock() latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffBegin, msg) if err := c.sendMessageLinux(msg); err != nil { return fmt.Errorf("transport: udp send message: %w", err) } latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffEnd, msg) return nil } // SendTo encodes and sends one protocol message to a specific UDP address. func (c *UDPConn) SendTo(msg protocol.Message, addr *net.UDPAddr) error { c.writeMu.Lock() defer c.writeMu.Unlock() latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffBegin, msg) if err := c.sendMessageToLinux(msg, addr); err != nil { return fmt.Errorf("transport: udp send message to %s: %w", addr, err) } latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffEnd, msg) return nil } // Receive reads one full protocol message from UDP. func (c *UDPConn) Receive() (protocol.Message, *net.UDPAddr, error) { msg, addr, err := c.receiveMessageLinux() if err != nil { return protocol.Message{}, nil, fmt.Errorf("transport: udp receive message: %w", err) } return msg, addr, nil } // ReceiveLoop continuously receives messages and passes them to handler. func (c *UDPConn) ReceiveLoop(handler func(protocol.Message, *net.UDPAddr) error) error { for { msg, addr, err := c.Receive() if err != nil { return fmt.Errorf("transport: udp receive loop read: %w", err) } if err := handler(msg, addr); err != nil { return fmt.Errorf("transport: udp receive loop handler: %w", err) } } } // Close closes the underlying UDP socket. func (c *UDPConn) Close() error { c.closeOnce.Do(func() { c.closeErr = c.conn.Close() }) return c.closeErr }