Files
OmniSocketGo/cmd/internal/transport/tcp_linux.go

750 lines
21 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//go:build linux
package transport
import (
"encoding/binary"
"errors"
"fmt"
"io"
"syscall"
"time"
"omnisocketgo/cmd/internal/latencylog"
"omnisocketgo/cmd/internal/protocol"
)
// Tunables and Linux ABI values used by the timestamping transport in this file.
const (
linuxTimestampControlBufferSize = 2048 // Size in bytes of the control-message buffer handed to recvmsg.
linuxSocketWriteBufferSize = 10 * 1024 * 1024 // Requested socket send buffer size: 10 MiB.
linuxTXTimestampWaitTimeout = 5000 * time.Millisecond // Upper bound on waiting for TX timestamps.
linuxTXTimestampPollInterval = time.Millisecond // Interval between error-queue polls.
linuxDataPollInterval = time.Millisecond // Interval between retries of regular send/receive calls.
linuxSOTimestampingNew = 0x41 // Socket option set at SOL_SOCKET level; presumably SO_TIMESTAMPING_NEW — confirm against kernel headers.
linuxSCMTimestampingNew = linuxSOTimestampingNew // Control-message type carrying timestamps; same value as the socket option.
linuxSOEEOriginTimestamping = 4 // Extended-error origin marking a timestamping error-queue event.
linuxSCMTstampSnd = 0 // Maps to A_TX_SOFTWARE.
linuxSCMTstampSched = 1 // Maps to A_TX_SCHED.
linuxSOFTimestampingTXSoftware = 1 << 1 // Enables TX software timestamps.
linuxSOFTimestampingRXSoftware = 1 << 3 // Enables RX software timestamps.
linuxSOFTimestampingSoftware = 1 << 4 // Master switch for software timestamps.
linuxSOFTimestampingOptID = 1 << 7 // Associates an ID with each timestamp.
linuxSOFTimestampingTXSched = 1 << 8 // Enables TX sched timestamps.
linuxSOFTimestampingOptTSONLY = 1 << 11 // Returns the timestamp only.
linuxSOFTimestampingOptIDTCP = 1 << 16 // Makes TCP carry a timestamp ID as well.
linuxTXTimestampPhasePreSendDrain = "pre_send_drain" // Phase label for events drained before a frame is written.
linuxTXTimestampPhasePostSendCollect = "post_send_collect" // Phase label for events collected after a frame is written.
linuxTXTimestampPhasePostSelectDrain = "post_select_drain" // Phase label for events drained after the selection is made.
)
// txSendChunk describes the part of a frame written by one successful sendmsg call.
type txSendChunk struct {
SendCallIndex int // Zero-based index of the successful sendmsg call within the frame.
FrameOffsetStart int // Frame offset of the first byte written by this call.
FrameOffsetEnd int // Frame offset of the last byte this sendmsg call wrote (inclusive).
BytesWritten int // Number of bytes written by this call.
ExpectedTXID uint32 // Connection write counter before the call, plus BytesWritten, minus one.
}
// txTimestampEvent is one TX timestamp notification parsed from error-queue control messages.
type txTimestampEvent struct {
EventName string // Log event name derived from EEInfo by mapLinuxTXTimestampEventName.
TSUnixNano int64 // Timestamp in nanoseconds (seconds*1e9 + nanoseconds) from the timestamping control message.
EEInfo uint32 // Info field of the socket extended error (the timestamp kind).
EEData uint32 // Data field of the socket extended error (compared against expected TX IDs).
}
// observedTXTimestampEvent records a TX timestamp event together with where it was read.
type observedTXTimestampEvent struct {
Phase string // One of the linuxTXTimestampPhase* labels.
ReadIndex int // Running index of this event among those read for the current message.
Event txTimestampEvent // The parsed event itself.
}
// txTimestampSelection is the result of choosing which observed events represent a message.
type txTimestampSelection struct {
SelectedID uint32 // ID (EEData) whose timestamps were chosen.
Timestamps map[string]int64 // Event name -> earliest timestamp for SelectedID; empty when none qualified.
HasEvent bool // True when at least one observed event was examined.
}
// socketExtendedErrInfo holds the two 32-bit fields extracted from a socket extended error payload.
type socketExtendedErrInfo struct {
Info uint32 // Read from payload bytes 8..12 in native byte order.
Data uint32 // Read from payload bytes 12..16 in native byte order.
}
// initLinuxTimestamping obtains the raw connection behind c.conn, requests a
// larger send buffer, enables socket timestamping, and finally stores the raw
// connection in c.raw so later calls can read TX/RX timestamps through it.
// It returns a wrapped error as soon as any of those steps fails.
func (c *TCPConn) initLinuxTimestamping() error {
	type syscallConner interface {
		SyscallConn() (syscall.RawConn, error)
	}

	provider, supported := c.conn.(syscallConner)
	if !supported {
		return fmt.Errorf("transport: connection does not support SyscallConn")
	}

	raw, err := provider.SyscallConn()
	switch {
	case err != nil:
		return fmt.Errorf("transport: get syscall conn: %w", err)
	case raw == nil:
		return fmt.Errorf("transport: missing syscall conn")
	}

	if bufErr := configureLinuxSocketWriteBuffer(raw); bufErr != nil {
		return fmt.Errorf("transport: configure socket write buffer: %w", bufErr)
	}

	// Whether timestamping can be enabled depends on the kernel version and
	// configuration; the helper tries several flag combinations.
	if tsErr := enableLinuxTimestamping(raw); tsErr != nil {
		return fmt.Errorf("transport: enable linux timestamping: %w", tsErr)
	}

	// Only publish the raw connection once timestamping is active.
	c.raw = raw
	return nil
}
// configureLinuxSocketWriteBuffer sets SO_SNDBUF on the socket behind rawConn
// to linuxSocketWriteBufferSize. It returns the Control error if the descriptor
// could not be accessed, otherwise the setsockopt result.
func configureLinuxSocketWriteBuffer(rawConn syscall.RawConn) error {
	var setErr error
	apply := func(fd uintptr) {
		setErr = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_SNDBUF, linuxSocketWriteBufferSize)
	}
	if ctlErr := rawConn.Control(apply); ctlErr != nil {
		return ctlErr
	}
	return setErr
}
// enableLinuxTimestamping turns on socket timestamping for rawConn.
//
// Different Linux versions may accept different flag combinations, so the
// candidates are tried from most to least featureful. The function returns nil
// on the first success, returns immediately on a Control error or on any
// setsockopt error other than EINVAL, and otherwise returns the last EINVAL.
func enableLinuxTimestamping(rawConn syscall.RawConn) error {
	// Flags shared by every candidate.
	const common = linuxSOFTimestampingTXSched |
		linuxSOFTimestampingTXSoftware |
		linuxSOFTimestampingRXSoftware |
		linuxSOFTimestampingSoftware |
		linuxSOFTimestampingOptTSONLY

	candidates := [...]int{
		// With per-timestamp IDs, including the TCP-specific ID option.
		common | linuxSOFTimestampingOptID | linuxSOFTimestampingOptIDTCP,
		// With per-timestamp IDs only.
		common | linuxSOFTimestampingOptID,
		// Without IDs.
		common,
	}

	var setErr error
	for i := range candidates {
		flags := candidates[i]
		// Control keeps the descriptor valid for the duration of the callback.
		ctlErr := rawConn.Control(func(fd uintptr) {
			setErr = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, linuxSOTimestampingNew, flags)
		})
		switch {
		case ctlErr != nil:
			return ctlErr
		case setErr == nil:
			return nil
		case !errors.Is(setErr, syscall.EINVAL):
			return setErr
		}
	}
	return setErr
}
// sendMessageLinux encodes msg, writes it as one length-prefixed frame, and
// then records the TX timestamps observed for it.
//
// The frame is a 4-byte big-endian payload length followed by the payload.
// The payload length is validated before anything is written: an empty payload
// or one larger than protocol.MaxFrameSize is rejected here, because
// readFrameLinux rejects such frames on the receiving side and an oversize
// length would otherwise be silently truncated by the uint32 conversion.
//
// It returns a wrapped error when encoding, validation, or writing fails.
func (c *TCPConn) sendMessageLinux(msg protocol.Message) error {
	payload, err := protocol.EncodeMessage(msg)
	if err != nil {
		return fmt.Errorf("protocol: encode message: %w", err)
	}

	switch size := uint64(len(payload)); {
	case size == 0:
		return fmt.Errorf("protocol: write frame: %w", protocol.ErrInvalidFrameLength)
	case size > uint64(protocol.MaxFrameSize):
		return fmt.Errorf("protocol: write frame: %w", protocol.ErrFrameTooLarge)
	}

	// Prefix the encoded payload with its 4-byte length to form the frame.
	frame := make([]byte, 4+len(payload))
	binary.BigEndian.PutUint32(frame[:4], uint32(len(payload)))
	copy(frame[4:], payload)

	// Clear out events left over from earlier sends so they are not mixed
	// with this message's events.
	readIndex := 0
	c.drainPendingTXTimestampEvents(msg, nil, linuxTXTimestampPhasePreSendDrain, &readIndex)

	chunks, err := c.writeFrameLinux(frame, msg)
	if err != nil {
		return fmt.Errorf("protocol: write frame: %w", err)
	}

	// Record the send-latency log entries.
	c.logTXTimestampEvents(msg, chunks, &readIndex)
	return nil
}
// writeFrameLinux writes the whole frame using repeated sendmsg calls.
//
// Each successful call produces a txSendChunk, advances c.txWriteSeq by the
// number of bytes written, and is reported to the debug logger. A would-block
// result is retried after linuxDataPollInterval. Any other error, or a
// successful call that wrote no bytes (io.ErrShortWrite), aborts the write and
// returns nil chunks.
func (c *TCPConn) writeFrameLinux(frame []byte, msg protocol.Message) ([]txSendChunk, error) {
	chunks := make([]txSendChunk, 0, 1)
	for sent, call := 0, 0; sent < len(frame); {
		n, err := c.sendmsgDataOnce(frame[sent:])
		if err != nil {
			if !isWouldBlock(err) {
				return nil, err
			}
			time.Sleep(linuxDataPollInterval)
			continue
		}
		if n <= 0 {
			return nil, io.ErrShortWrite
		}

		size := uint32(n)
		chunk := txSendChunk{
			SendCallIndex:    call,
			FrameOffsetStart: sent,
			FrameOffsetEnd:   sent + n - 1,
			BytesWritten:     n,
			ExpectedTXID:     c.txWriteSeq + size - 1,
		}
		c.txWriteSeq += size
		chunks = append(chunks, chunk)
		c.logTXSendChunkDebugRecord(msg, chunk)

		call++
		sent += n
	}
	return chunks, nil
}
// logTXTimestampEvents collects the TX timestamps for a just-sent message and
// writes A_TX_SCHED and A_TX_SOFTWARE to the latency log, in that order, for
// whichever of the two were obtained.
func (c *TCPConn) logTXTimestampEvents(msg protocol.Message, chunks []txSendChunk, readIndex *int) {
	collected := c.collectTXTimestampEvents(msg, chunks, readIndex)

	schedTS, haveSched := collected[latencylog.EventATXSched]
	if haveSched {
		latencylog.LogMessageEventAt(c.logger, c.nodeRole, c.nodeID, latencylog.EventATXSched, schedTS, msg)
	}

	softwareTS, haveSoftware := collected[latencylog.EventATXSoftware]
	if haveSoftware {
		latencylog.LogMessageEventAt(c.logger, c.nodeRole, c.nodeID, latencylog.EventATXSoftware, softwareTS, msg)
	}
}
// collectTXTimestampEvents polls the error queue for the TX timestamps of the
// message that was just written.
//
// Polling runs until linuxTXTimestampWaitTimeout elapses, a read fails with
// something other than would-block, or both the sched and software timestamps
// for the final send chunk's expected ID have been seen. Every usable event is
// recorded with the post-send-collect phase and advances *readIndex. Afterwards
// the observed events are written to the debug log, any remaining events are
// drained, and the selected timestamps (event name -> nanoseconds) are returned.
func (c *TCPConn) collectTXTimestampEvents(msg protocol.Message, chunks []txSendChunk, readIndex *int) map[string]int64 {
	stopAt := time.Now().Add(linuxTXTimestampWaitTimeout)
	observed := make([]observedTXTimestampEvent, 0, 4)
	finalChunk, hasFinalChunk := finalTXSendChunk(chunks)
	wantID := finalChunk.ExpectedTXID

poll:
	for time.Now().Before(stopAt) {
		event, err := c.recvTXTimestampOnce()
		switch {
		case err != nil && isWouldBlock(err):
			// Nothing queued yet; wait a little and poll again.
			time.Sleep(linuxTXTimestampPollInterval)
			continue
		case err != nil:
			break poll
		case event.EventName == "" || event.TSUnixNano <= 0:
			// Not a usable timestamp event.
			continue
		}

		observed = append(observed, observedTXTimestampEvent{
			Phase:     linuxTXTimestampPhasePostSendCollect,
			ReadIndex: *readIndex,
			Event:     event,
		})
		*readIndex++

		current := selectTXTimestampEvents(observed, wantID, hasFinalChunk)
		matchedFinal := hasFinalChunk && current.HasEvent && current.SelectedID == wantID
		if matchedFinal && hasCompleteTXTimestampPair(current.Timestamps) {
			break
		}
	}

	result := selectTXTimestampEvents(observed, wantID, hasFinalChunk)
	c.logObservedTXTimestampEvents(msg, chunks, observed, result)
	c.drainPendingTXTimestampEvents(msg, chunks, linuxTXTimestampPhasePostSelectDrain, readIndex)
	return result.Timestamps
}
// recvTXTimestampOnce performs a single non-blocking read of the socket error
// queue and parses the result into a txTimestampEvent.
//
// The returned event is the zero value when the control messages did not hold
// a recognizable TX timestamp. An error is returned when the descriptor could
// not be accessed or recvmsg failed.
func (c *TCPConn) recvTXTimestampOnce() (txTimestampEvent, error) {
	var (
		parsed  txTimestampEvent
		recvErr error
	)
	readErrqueue := func(fd uintptr) {
		// Buffer large enough for the control messages of one error-queue entry.
		control := make([]byte, linuxTimestampControlBufferSize)
		// MSG_ERRQUEUE selects the error queue; MSG_DONTWAIT makes the call
		// return EAGAIN instead of blocking when nothing is queued.
		_, controlLen, _, _, err := syscall.Recvmsg(int(fd), nil, control, syscall.MSG_ERRQUEUE|syscall.MSG_DONTWAIT)
		if err != nil {
			recvErr = err
			return
		}
		// The parse flag is ignored on purpose: callers check the event fields.
		parsed, _ = parseTXTimestampControlMessages(control[:controlLen])
	}

	if err := c.raw.Control(readErrqueue); err != nil {
		return txTimestampEvent{}, err
	}
	if recvErr != nil {
		return txTimestampEvent{}, recvErr
	}
	return parsed, nil
}
// parseTXTimestampControlMessages decodes a raw control-message buffer and
// extracts a TX timestamp event from it. It reports false when the buffer is
// empty, cannot be parsed, or holds no complete TX timestamp event.
func parseTXTimestampControlMessages(oob []byte) (txTimestampEvent, bool) {
	if len(oob) > 0 {
		if messages, err := syscall.ParseSocketControlMessage(oob); err == nil {
			return parseTXTimestampSocketControlMessages(messages)
		}
	}
	return txTimestampEvent{}, false
}
// parseTXTimestampSocketControlMessages combines the timestamp control message
// and the socket-extended-error control message found in controlMessages into
// a single event. It reports true only when both a positive timestamp and a
// timestamping extended error with a non-empty event name were found.
func parseTXTimestampSocketControlMessages(controlMessages []syscall.SocketControlMessage) (txTimestampEvent, bool) {
	var (
		result       txTimestampEvent
		sawTimestamp bool
		sawKind      bool
	)
	for i := range controlMessages {
		cm := controlMessages[i]

		if cm.Header.Level == syscall.SOL_SOCKET && cm.Header.Type == linuxSCMTimestampingNew {
			if ts := parseSCMTimestampingData(cm.Data); ts > 0 {
				result.TSUnixNano = ts
				sawTimestamp = true
			}
			continue
		}

		// The extended error tells which kind of timestamp was queued.
		if !isSocketExtendedErr(cm) {
			continue
		}
		info, ok := parseSocketExtendedErrInfo(cm.Data)
		if !ok {
			continue
		}
		result.EEInfo = info.Info
		result.EEData = info.Data
		result.EventName = mapLinuxTXTimestampEventName(info.Info)
		sawKind = true
	}

	if sawTimestamp && sawKind && result.EventName != "" {
		return result, true
	}
	return txTimestampEvent{}, false
}
// isSocketExtendedErr reports whether controlMessage is a socket extended
// error, i.e. an IP_RECVERR message at SOL_IP level or an IPV6_RECVERR message
// at SOL_IPV6 level.
//
// Kernel timestamps are not mixed into the normal data stream; they are
// wrapped as a special "error message" and placed on the error queue.
func isSocketExtendedErr(controlMessage syscall.SocketControlMessage) bool {
	level, kind := controlMessage.Header.Level, controlMessage.Header.Type
	isIPv4Err := level == syscall.SOL_IP && kind == syscall.IP_RECVERR
	isIPv6Err := level == syscall.SOL_IPV6 && kind == syscall.IPV6_RECVERR
	return isIPv4Err || isIPv6Err
}
// parseSocketExtendedErrInfo extracts the info and data fields from a socket
// extended error payload. It reports false when the payload is shorter than 16
// bytes or when byte 4 (the origin) is not linuxSOEEOriginTimestamping.
// Both fields are read in native byte order.
func parseSocketExtendedErrInfo(data []byte) (socketExtendedErrInfo, bool) {
	const (
		originOffset = 4
		infoOffset   = 8
		dataOffset   = 12
		minLen       = 16
	)
	if len(data) < minLen || data[originOffset] != linuxSOEEOriginTimestamping {
		return socketExtendedErrInfo{}, false
	}

	var parsed socketExtendedErrInfo
	parsed.Info = binary.NativeEndian.Uint32(data[infoOffset:dataOffset])
	parsed.Data = binary.NativeEndian.Uint32(data[dataOffset:minLen])
	return parsed, true
}
// mapLinuxTXTimestampEventName translates a timestamp kind into the latency-log
// event name: the sched kind maps to A_TX_SCHED and the send kind to
// A_TX_SOFTWARE. Any other kind yields "TX_TIMESTAMP_KIND_<n>".
func mapLinuxTXTimestampEventName(tsKind uint32) string {
	if tsKind == linuxSCMTstampSched {
		return latencylog.EventATXSched
	}
	if tsKind == linuxSCMTstampSnd {
		return latencylog.EventATXSoftware
	}
	return fmt.Sprintf("TX_TIMESTAMP_KIND_%d", tsKind)
}
// finalTXSendChunk returns the last element of chunks, or the zero chunk and
// false when chunks is empty.
func finalTXSendChunk(chunks []txSendChunk) (txSendChunk, bool) {
	if last := len(chunks) - 1; last >= 0 {
		return chunks[last], true
	}
	return txSendChunk{}, false
}
// isBusinessTXTimestampEventName reports whether eventName is one of the two
// TX events written to the latency log (A_TX_SCHED or A_TX_SOFTWARE).
func isBusinessTXTimestampEventName(eventName string) bool {
	if eventName == latencylog.EventATXSched {
		return true
	}
	return eventName == latencylog.EventATXSoftware
}
// hasCompleteTXTimestampPair reports whether timestamps holds a positive value
// for both A_TX_SCHED and A_TX_SOFTWARE. A nil or empty map yields false,
// because missing keys read as zero.
func hasCompleteTXTimestampPair(timestamps map[string]int64) bool {
	sched := timestamps[latencylog.EventATXSched]
	software := timestamps[latencylog.EventATXSoftware]
	return sched > 0 && software > 0
}
// selectTXTimestampEvents picks, from the observed events, the set of
// timestamps that should represent the message.
//
// Business events (A_TX_SCHED / A_TX_SOFTWARE) are grouped by their ID
// (EEData), keeping the earliest timestamp per event name. The result is:
//   - the group for expectedFinalTXID, when hasExpectedFinalTXID is true and
//     that group is non-empty;
//   - otherwise the group for the most recent ID observed, if non-empty;
//   - otherwise an empty timestamp map with SelectedID set to the most recent ID.
//
// HasEvent is true whenever events is non-empty. With no events, the zero
// selection with an empty, non-nil Timestamps map is returned.
//
// IDs are 32-bit counters that wrap around, so "most recent" is decided with
// serial-number arithmetic (signed difference) rather than a plain unsigned
// comparison; a plain comparison would prefer a stale pre-wrap ID over the
// newer post-wrap one.
func selectTXTimestampEvents(events []observedTXTimestampEvent, expectedFinalTXID uint32, hasExpectedFinalTXID bool) txTimestampSelection {
	selection := txTimestampSelection{
		Timestamps: make(map[string]int64, 2),
	}
	if len(events) == 0 {
		return selection
	}
	selection.HasEvent = true

	byID := make(map[uint32]map[string]int64)
	highestID := events[0].Event.EEData
	for _, observed := range events {
		event := observed.Event

		// Wrap-safe "event.EEData is newer than highestID".
		if int32(event.EEData-highestID) > 0 {
			highestID = event.EEData
		}
		if !isBusinessTXTimestampEventName(event.EventName) {
			continue
		}

		perID, ok := byID[event.EEData]
		if !ok {
			perID = make(map[string]int64, 2)
			byID[event.EEData] = perID
		}
		// Keep the earliest timestamp seen for each event name.
		if existing, exists := perID[event.EventName]; !exists || event.TSUnixNano < existing {
			perID[event.EventName] = event.TSUnixNano
		}
	}

	switch {
	case hasExpectedFinalTXID && len(byID[expectedFinalTXID]) > 0:
		selection.SelectedID = expectedFinalTXID
		selection.Timestamps = copyTXTimestampMap(byID[expectedFinalTXID])
	case len(byID[highestID]) > 0:
		selection.SelectedID = highestID
		selection.Timestamps = copyTXTimestampMap(byID[highestID])
	default:
		selection.SelectedID = highestID
	}
	return selection
}
// copyTXTimestampMap returns a new map holding the same entries as src.
// The result is never nil, even when src is nil.
func copyTXTimestampMap(src map[string]int64) map[string]int64 {
	clone := make(map[string]int64, len(src))
	for name := range src {
		clone[name] = src[name]
	}
	return clone
}
// drainPendingTXTimestampEvents reads the error queue until a read fails and
// writes every usable event to the debug log under the given phase, advancing
// *readIndex once per logged event. Events are never marked as selected.
func (c *TCPConn) drainPendingTXTimestampEvents(msg protocol.Message, chunks []txSendChunk, phase string, readIndex *int) {
	for {
		event, err := c.recvTXTimestampOnce()
		if err != nil {
			// Would-block means the queue is empty; any other error also
			// ends the drain. Draining is best-effort, so nothing is reported.
			return
		}
		if event.EventName != "" && event.TSUnixNano > 0 {
			entry := observedTXTimestampEvent{
				Phase:     phase,
				ReadIndex: *readIndex,
				Event:     event,
			}
			c.logTXErrqueueDebugRecord(msg, chunks, entry, false)
			*readIndex++
		}
	}
}
// logObservedTXTimestampEvents writes each observed event to the debug log.
// An event is flagged as selected when it is a business event whose ID and
// timestamp match the ones recorded in selection.
func (c *TCPConn) logObservedTXTimestampEvents(msg protocol.Message, chunks []txSendChunk, observed []observedTXTimestampEvent, selection txTimestampSelection) {
	for i := range observed {
		entry := observed[i]
		ev := entry.Event

		selected := false
		if selection.HasEvent && ev.EEData == selection.SelectedID && isBusinessTXTimestampEventName(ev.EventName) {
			selected = selection.Timestamps[ev.EventName] == ev.TSUnixNano
		}
		c.logTXErrqueueDebugRecord(msg, chunks, entry, selected)
	}
}
// logTXSendChunkDebugRecord emits a send-chunk debug record describing chunk.
// It does nothing when no debug logger is configured.
func (c *TCPConn) logTXSendChunkDebugRecord(msg protocol.Message, chunk txSendChunk) {
	if c.txTimestampDebugLogger == nil {
		return
	}

	// chunk is this function's own copy, so pointing at its fields does not
	// alias the caller's value.
	record := c.newTXTimestampDebugRecord(msg)
	record.RecordType = txTimestampDebugRecordTypeSendChunk
	record.SendCallIndex = &chunk.SendCallIndex
	record.FrameOffsetStart = &chunk.FrameOffsetStart
	record.FrameOffsetEnd = &chunk.FrameOffsetEnd
	record.BytesWritten = &chunk.BytesWritten
	record.ExpectedTXID = &chunk.ExpectedTXID
	c.logTXTimestampDebugRecord(record)
}
// logTXErrqueueDebugRecord emits an errqueue-event debug record for observed,
// noting whether it was selected for latency logging and, when its ID matches
// one of chunks, the index of that send call. It does nothing when no debug
// logger is configured.
func (c *TCPConn) logTXErrqueueDebugRecord(msg protocol.Message, chunks []txSendChunk, observed observedTXTimestampEvent, selected bool) {
	if c.txTimestampDebugLogger == nil {
		return
	}

	// observed and selected are local copies; the record points at them.
	event := observed.Event
	record := c.newTXTimestampDebugRecord(msg)
	record.RecordType = txTimestampDebugRecordTypeErrqueueEvent
	record.Phase = observed.Phase
	record.ReadIndex = &observed.ReadIndex
	record.EventName = event.EventName
	record.TSUnixNano = &event.TSUnixNano
	record.EEInfo = &event.EEInfo
	record.EEData = &event.EEData
	record.SelectedForLatency = &selected

	if sendCall, found := matchTXTimestampEventToSendChunk(event.EEData, chunks); found {
		record.MatchedSendCallIndex = &sendCall
	}
	c.logTXTimestampDebugRecord(record)
}
// matchTXTimestampEventToSendChunk returns the SendCallIndex of the first chunk
// whose ExpectedTXID equals txID, or (0, false) when none matches.
func matchTXTimestampEventToSendChunk(txID uint32, chunks []txSendChunk) (int, bool) {
	for i := 0; i < len(chunks); i++ {
		if chunks[i].ExpectedTXID != txID {
			continue
		}
		return chunks[i].SendCallIndex, true
	}
	return 0, false
}
// newTXTimestampDebugRecord builds a debug record pre-filled with the node
// identity and the message's type, ID, endpoints, file name, and body size.
func (c *TCPConn) newTXTimestampDebugRecord(msg protocol.Message) TXTimestampDebugRecord {
	var record TXTimestampDebugRecord
	record.NodeRole = c.nodeRole
	record.NodeID = c.nodeID
	record.MessageType = msg.Type
	record.MessageID = msg.ID
	record.From = msg.From
	record.To = msg.To
	record.FileName = msg.FileName
	record.BodySize = len(msg.Body)
	return record
}
// logTXTimestampDebugRecord forwards record to the debug logger when one is
// configured.
func (c *TCPConn) logTXTimestampDebugRecord(record TXTimestampDebugRecord) {
	if c.txTimestampDebugLogger != nil {
		// Debug logging is best-effort: a logging failure is dropped here.
		_ = c.txTimestampDebugLogger.LogTXTimestampDebugRecord(record)
	}
}
// receiveMessageLinux reads one complete frame, decodes it into a message, and
// logs B_RX_SOFTWARE when a positive receive timestamp was obtained.
// It returns a wrapped error when reading or decoding fails.
func (c *TCPConn) receiveMessageLinux() (protocol.Message, error) {
	payload, rxTimestamp, readErr := c.readFrameLinux()
	if readErr != nil {
		return protocol.Message{}, fmt.Errorf("protocol: read frame: %w", readErr)
	}

	msg, decodeErr := protocol.DecodeMessage(payload)
	if decodeErr != nil {
		return protocol.Message{}, fmt.Errorf("protocol: decode message: %w", decodeErr)
	}

	if rxTimestamp <= 0 {
		return msg, nil
	}
	latencylog.LogMessageEventAt(c.logger, c.nodeRole, c.nodeID, latencylog.EventBRXSoftware, rxTimestamp, msg)
	return msg, nil
}
// readFrameLinux reads one length-prefixed frame: a 4-byte big-endian length
// followed by that many payload bytes.
//
// It returns the payload and the first receive timestamp seen (taken from the
// header read, or from the payload read when the header carried none; zero
// when neither did). A length of zero yields protocol.ErrInvalidFrameLength
// and a length above protocol.MaxFrameSize yields protocol.ErrFrameTooLarge.
//
// io.EOF is returned only when the connection ends before any header byte.
// Once a header has been read, an EOF while reading the payload means the
// frame was truncated and is reported as io.ErrUnexpectedEOF, so callers
// cannot mistake it for a clean close.
func (c *TCPConn) readFrameLinux() ([]byte, int64, error) {
	var frameHeader [4]byte
	rxTimestamp, err := c.readFullLinux(frameHeader[:])
	if err != nil {
		return nil, rxTimestamp, err
	}

	size := binary.BigEndian.Uint32(frameHeader[:])
	switch {
	case size == 0:
		return nil, rxTimestamp, protocol.ErrInvalidFrameLength
	case size > protocol.MaxFrameSize:
		return nil, rxTimestamp, protocol.ErrFrameTooLarge
	}

	payload := make([]byte, int(size))
	bodyTimestamp, err := c.readFullLinux(payload)
	if rxTimestamp == 0 {
		rxTimestamp = bodyTimestamp
	}
	if err != nil {
		// readFullLinux reports plain EOF when no payload byte arrived, but
		// with the header already consumed this is a truncated frame.
		if errors.Is(err, io.EOF) {
			err = io.ErrUnexpectedEOF
		}
		return nil, rxTimestamp, err
	}
	return payload, rxTimestamp, nil
}
// readFullLinux reads until buf is full and returns the first positive receive
// timestamp seen along the way (zero if none).
//
// An empty buf returns (0, nil) without reading. If EOF arrives after some
// bytes were already read, io.ErrUnexpectedEOF is returned; any other error,
// including EOF before the first byte, is returned unchanged.
func (c *TCPConn) readFullLinux(buf []byte) (int64, error) {
	var firstRXTime int64
	for filled := 0; filled < len(buf); {
		n, ts, err := c.recvmsgLinux(buf[filled:])
		if ts > 0 && firstRXTime == 0 {
			firstRXTime = ts
		}
		switch {
		case err == nil:
			filled += n
		case filled > 0 && errors.Is(err, io.EOF):
			return firstRXTime, io.ErrUnexpectedEOF
		default:
			return firstRXTime, err
		}
	}
	return firstRXTime, nil
}
// recvmsgLinux reads data and its control messages with recvmsg, retrying
// after linuxDataPollInterval while the call would block. It returns the byte
// count and receive timestamp, io.EOF when a read succeeds with zero bytes, or
// the first error that is not would-block.
func (c *TCPConn) recvmsgLinux(buf []byte) (int, int64, error) {
	for {
		n, rxTimeNS, err := c.recvmsgDataOnce(buf)
		if err != nil {
			if isWouldBlock(err) {
				time.Sleep(linuxDataPollInterval)
				continue
			}
			return 0, 0, err
		}
		if n == 0 {
			return 0, 0, io.EOF
		}
		return n, rxTimeNS, nil
	}
}
// parseRXTimestampControlMessages returns the first positive timestamp found
// in the timestamping control messages of oob, or zero when oob is empty,
// cannot be parsed, or carries no such timestamp.
func parseRXTimestampControlMessages(oob []byte) int64 {
	if len(oob) == 0 {
		return 0
	}
	messages, err := syscall.ParseSocketControlMessage(oob)
	if err != nil {
		return 0
	}
	for i := range messages {
		hdr := messages[i].Header
		if hdr.Level == syscall.SOL_SOCKET && hdr.Type == linuxSCMTimestampingNew {
			if ts := parseSCMTimestampingData(messages[i].Data); ts > 0 {
				return ts
			}
		}
	}
	return 0
}
// parseSCMTimestampingData scans data as consecutive 16-byte entries, each a
// pair of native-endian 64-bit values (seconds, nanoseconds), and returns the
// first entry that is not all zero as seconds*1e9 + nanoseconds. A trailing
// partial entry is ignored. It returns zero when no non-zero entry exists.
func parseSCMTimestampingData(data []byte) int64 {
	const (
		fieldSize = 8
		entrySize = 2 * fieldSize
	)
	for rest := data; len(rest) >= entrySize; rest = rest[entrySize:] {
		sec := int64(binary.NativeEndian.Uint64(rest[:fieldSize]))
		nsec := int64(binary.NativeEndian.Uint64(rest[fieldSize:entrySize]))
		if sec != 0 || nsec != 0 {
			return sec*int64(time.Second) + nsec
		}
	}
	return 0
}
// isWouldBlock reports whether err is, or wraps, EAGAIN or EWOULDBLOCK.
func isWouldBlock(err error) bool {
	for _, target := range [...]error{syscall.EAGAIN, syscall.EWOULDBLOCK} {
		if errors.Is(err, target) {
			return true
		}
	}
	return false
}
// sendmsgDataOnce makes a single sendmsg call on the raw descriptor with buf
// as the data and no control message, destination, or flags. It returns the
// number of bytes written together with the sendmsg error, or (0, err) when
// the descriptor could not be accessed.
func (c *TCPConn) sendmsgDataOnce(buf []byte) (int, error) {
	var (
		written int
		sendErr error
	)
	ctlErr := c.raw.Control(func(fd uintptr) {
		written, sendErr = syscall.SendmsgN(int(fd), buf, nil, nil, 0)
	})
	if ctlErr != nil {
		return 0, ctlErr
	}
	return written, sendErr
}
// recvmsgDataOnce makes a single recvmsg call on the raw descriptor, reading
// data into buf and control messages into a scratch buffer. On success it
// returns the byte count and the receive timestamp parsed from the control
// messages (zero if none). When recvmsg fails it returns (0, 0, err); when the
// descriptor cannot be accessed it returns the Control error.
func (c *TCPConn) recvmsgDataOnce(buf []byte) (int, int64, error) {
	var (
		received  int
		timestamp int64
		recvErr   error
	)
	readOnce := func(fd uintptr) {
		control := make([]byte, linuxTimestampControlBufferSize)
		dataLen, controlLen, _, _, err := syscall.Recvmsg(int(fd), buf, control, 0)
		if err != nil {
			recvErr = err
			return
		}
		received = dataLen
		timestamp = parseRXTimestampControlMessages(control[:controlLen])
	}
	if ctlErr := c.raw.Control(readOnce); ctlErr != nil {
		return 0, 0, ctlErr
	}
	return received, timestamp, recvErr
}