Files
OmniSocketGo/cmd/internal/transport/udp_linux.go

557 lines
15 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//go:build linux
package transport
import (
"errors"
"fmt"
"net"
"syscall"
"time"
"omnisocketgo/cmd/internal/latencylog"
"omnisocketgo/cmd/internal/protocol"
)
// udpReceiveBufferSize is the size of the buffer used to receive one UDP
// datagram: the largest protocol frame plus 1024 bytes of headroom, so that a
// complete datagram including its protocol header fits.
const udpReceiveBufferSize = protocol.MaxFrameSize + 1024
// initUDPLinuxTimestamping obtains the raw connection behind c.conn and enables
// Linux socket timestamping on it via the linuxSOTimestampingNew option.
//
// Two flag sets are tried in order: the first also requests
// linuxSOFTimestampingOptID, the second omits it. A candidate rejected with
// EINVAL falls through to the next one; any other failure aborts immediately.
// On success c.raw is set to the raw connection. On failure c.raw is left
// untouched and the returned error wraps the underlying cause, so
// errors.Is/errors.As still see the original errno.
func (c *UDPConn) initUDPLinuxTimestamping() error {
	rawConn, err := c.conn.SyscallConn()
	if err != nil {
		return fmt.Errorf("transport: udp get syscall conn: %w", err)
	}
	if rawConn == nil {
		return errors.New("transport: udp missing syscall conn")
	}

	flagCandidates := []int{
		linuxSOFTimestampingTXSched |
			linuxSOFTimestampingTXSoftware |
			linuxSOFTimestampingRXSoftware |
			linuxSOFTimestampingSoftware |
			linuxSOFTimestampingOptID |
			linuxSOFTimestampingOptTSONLY,
		linuxSOFTimestampingTXSched |
			linuxSOFTimestampingTXSoftware |
			linuxSOFTimestampingRXSoftware |
			linuxSOFTimestampingSoftware |
			linuxSOFTimestampingOptTSONLY,
	}

	var setErr error
	for _, flags := range flagCandidates {
		ctrlErr := rawConn.Control(func(fd uintptr) {
			setErr = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, linuxSOTimestampingNew, flags)
		})
		if ctrlErr != nil {
			return fmt.Errorf("transport: udp control syscall conn: %w", ctrlErr)
		}
		if setErr == nil {
			c.raw = rawConn
			return nil
		}
		// Only EINVAL means "this flag combination is unsupported"; anything
		// else will not be cured by retrying with fewer flags.
		if !errors.Is(setErr, syscall.EINVAL) {
			break
		}
	}
	return fmt.Errorf("transport: udp enable timestamping: %w", setErr)
}
// sendMessageLinux encodes msg and sends it over UDP to c.peerAddr, collecting
// TX timestamps for the datagram. It is the fixed-destination form of
// sendMessageToLinux.
func (c *UDPConn) sendMessageLinux(msg protocol.Message) error {
	return c.sendMessageToLinux(msg, c.peerAddr)
}
// sendMessageToLinux encodes msg and sends the resulting payload over UDP to
// addr, collecting TX timestamps for the datagram. An encoding failure is
// returned wrapped; send failures are returned as produced by
// sendUDPPayloadLinux.
func (c *UDPConn) sendMessageToLinux(msg protocol.Message, addr *net.UDPAddr) error {
	encoded, encodeErr := protocol.EncodeMessage(msg)
	if encodeErr != nil {
		return fmt.Errorf("protocol: encode message: %w", encodeErr)
	}

	return c.sendUDPPayloadLinux(msg, encoded, addr)
}
// sendUDPPayloadLinux sends one already-encoded datagram and then collects and
// logs its TX timestamp events.
//
// With a nil addr the payload goes out through udpSend, otherwise through
// udpSendTo. The send is committed (pending record and sequence bump) only
// after the write succeeded.
func (c *UDPConn) sendUDPPayloadLinux(msg protocol.Message, payload []byte, addr *net.UDPAddr) error {
	var errqueueReads int

	// The pre-send drain may pick up late errqueue events that belong to an
	// earlier message. They must be cleared first and attributed to their
	// original message via ee_data, so they cannot pollute the current one.
	c.drainPendingUDPTXTimestampEvents(linuxTXTimestampPhasePreSendDrain, &errqueueReads)

	chunk := c.newUDPSendChunk(len(payload))

	var sendErr error
	switch {
	case addr == nil:
		sendErr = c.udpSend(payload)
	default:
		sendErr = c.udpSendTo(payload, addr)
	}
	if sendErr != nil {
		return sendErr
	}

	c.commitUDPSend(msg, chunk)
	c.collectAndLogUDPTXTimestampEvents(msg, chunk, &errqueueReads)
	return nil
}
// newUDPSendChunk describes the single send call of a UDP datagram of
// payloadLen bytes: call index 0, offsets spanning the whole payload, and the
// current c.txPacketSeq as the expected TX ID.
func (c *UDPConn) newUDPSendChunk(payloadLen int) txSendChunk {
	var chunk txSendChunk
	chunk.SendCallIndex = 0
	chunk.FrameOffsetStart = 0
	chunk.FrameOffsetEnd = payloadLen - 1
	chunk.BytesWritten = payloadLen
	chunk.ExpectedTXID = c.txPacketSeq
	return chunk
}
// commitUDPSend records a successfully sent datagram and advances
// c.txPacketSeq. The pending record and the send-chunk debug line are only
// produced when a TX timestamp debug logger is configured.
func (c *UDPConn) commitUDPSend(msg protocol.Message, chunk txSendChunk) {
	if debugEnabled := c.txTimestampDebugLogger != nil; debugEnabled {
		// For UDP datagrams Linux reports ee_data as a per-packet incrementing
		// ID. Binding that ID to the message metadata here is what lets late
		// events seen by a later drain be attributed to the original message.
		pending := udpTXPendingRecord{
			msg:                msg,
			sendCallIndex:      chunk.SendCallIndex,
			bytesWritten:       chunk.BytesWritten,
			expectedTXID:       chunk.ExpectedTXID,
			observedTimestamps: make(map[string]int64, 2),
		}
		c.pendingTX[chunk.ExpectedTXID] = pending
		c.logUDPTXSendChunkDebugRecord(msg, chunk)
	}

	c.txPacketSeq++
}
// udpSend writes payload on the UDP socket without an explicit destination.
// It uses the raw sendmsg path when c.raw is set and c.conn.Write otherwise.
func (c *UDPConn) udpSend(payload []byte) error {
	if c.raw == nil {
		_, writeErr := c.conn.Write(payload)
		return writeErr
	}
	return c.udpSendmsgRaw(payload, nil)
}
// udpSendTo sends payload to addr over the UDP socket.
//
// The raw sendmsg path is used when c.raw is set and addr converts to a
// sockaddr. In every other case, including a nil addr, the call falls through
// to c.conn.WriteToUDP, which then reports the problem as an ordinary error
// instead of this function dereferencing a nil address.
func (c *UDPConn) udpSendTo(payload []byte, addr *net.UDPAddr) error {
	if c.raw != nil && addr != nil {
		if sa := udpAddrToSockaddr(addr); sa != nil {
			return c.udpSendmsgRaw(payload, sa)
		}
	}
	_, err := c.conn.WriteToUDP(payload, addr)
	return err
}
// udpSendmsgRaw sends payload with the sendmsg syscall on the raw descriptor.
// A nil to sends without a destination address. While the kernel reports a
// would-block condition the call sleeps for linuxDataPollInterval and retries;
// any other error (from Control or from sendmsg) is returned as is.
func (c *UDPConn) udpSendmsgRaw(payload []byte, to syscall.Sockaddr) error {
	var sendErr error
	attempt := func(fd uintptr) {
		sendErr = syscall.Sendmsg(int(fd), payload, nil, to, 0)
	}

	for {
		if ctrlErr := c.raw.Control(attempt); ctrlErr != nil {
			return ctrlErr
		}
		switch {
		case sendErr == nil:
			return nil
		case isWouldBlock(sendErr):
			// Socket not writable yet: back off briefly, then try again.
			time.Sleep(linuxDataPollInterval)
		default:
			return sendErr
		}
	}
}
// receiveMessageLinux reads one datagram from the UDP connection, decodes it
// into a protocol.Message and returns it together with the sender address.
// When a positive RX timestamp accompanies the datagram, an EventBRXSoftware
// latency event is logged at that timestamp.
func (c *UDPConn) receiveMessageLinux() (protocol.Message, *net.UDPAddr, error) {
	datagram, sender, rxTimeNS, readErr := c.udpRecvFrom()
	if readErr != nil {
		return protocol.Message{}, nil, fmt.Errorf("protocol: udp read: %w", readErr)
	}

	decoded, decodeErr := protocol.DecodeMessage(datagram)
	if decodeErr != nil {
		return protocol.Message{}, nil, fmt.Errorf("protocol: decode message: %w", decodeErr)
	}

	// A zero timestamp means none was captured (e.g. the non-raw read path).
	if rxTimeNS > 0 {
		latencylog.LogMessageEventAt(c.logger, c.nodeRole, c.nodeID, latencylog.EventBRXSoftware, rxTimeNS, decoded)
	}
	return decoded, sender, nil
}
// udpRecvFrom receives one datagram and returns its bytes, the sender address
// and the RX timestamp in nanoseconds. The raw recvmsg path is used when c.raw
// is set; the plain ReadFromUDP path always reports a timestamp of 0.
func (c *UDPConn) udpRecvFrom() ([]byte, *net.UDPAddr, int64, error) {
	if c.raw == nil {
		datagram := make([]byte, udpReceiveBufferSize)
		size, sender, readErr := c.conn.ReadFromUDP(datagram)
		if readErr != nil {
			return nil, nil, 0, readErr
		}
		return datagram[:size], sender, 0, nil
	}
	return c.udpRecvmsgRaw()
}
// udpRecvmsgRaw receives one datagram with the recvmsg syscall and extracts the
// RX timestamp from the accompanying control messages.
//
// It returns the datagram bytes, the sender address (nil when the sockaddr is
// neither IPv4 nor IPv6), and the timestamp produced by
// parseRXTimestampControlMessages. While the kernel reports a would-block
// condition the call sleeps for linuxDataPollInterval and retries; any other
// error is returned as is.
//
// NOTE(review): the flags result of recvmsg is ignored, so a truncated payload
// or control buffer is not detected here.
func (c *UDPConn) udpRecvmsgRaw() ([]byte, *net.UDPAddr, int64, error) {
	// Allocate once per call rather than once per poll iteration: the buffers
	// are only handed out (buf) or parsed (oob) after a successful read, so
	// reusing them across would-block retries is safe.
	buf := make([]byte, udpReceiveBufferSize)
	oob := make([]byte, linuxTimestampControlBufferSize)

	for {
		var (
			n        int
			rxTimeNS int64
			from     syscall.Sockaddr
			opErr    error
		)
		err := c.raw.Control(func(fd uintptr) {
			readN, oobN, _, sa, recvErr := syscall.Recvmsg(int(fd), buf, oob, 0)
			if recvErr != nil {
				opErr = recvErr
				return
			}
			n = readN
			from = sa
			rxTimeNS = parseRXTimestampControlMessages(oob[:oobN])
		})
		if err != nil {
			return nil, nil, 0, err
		}
		if opErr != nil {
			if isWouldBlock(opErr) {
				// Nothing to read yet: back off briefly, then poll again.
				time.Sleep(linuxDataPollInterval)
				continue
			}
			return nil, nil, 0, opErr
		}
		return buf[:n], sockaddrToUDPAddr(from), rxTimeNS, nil
	}
}
// collectAndLogUDPTXTimestampEvents collects the TX timestamps for the sent
// datagram and logs a latency event for each of EventATXSched and
// EventATXSoftware that was obtained, in that order.
func (c *UDPConn) collectAndLogUDPTXTimestampEvents(msg protocol.Message, chunk txSendChunk, readIndex *int) {
	collected := c.collectUDPTXTimestampEvents(msg, chunk, readIndex)

	schedNS, haveSched := collected[latencylog.EventATXSched]
	if haveSched {
		latencylog.LogMessageEventAt(c.logger, c.nodeRole, c.nodeID, latencylog.EventATXSched, schedNS, msg)
	}

	softwareNS, haveSoftware := collected[latencylog.EventATXSoftware]
	if haveSoftware {
		latencylog.LogMessageEventAt(c.logger, c.nodeRole, c.nodeID, latencylog.EventATXSoftware, softwareNS, msg)
	}
}
// collectUDPTXTimestampEvents polls the socket error queue for the TX
// timestamp events of the datagram described by chunk and returns the
// timestamps chosen by selectTXTimestampEvents (nil when c.raw is unset).
//
// Polling ends when linuxTXTimestampWaitTimeout has elapsed, when a read fails
// with anything other than a would-block error, or as soon as the selection
// for chunk.ExpectedTXID holds a complete timestamp pair. Every usable event
// read advances *readIndex and is also recorded against its pending send
// record.
func (c *UDPConn) collectUDPTXTimestampEvents(msg protocol.Message, chunk txSendChunk, readIndex *int) map[string]int64 {
if c.raw == nil {
return nil
}
deadline := time.Now().Add(linuxTXTimestampWaitTimeout)
observed := make([]observedTXTimestampEvent, 0, 4)
for time.Now().Before(deadline) {
event, err := c.recvUDPTXTimestampOnce()
if err != nil {
// Would-block means the error queue is empty for now: wait and poll again.
if isWouldBlock(err) {
time.Sleep(linuxTXTimestampPollInterval)
continue
}
// Any other error ends the wait; whatever was observed so far is used.
break
}
// Skip reads that produced no named event or no positive timestamp.
if event.EventName == "" || event.TSUnixNano <= 0 {
continue
}
observed = append(observed, observedTXTimestampEvent{
Phase: linuxTXTimestampPhasePostSendCollect,
ReadIndex: *readIndex,
Event: event,
})
c.recordUDPPendingEvent(event)
*readIndex = *readIndex + 1
// Re-evaluate after every event so the wait stops as soon as the expected
// datagram has its complete timestamp pair.
selection := selectTXTimestampEvents(observed, chunk.ExpectedTXID, true)
if selection.HasEvent && selection.SelectedID == chunk.ExpectedTXID && hasCompleteTXTimestampPair(selection.Timestamps) {
break
}
}
// Recompute the selection so the timeout and error exits are covered as well.
selection := selectTXTimestampEvents(observed, chunk.ExpectedTXID, true)
c.logObservedUDPTXTimestampEvents(msg, chunk, observed, selection)
c.releaseCompletedUDPPendingFromObserved(observed)
// Drain whatever is still queued, then release this datagram's pending record
// if the drain completed it.
c.drainPendingUDPTXTimestampEvents(linuxTXTimestampPhasePostSelectDrain, readIndex)
c.releaseCompletedUDPPending(chunk.ExpectedTXID)
return selection.Timestamps
}
// drainPendingUDPTXTimestampEvents empties the socket error queue of leftover
// TX timestamp events without waiting. It is a no-op when c.raw is unset.
//
// Each usable event is recorded against its pending send record. When that
// record exists, the event is written to the debug log tagged with phase and
// the current *readIndex, and the record is dropped once its timestamp pair is
// complete. *readIndex is advanced for every usable event.
func (c *UDPConn) drainPendingUDPTXTimestampEvents(phase string, readIndex *int) {
	if c.raw == nil {
		return
	}
	for {
		event, err := c.recvUDPTXTimestampOnce()
		if err != nil {
			// The read is non-blocking (MSG_DONTWAIT), so a would-block error
			// is the normal "queue empty" signal. Other errors end this
			// best-effort drain just the same, so no distinction is needed.
			return
		}
		// Skip reads that produced no named event or no positive timestamp.
		if event.EventName == "" || event.TSUnixNano <= 0 {
			continue
		}
		complete := c.recordUDPPendingEvent(event)
		if msg, chunks, ok := c.lookupUDPPendingDebugContext(event.EEData); ok {
			c.logUDPTXErrqueueDebugRecord(msg, chunks, observedTXTimestampEvent{
				Phase:     phase,
				ReadIndex: *readIndex,
				Event:     event,
			}, false)
			if complete {
				delete(c.pendingTX, event.EEData)
			}
		}
		*readIndex++
	}
}
// recvUDPTXTimestampOnce performs a single non-blocking read of the socket
// error queue and returns the TX timestamp event parsed from its control
// messages. Errors from Control or from recvmsg (including would-block when
// the queue is empty) are returned with a zero event.
func (c *UDPConn) recvUDPTXTimestampOnce() (txTimestampEvent, error) {
	var (
		parsed  txTimestampEvent
		recvErr error
	)

	ctrlErr := c.raw.Control(func(fd uintptr) {
		control := make([]byte, linuxTimestampControlBufferSize)
		// Only the control data matters here, so no payload buffer is passed.
		_, controlLen, _, _, err := syscall.Recvmsg(int(fd), nil, control, syscall.MSG_ERRQUEUE|syscall.MSG_DONTWAIT)
		if err != nil {
			recvErr = err
			return
		}
		parsed, _ = parseTXTimestampControlMessages(control[:controlLen])
	})

	switch {
	case ctrlErr != nil:
		return txTimestampEvent{}, ctrlErr
	case recvErr != nil:
		return txTimestampEvent{}, recvErr
	}
	return parsed, nil
}
// recordUDPPendingEvent stores event's timestamp in the pending send record
// keyed by event.EEData and reports whether that record now holds a complete
// timestamp pair. Events with a non-business name, or without a pending
// record, are ignored and yield false. When the same event name is seen more
// than once, the smallest timestamp is kept.
func (c *UDPConn) recordUDPPendingEvent(event txTimestampEvent) bool {
	if !isBusinessTXTimestampEventName(event.EventName) {
		return false
	}

	pending, tracked := c.pendingTX[event.EEData]
	if !tracked {
		return false
	}
	if pending.observedTimestamps == nil {
		pending.observedTimestamps = make(map[string]int64, 2)
	}

	previous, seen := pending.observedTimestamps[event.EventName]
	if !seen || event.TSUnixNano < previous {
		pending.observedTimestamps[event.EventName] = event.TSUnixNano
	}

	// The map holds records by value, so the updated copy is written back.
	c.pendingTX[event.EEData] = pending
	return hasCompleteTXTimestampPair(pending.observedTimestamps)
}
// releaseCompletedUDPPending removes the pending send record for txID once it
// holds a complete timestamp pair. Unknown or still-incomplete records are
// left alone.
func (c *UDPConn) releaseCompletedUDPPending(txID uint32) {
	if pending, tracked := c.pendingTX[txID]; tracked && hasCompleteTXTimestampPair(pending.observedTimestamps) {
		delete(c.pendingTX, txID)
	}
}
// releaseCompletedUDPPendingFromObserved releases the completed pending record
// of every distinct ee_data value found in observed, handling each ID once.
func (c *UDPConn) releaseCompletedUDPPendingFromObserved(observed []observedTXTimestampEvent) {
	handled := make(map[uint32]struct{}, len(observed))
	for i := range observed {
		id := observed[i].Event.EEData
		if _, dup := handled[id]; dup {
			continue
		}
		handled[id] = struct{}{}
		c.releaseCompletedUDPPending(id)
	}
}
// lookupUDPPendingDebugContext returns the message and a single reconstructed
// send chunk for the pending record keyed by txID. The boolean is false, with
// zero values for the rest, when no such record exists.
func (c *UDPConn) lookupUDPPendingDebugContext(txID uint32) (protocol.Message, []txSendChunk, bool) {
	pending, tracked := c.pendingTX[txID]
	if !tracked {
		return protocol.Message{}, nil, false
	}

	// A UDP datagram is sent in one call, so the chunk spans the whole payload.
	chunks := []txSendChunk{{
		SendCallIndex:    pending.sendCallIndex,
		FrameOffsetStart: 0,
		FrameOffsetEnd:   pending.bytesWritten - 1,
		BytesWritten:     pending.bytesWritten,
		ExpectedTXID:     pending.expectedTXID,
	}}
	return pending.msg, chunks, true
}
// logObservedUDPTXTimestampEvents writes one errqueue debug record per observed
// event. Each record is attributed to the message whose pending record matches
// the event's ee_data, falling back to msg/chunk when there is none, and is
// flagged as selected when it is the exact event selection picked for latency.
//
// It returns immediately when no debug logger is configured, since every
// record would be discarded anyway, or when nothing was observed.
func (c *UDPConn) logObservedUDPTXTimestampEvents(msg protocol.Message, chunk txSendChunk, observed []observedTXTimestampEvent, selection txTimestampSelection) {
	if c.txTimestampDebugLogger == nil || len(observed) == 0 {
		return
	}
	currentChunks := []txSendChunk{chunk}
	for _, entry := range observed {
		recordMsg := msg
		recordChunks := currentChunks
		if pendingMsg, pendingChunks, ok := c.lookupUDPPendingDebugContext(entry.Event.EEData); ok {
			recordMsg = pendingMsg
			recordChunks = pendingChunks
		}
		// Ideally the selection hits this send's expected TX ID. If the wait
		// window only saw a higher ee_data, the selection falls back to the
		// latest event actually observed, which at least preserves a trail for
		// debugging.
		selected := selection.HasEvent &&
			entry.Event.EEData == selection.SelectedID &&
			isBusinessTXTimestampEventName(entry.Event.EventName) &&
			selection.Timestamps[entry.Event.EventName] == entry.Event.TSUnixNano
		c.logUDPTXErrqueueDebugRecord(recordMsg, recordChunks, entry, selected)
	}
}
// logUDPTXSendChunkDebugRecord emits a send-chunk debug record describing
// chunk for msg. It does nothing when no debug logger is configured.
func (c *UDPConn) logUDPTXSendChunkDebugRecord(msg protocol.Message, chunk txSendChunk) {
	if c.txTimestampDebugLogger == nil {
		return
	}

	// chunk is this function's own copy, so the record's optional (pointer)
	// fields can point straight into it without aliasing the caller's value.
	record := c.newUDPTXTimestampDebugRecord(msg)
	record.RecordType = txTimestampDebugRecordTypeSendChunk
	record.SendCallIndex = &chunk.SendCallIndex
	record.FrameOffsetStart = &chunk.FrameOffsetStart
	record.FrameOffsetEnd = &chunk.FrameOffsetEnd
	record.BytesWritten = &chunk.BytesWritten
	record.ExpectedTXID = &chunk.ExpectedTXID

	c.logUDPTXTimestampDebugRecord(record)
}
// logUDPTXErrqueueDebugRecord emits an errqueue-event debug record for the
// observed event, attributed to msg. selected marks whether the event was
// chosen for latency reporting. When the event's ee_data matches one of
// chunks, the matching send-call index is included. It does nothing when no
// debug logger is configured.
func (c *UDPConn) logUDPTXErrqueueDebugRecord(msg protocol.Message, chunks []txSendChunk, observed observedTXTimestampEvent, selected bool) {
	if c.txTimestampDebugLogger == nil {
		return
	}

	// observed and selected are this function's own copies, so the record's
	// optional (pointer) fields can point straight into them.
	record := c.newUDPTXTimestampDebugRecord(msg)
	record.RecordType = txTimestampDebugRecordTypeErrqueueEvent
	record.Phase = observed.Phase
	record.EventName = observed.Event.EventName
	record.ReadIndex = &observed.ReadIndex
	record.TSUnixNano = &observed.Event.TSUnixNano
	record.EEInfo = &observed.Event.EEInfo
	record.EEData = &observed.Event.EEData
	record.SelectedForLatency = &selected

	matchedIndex, matched := matchTXTimestampEventToSendChunk(observed.Event.EEData, chunks)
	if matched {
		record.MatchedSendCallIndex = &matchedIndex
	}

	c.logUDPTXTimestampDebugRecord(record)
}
// newUDPTXTimestampDebugRecord builds a debug record pre-filled with this
// node's role and ID and the identifying fields of msg. Callers fill in the
// record type and the event-specific fields.
func (c *UDPConn) newUDPTXTimestampDebugRecord(msg protocol.Message) TXTimestampDebugRecord {
	var record TXTimestampDebugRecord

	record.NodeRole = c.nodeRole
	record.NodeID = c.nodeID

	record.MessageType = msg.Type
	record.MessageID = msg.ID
	record.From = msg.From
	record.To = msg.To
	record.FileName = msg.FileName
	record.BodySize = len(msg.Body)

	return record
}
// logUDPTXTimestampDebugRecord hands record to the TX timestamp debug logger,
// if one is configured.
func (c *UDPConn) logUDPTXTimestampDebugRecord(record TXTimestampDebugRecord) {
	if logger := c.txTimestampDebugLogger; logger != nil {
		// Debug logging is best-effort: a failed write must not affect the
		// send path, so the error is deliberately dropped.
		_ = logger.LogTXTimestampDebugRecord(record)
	}
}
// udpAddrToSockaddr converts addr into the syscall.Sockaddr form used by the
// raw sendmsg path.
//
// IPv4 addresses (including IPv4-mapped ones) become *syscall.SockaddrInet4.
// Other 16-byte addresses become *syscall.SockaddrInet6 with ZoneId resolved
// from addr.Zone, so scoped (e.g. link-local) destinations keep their scope.
// It returns nil when addr is nil or carries no valid IP; udpSendTo then falls
// back to the regular WriteToUDP path.
//
// NOTE(review): an IPv4 destination always yields an AF_INET sockaddr. Confirm
// the socket is never an AF_INET6 one when IPv4 peers are used.
func udpAddrToSockaddr(addr *net.UDPAddr) syscall.Sockaddr {
	if addr == nil {
		return nil
	}
	if ip4 := addr.IP.To4(); ip4 != nil {
		sa := &syscall.SockaddrInet4{Port: addr.Port}
		copy(sa.Addr[:], ip4)
		return sa
	}
	if ip6 := addr.IP.To16(); ip6 != nil {
		sa := &syscall.SockaddrInet6{Port: addr.Port, ZoneId: udpZoneToIndex(addr.Zone)}
		copy(sa.Addr[:], ip6)
		return sa
	}
	return nil
}

// udpZoneToIndex resolves an IPv6 zone string to an interface index: first as
// an interface name, then as a plain decimal index. It returns 0 (no scope) for
// an empty, unknown or out-of-range zone. The name lookup queries the system
// interface table, so it only runs for addresses that actually carry a zone.
func udpZoneToIndex(zone string) uint32 {
	if zone == "" {
		return 0
	}
	if iface, err := net.InterfaceByName(zone); err == nil {
		return uint32(iface.Index)
	}

	const maxIndex = 1<<32 - 1
	var index uint64
	for i := 0; i < len(zone); i++ {
		digit := zone[i]
		if digit < '0' || digit > '9' {
			return 0
		}
		index = index*10 + uint64(digit-'0')
		if index > maxIndex {
			return 0
		}
	}
	return uint32(index)
}
// sockaddrToUDPAddr converts a syscall.Sockaddr into a *net.UDPAddr. IPv6
// addresses carry the zone derived from the sockaddr's ZoneId. Any other
// sockaddr type (including a nil interface) yields nil.
func sockaddrToUDPAddr(sa syscall.Sockaddr) *net.UDPAddr {
	if v4, ok := sa.(*syscall.SockaddrInet4); ok {
		return &net.UDPAddr{IP: net.IP(v4.Addr[:]), Port: v4.Port}
	}
	if v6, ok := sa.(*syscall.SockaddrInet6); ok {
		return &net.UDPAddr{
			IP:   net.IP(v6.Addr[:]),
			Port: v6.Port,
			Zone: zoneToString(v6.ZoneId),
		}
	}
	return nil
}
// zoneToString converts an IPv6 zone (interface) index into the string form
// used by net.UDPAddr.Zone. Index 0 means "no zone" and yields "". A known
// index yields the interface name. When the interface cannot be looked up the
// decimal index itself is returned, so the scope is preserved rather than
// silently dropped.
func zoneToString(zone uint32) string {
	if zone == 0 {
		return ""
	}
	iface, err := net.InterfaceByIndex(int(zone))
	if err != nil {
		return fmt.Sprintf("%d", zone)
	}
	return iface.Name
}