//go:build linux package transport import ( "errors" "fmt" "net" "syscall" "time" "omnisocketgo/cmd/internal/latencylog" "omnisocketgo/cmd/internal/protocol" ) // UDP 接收缓冲区需要容纳完整 datagram 和协议头。 const udpReceiveBufferSize = protocol.MaxFrameSize + 1024 // initUDPLinuxTimestamping 拿到底层 fd,并打开 Linux timestamping。 func (c *UDPConn) initUDPLinuxTimestamping() error { rawConn, err := c.conn.SyscallConn() if err != nil || rawConn == nil { if err != nil { return fmt.Errorf("transport: udp get syscall conn: %w", err) } return fmt.Errorf("transport: udp missing syscall conn") } flagCandidates := []int{ linuxSOFTimestampingTXSched | linuxSOFTimestampingTXSoftware | linuxSOFTimestampingRXSoftware | linuxSOFTimestampingSoftware | linuxSOFTimestampingOptID | linuxSOFTimestampingOptTSONLY, linuxSOFTimestampingTXSched | linuxSOFTimestampingTXSoftware | linuxSOFTimestampingRXSoftware | linuxSOFTimestampingSoftware | linuxSOFTimestampingOptTSONLY, } var lastErr error for _, flags := range flagCandidates { err := rawConn.Control(func(fd uintptr) { lastErr = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, linuxSOTimestampingNew, flags) }) if err != nil { return err } if lastErr == nil { c.raw = rawConn return nil } if !errors.Is(lastErr, syscall.EINVAL) { return lastErr } } return lastErr } // sendMessageLinux 编码消息并通过 UDP 发送,采集 TX 时间戳。 func (c *UDPConn) sendMessageLinux(msg protocol.Message) error { payload, err := protocol.EncodeMessage(msg) if err != nil { return fmt.Errorf("protocol: encode message: %w", err) } return c.sendUDPPayloadLinux(msg, payload, c.peerAddr) } // sendMessageToLinux 编码消息并通过 UDP 发送到指定地址,采集 TX 时间戳。 func (c *UDPConn) sendMessageToLinux(msg protocol.Message, addr *net.UDPAddr) error { payload, err := protocol.EncodeMessage(msg) if err != nil { return fmt.Errorf("protocol: encode message: %w", err) } return c.sendUDPPayloadLinux(msg, payload, addr) } func (c *UDPConn) sendUDPPayloadLinux(msg protocol.Message, payload []byte, addr *net.UDPAddr) error { readIndex := 0 // pre-send drain 可能读到上一条消息晚到的 errqueue 事件, // 这里必须先清掉,并且按 ee_data 归还给原消息,不能污染当前消息。 c.drainPendingUDPTXTimestampEvents(linuxTXTimestampPhasePreSendDrain, &readIndex) chunk := c.newUDPSendChunk(len(payload)) var err error if addr != nil { err = c.udpSendTo(payload, addr) } else { err = c.udpSend(payload) } if err != nil { return err } c.commitUDPSend(msg, chunk) c.collectAndLogUDPTXTimestampEvents(msg, chunk, &readIndex) return nil } func (c *UDPConn) newUDPSendChunk(payloadLen int) txSendChunk { return txSendChunk{ SendCallIndex: 0, FrameOffsetStart: 0, FrameOffsetEnd: payloadLen - 1, BytesWritten: payloadLen, ExpectedTXID: c.txPacketSeq, } } func (c *UDPConn) commitUDPSend(msg protocol.Message, chunk txSendChunk) { // Linux 对 UDP datagram 的 ee_data 是按包递增的 ID。 // 这里把 ID 和原始消息元数据绑定起来,后续 drain 到的晚到事件才能记回原消息。 if c.txTimestampDebugLogger != nil { c.pendingTX[chunk.ExpectedTXID] = udpTXPendingRecord{ msg: msg, sendCallIndex: chunk.SendCallIndex, bytesWritten: chunk.BytesWritten, expectedTXID: chunk.ExpectedTXID, observedTimestamps: make(map[string]int64, 2), } c.logUDPTXSendChunkDebugRecord(msg, chunk) } c.txPacketSeq++ } // udpSend 通过已连接的 UDP socket 发送数据。 func (c *UDPConn) udpSend(payload []byte) error { if c.raw != nil { return c.udpSendmsgRaw(payload, nil) } _, err := c.conn.Write(payload) return err } // udpSendTo 通过 UDP socket 发送数据到指定地址。 func (c *UDPConn) udpSendTo(payload []byte, addr *net.UDPAddr) error { if c.raw != nil { sa := udpAddrToSockaddr(addr) if sa != nil { return c.udpSendmsgRaw(payload, sa) } } _, err := c.conn.WriteToUDP(payload, addr) return err } // udpSendmsgRaw 通过 sendmsg syscall 发送 UDP 数据。 func (c *UDPConn) udpSendmsgRaw(payload []byte, to syscall.Sockaddr) error { var opErr error for { err := c.raw.Control(func(fd uintptr) { opErr = syscall.Sendmsg(int(fd), payload, nil, to, 0) }) if err != nil { return err } if opErr == nil { return nil } if isWouldBlock(opErr) { time.Sleep(linuxDataPollInterval) continue } return opErr } } // receiveMessageLinux 从 UDP 连接读取一条完整消息,并记录 RX 时间戳。 func (c *UDPConn) receiveMessageLinux() (protocol.Message, *net.UDPAddr, error) { payload, addr, rxTimestamp, err := c.udpRecvFrom() if err != nil { return protocol.Message{}, nil, fmt.Errorf("protocol: udp read: %w", err) } msg, err := protocol.DecodeMessage(payload) if err != nil { return protocol.Message{}, nil, fmt.Errorf("protocol: decode message: %w", err) } if rxTimestamp > 0 { latencylog.LogMessageEventAt(c.logger, c.nodeRole, c.nodeID, latencylog.EventBRXSoftware, rxTimestamp, msg) } return msg, addr, nil } // udpRecvFrom 从 UDP socket 接收一个完整数据报,返回数据、来源地址和 RX 时间戳。 func (c *UDPConn) udpRecvFrom() ([]byte, *net.UDPAddr, int64, error) { if c.raw != nil { return c.udpRecvmsgRaw() } buf := make([]byte, udpReceiveBufferSize) n, addr, err := c.conn.ReadFromUDP(buf) if err != nil { return nil, nil, 0, err } return buf[:n], addr, 0, nil } // udpRecvmsgRaw 通过 recvmsg syscall 接收 UDP 数据,同时采集 RX 时间戳。 func (c *UDPConn) udpRecvmsgRaw() ([]byte, *net.UDPAddr, int64, error) { for { var ( n int rxTimeNS int64 from syscall.Sockaddr opErr error ) buf := make([]byte, udpReceiveBufferSize) err := c.raw.Control(func(fd uintptr) { oob := make([]byte, linuxTimestampControlBufferSize) readN, oobN, _, sa, recvErr := syscall.Recvmsg(int(fd), buf, oob, 0) if recvErr != nil { opErr = recvErr return } n = readN from = sa rxTimeNS = parseRXTimestampControlMessages(oob[:oobN]) }) if err != nil { return nil, nil, 0, err } if opErr != nil { if isWouldBlock(opErr) { time.Sleep(linuxDataPollInterval) continue } return nil, nil, 0, opErr } return buf[:n], sockaddrToUDPAddr(from), rxTimeNS, nil } } // collectAndLogUDPTXTimestampEvents 采集并记录 UDP 发送的 TX 时间戳事件。 func (c *UDPConn) collectAndLogUDPTXTimestampEvents(msg protocol.Message, chunk txSendChunk, readIndex *int) { timestamps := c.collectUDPTXTimestampEvents(msg, chunk, readIndex) if ts, ok := timestamps[latencylog.EventATXSched]; ok { latencylog.LogMessageEventAt(c.logger, c.nodeRole, c.nodeID, latencylog.EventATXSched, ts, msg) } if ts, ok := timestamps[latencylog.EventATXSoftware]; ok { latencylog.LogMessageEventAt(c.logger, c.nodeRole, c.nodeID, latencylog.EventATXSoftware, ts, msg) } } // collectUDPTXTimestampEvents 在 errqueue 中等待 TX 时间戳。 func (c *UDPConn) collectUDPTXTimestampEvents(msg protocol.Message, chunk txSendChunk, readIndex *int) map[string]int64 { if c.raw == nil { return nil } deadline := time.Now().Add(linuxTXTimestampWaitTimeout) observed := make([]observedTXTimestampEvent, 0, 4) for time.Now().Before(deadline) { event, err := c.recvUDPTXTimestampOnce() if err != nil { if isWouldBlock(err) { time.Sleep(linuxTXTimestampPollInterval) continue } break } if event.EventName == "" || event.TSUnixNano <= 0 { continue } observed = append(observed, observedTXTimestampEvent{ Phase: linuxTXTimestampPhasePostSendCollect, ReadIndex: *readIndex, Event: event, }) c.recordUDPPendingEvent(event) *readIndex = *readIndex + 1 selection := selectTXTimestampEvents(observed, chunk.ExpectedTXID, true) if selection.HasEvent && selection.SelectedID == chunk.ExpectedTXID && hasCompleteTXTimestampPair(selection.Timestamps) { break } } selection := selectTXTimestampEvents(observed, chunk.ExpectedTXID, true) c.logObservedUDPTXTimestampEvents(msg, chunk, observed, selection) c.releaseCompletedUDPPendingFromObserved(observed) c.drainPendingUDPTXTimestampEvents(linuxTXTimestampPhasePostSelectDrain, readIndex) c.releaseCompletedUDPPending(chunk.ExpectedTXID) return selection.Timestamps } // drainPendingUDPTXTimestampEvents 清空 errqueue 中残留的时间戳事件。 func (c *UDPConn) drainPendingUDPTXTimestampEvents(phase string, readIndex *int) { if c.raw == nil { return } for { event, err := c.recvUDPTXTimestampOnce() if err != nil { if isWouldBlock(err) { return } return } if event.EventName == "" || event.TSUnixNano <= 0 { continue } complete := c.recordUDPPendingEvent(event) if msg, chunks, ok := c.lookupUDPPendingDebugContext(event.EEData); ok { c.logUDPTXErrqueueDebugRecord(msg, chunks, observedTXTimestampEvent{ Phase: phase, ReadIndex: *readIndex, Event: event, }, false) if complete { delete(c.pendingTX, event.EEData) } } *readIndex = *readIndex + 1 } } // recvUDPTXTimestampOnce 从 errqueue 读一次时间戳事件。 func (c *UDPConn) recvUDPTXTimestampOnce() (txTimestampEvent, error) { var ( event txTimestampEvent opErr error ) err := c.raw.Control(func(fd uintptr) { oob := make([]byte, linuxTimestampControlBufferSize) _, oobn, _, _, recvErr := syscall.Recvmsg(int(fd), nil, oob, syscall.MSG_ERRQUEUE|syscall.MSG_DONTWAIT) if recvErr != nil { opErr = recvErr return } event, _ = parseTXTimestampControlMessages(oob[:oobn]) }) if err != nil { return txTimestampEvent{}, err } if opErr != nil { return txTimestampEvent{}, opErr } return event, nil } func (c *UDPConn) recordUDPPendingEvent(event txTimestampEvent) bool { if !isBusinessTXTimestampEventName(event.EventName) { return false } record, ok := c.pendingTX[event.EEData] if !ok { return false } if record.observedTimestamps == nil { record.observedTimestamps = make(map[string]int64, 2) } if existing, exists := record.observedTimestamps[event.EventName]; !exists || event.TSUnixNano < existing { record.observedTimestamps[event.EventName] = event.TSUnixNano } c.pendingTX[event.EEData] = record return hasCompleteTXTimestampPair(record.observedTimestamps) } func (c *UDPConn) releaseCompletedUDPPending(txID uint32) { record, ok := c.pendingTX[txID] if !ok { return } if hasCompleteTXTimestampPair(record.observedTimestamps) { delete(c.pendingTX, txID) } } func (c *UDPConn) releaseCompletedUDPPendingFromObserved(observed []observedTXTimestampEvent) { seen := make(map[uint32]struct{}, len(observed)) for _, entry := range observed { if _, ok := seen[entry.Event.EEData]; ok { continue } c.releaseCompletedUDPPending(entry.Event.EEData) seen[entry.Event.EEData] = struct{}{} } } func (c *UDPConn) lookupUDPPendingDebugContext(txID uint32) (protocol.Message, []txSendChunk, bool) { record, ok := c.pendingTX[txID] if !ok { return protocol.Message{}, nil, false } chunk := txSendChunk{ SendCallIndex: record.sendCallIndex, FrameOffsetStart: 0, FrameOffsetEnd: record.bytesWritten - 1, BytesWritten: record.bytesWritten, ExpectedTXID: record.expectedTXID, } return record.msg, []txSendChunk{chunk}, true } func (c *UDPConn) logObservedUDPTXTimestampEvents(msg protocol.Message, chunk txSendChunk, observed []observedTXTimestampEvent, selection txTimestampSelection) { if len(observed) == 0 { return } currentChunks := []txSendChunk{chunk} for _, entry := range observed { recordMsg := msg recordChunks := currentChunks if pendingMsg, pendingChunks, ok := c.lookupUDPPendingDebugContext(entry.Event.EEData); ok { recordMsg = pendingMsg recordChunks = pendingChunks } // 理想情况应命中本次 expectedTXID;如果等待窗口里只看到了更高的 ee_data, // 就退回到本轮实际观察到的最新事件,至少保留调试和定位线索。 selected := selection.HasEvent && entry.Event.EEData == selection.SelectedID && isBusinessTXTimestampEventName(entry.Event.EventName) && selection.Timestamps[entry.Event.EventName] == entry.Event.TSUnixNano c.logUDPTXErrqueueDebugRecord(recordMsg, recordChunks, entry, selected) } } func (c *UDPConn) logUDPTXSendChunkDebugRecord(msg protocol.Message, chunk txSendChunk) { if c.txTimestampDebugLogger == nil { return } sendCallIndex := chunk.SendCallIndex frameOffsetStart := chunk.FrameOffsetStart frameOffsetEnd := chunk.FrameOffsetEnd bytesWritten := chunk.BytesWritten expectedTXID := chunk.ExpectedTXID record := c.newUDPTXTimestampDebugRecord(msg) record.RecordType = txTimestampDebugRecordTypeSendChunk record.SendCallIndex = &sendCallIndex record.FrameOffsetStart = &frameOffsetStart record.FrameOffsetEnd = &frameOffsetEnd record.BytesWritten = &bytesWritten record.ExpectedTXID = &expectedTXID c.logUDPTXTimestampDebugRecord(record) } func (c *UDPConn) logUDPTXErrqueueDebugRecord(msg protocol.Message, chunks []txSendChunk, observed observedTXTimestampEvent, selected bool) { if c.txTimestampDebugLogger == nil { return } readIndex := observed.ReadIndex tsUnixNano := observed.Event.TSUnixNano eeInfo := observed.Event.EEInfo eeData := observed.Event.EEData selectedForLatency := selected record := c.newUDPTXTimestampDebugRecord(msg) record.RecordType = txTimestampDebugRecordTypeErrqueueEvent record.Phase = observed.Phase record.ReadIndex = &readIndex record.EventName = observed.Event.EventName record.TSUnixNano = &tsUnixNano record.EEInfo = &eeInfo record.EEData = &eeData record.SelectedForLatency = &selectedForLatency if matchedSendCallIndex, ok := matchTXTimestampEventToSendChunk(observed.Event.EEData, chunks); ok { record.MatchedSendCallIndex = &matchedSendCallIndex } c.logUDPTXTimestampDebugRecord(record) } func (c *UDPConn) newUDPTXTimestampDebugRecord(msg protocol.Message) TXTimestampDebugRecord { return TXTimestampDebugRecord{ NodeRole: c.nodeRole, NodeID: c.nodeID, MessageType: msg.Type, MessageID: msg.ID, From: msg.From, To: msg.To, FileName: msg.FileName, BodySize: len(msg.Body), } } func (c *UDPConn) logUDPTXTimestampDebugRecord(record TXTimestampDebugRecord) { if c.txTimestampDebugLogger == nil { return } _ = c.txTimestampDebugLogger.LogTXTimestampDebugRecord(record) } // udpAddrToSockaddr 将 net.UDPAddr 转换为 syscall.Sockaddr。 func udpAddrToSockaddr(addr *net.UDPAddr) syscall.Sockaddr { if ip4 := addr.IP.To4(); ip4 != nil { sa := &syscall.SockaddrInet4{Port: addr.Port} copy(sa.Addr[:], ip4) return sa } if ip6 := addr.IP.To16(); ip6 != nil { sa := &syscall.SockaddrInet6{Port: addr.Port} copy(sa.Addr[:], ip6) return sa } return nil } // sockaddrToUDPAddr 将 syscall.Sockaddr 转换为 net.UDPAddr。 func sockaddrToUDPAddr(sa syscall.Sockaddr) *net.UDPAddr { switch addr := sa.(type) { case *syscall.SockaddrInet4: return &net.UDPAddr{ IP: net.IP(addr.Addr[:]), Port: addr.Port, } case *syscall.SockaddrInet6: return &net.UDPAddr{ IP: net.IP(addr.Addr[:]), Port: addr.Port, Zone: zoneToString(addr.ZoneId), } default: return nil } } func zoneToString(zone uint32) string { if zone == 0 { return "" } iface, err := net.InterfaceByIndex(int(zone)) if err != nil { return "" } return iface.Name }