This commit is contained in:
2026-03-24 12:05:25 +08:00
8 changed files with 819 additions and 143 deletions

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
bin/* bin/*
inbox/* inbox/*
*.jsonl

View File

@@ -16,9 +16,10 @@ import (
var dialServer = dialServerWithOptions var dialServer = dialServerWithOptions
type clientOptions struct { type clientOptions struct {
logger latencylog.Logger logger latencylog.Logger
bindIP string txTimestampDebugLogger transport.TXTimestampDebugLogger
bindDevice string bindIP string
bindDevice string
} }
// Option 用于配置 Client 的可选行为,例如时延日志。 // Option 用于配置 Client 的可选行为,例如时延日志。
@@ -31,6 +32,13 @@ func WithLogger(logger latencylog.Logger) Option {
} }
} }
// WithTXTimestampDebugLogger 为 client 注入 TX errqueue 调试日志器。
func WithTXTimestampDebugLogger(logger transport.TXTimestampDebugLogger) Option {
return func(options *clientOptions) {
options.txTimestampDebugLogger = logger
}
}
// WithBindIP 指定拨号时使用的本地源 IP。 // WithBindIP 指定拨号时使用的本地源 IP。
func WithBindIP(ip string) Option { func WithBindIP(ip string) Option {
return func(options *clientOptions) { return func(options *clientOptions) {
@@ -74,6 +82,7 @@ func Dial(serverAddr, peerID string, opts ...Option) (*Client, error) {
conn, err := transport.NewTCPConn( conn, err := transport.NewTCPConn(
rawConn, rawConn,
transport.WithLogger(options.logger, latencylog.NodeRolePeer, peerID), transport.WithLogger(options.logger, latencylog.NodeRolePeer, peerID),
transport.WithTXTimestampDebugLogger(options.txTimestampDebugLogger),
) )
if err != nil { if err != nil {
_ = rawConn.Close() _ = rawConn.Close()

View File

@@ -19,12 +19,14 @@ type TCPConn struct {
conn net.Conn conn net.Conn
raw syscall.RawConn // 连接对应的底层 syscall 句柄,用于 Linux socket timestamping 收发。 raw syscall.RawConn // 连接对应的底层 syscall 句柄,用于 Linux socket timestamping 收发。
logger latencylog.Logger logger latencylog.Logger
nodeRole string // 日志中记录的节点角色,例如 "server" 或 "peer" txTimestampDebugLogger TXTimestampDebugLogger
nodeID string // 日志中记录的节点 ID,例如 peer 的 ID 或 server 的 "hub" nodeRole string // 日志中记录的节点角色,例如 "server" 或 "peer"
writeMu sync.Mutex // 保护 Send 方法的互斥锁,确保同一时刻只有一条完整协议消息被写入连接,防止多条消息字节交叉 nodeID string // 日志中记录的节点 ID例如 peer 的 ID 或 server 的 "hub"
closeOnce sync.Once // 保护 Close 方法的 sync.Once确保连接只被关闭一次 writeMu sync.Mutex // 保护 Send 方法的互斥锁,确保同一时刻只有一条完整协议消息被写入连接,防止多条消息字节交叉
closeErr error // 连接关闭时的错误,如果连接成功关闭则为 nil重复调用 Close 时会返回同样的错误 txWriteSeq uint32 // Linux TX timestamp OPT_ID_TCP 的本地镜像,按成功写出的字节推进。
closeOnce sync.Once // 保护 Close 方法的 sync.Once确保连接只被关闭一次
closeErr error // 连接关闭时的错误,如果连接成功关闭则为 nil重复调用 Close 时会返回同样的错误
} }
// Option 用于为 TCPConn 注入可选行为,例如时延日志。 // Option 用于为 TCPConn 注入可选行为,例如时延日志。
@@ -39,6 +41,13 @@ func WithLogger(logger latencylog.Logger, nodeRole, nodeID string) Option {
} }
} }
// WithTXTimestampDebugLogger 为连接注入可选的 TX errqueue 调试日志器。
func WithTXTimestampDebugLogger(logger TXTimestampDebugLogger) Option {
return func(conn *TCPConn) {
conn.txTimestampDebugLogger = logger
}
}
// NewTCPConn 用已有的 net.Conn 创建 transport 连接封装。 // NewTCPConn 用已有的 net.Conn 创建 transport 连接封装。
func NewTCPConn(conn net.Conn, opts ...Option) (*TCPConn, error) { func NewTCPConn(conn net.Conn, opts ...Option) (*TCPConn, error) {
tcpConn := &TCPConn{ tcpConn := &TCPConn{

View File

@@ -7,7 +7,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"strings"
"syscall" "syscall"
"time" "time"
@@ -19,6 +18,7 @@ const (
linuxTimestampControlBufferSize = 256 // 控制消息缓冲区。 linuxTimestampControlBufferSize = 256 // 控制消息缓冲区。
linuxTXTimestampWaitTimeout = 250 * time.Millisecond // 等待 TX 时间戳的上限。 linuxTXTimestampWaitTimeout = 250 * time.Millisecond // 等待 TX 时间戳的上限。
linuxTXTimestampPollInterval = time.Millisecond // 轮询 errqueue 的间隔。 linuxTXTimestampPollInterval = time.Millisecond // 轮询 errqueue 的间隔。
linuxDataPollInterval = time.Millisecond // 轮询普通收发的间隔。
linuxSOTimestampingNew = 0x41 linuxSOTimestampingNew = 0x41
linuxSCMTimestampingNew = linuxSOTimestampingNew linuxSCMTimestampingNew = linuxSOTimestampingNew
@@ -33,8 +33,44 @@ const (
linuxSOFTimestampingTXSched = 1 << 8 // 打开 TX sched timestamp。 linuxSOFTimestampingTXSched = 1 << 8 // 打开 TX sched timestamp。
linuxSOFTimestampingOptTSONLY = 1 << 11 // 只回时间戳。 linuxSOFTimestampingOptTSONLY = 1 << 11 // 只回时间戳。
linuxSOFTimestampingOptIDTCP = 1 << 16 // 让 TCP 也带 timestamp ID。 linuxSOFTimestampingOptIDTCP = 1 << 16 // 让 TCP 也带 timestamp ID。
linuxTXTimestampPhasePreSendDrain = "pre_send_drain"
linuxTXTimestampPhasePostSendCollect = "post_send_collect"
linuxTXTimestampPhasePostSelectDrain = "post_select_drain"
) )
type txSendChunk struct {
SendCallIndex int
FrameOffsetStart int
FrameOffsetEnd int // 包含当前 sendmsg 成功写出的最后一个 frame 字节偏移。
BytesWritten int
ExpectedTXID uint32
}
type txTimestampEvent struct {
EventName string
TSUnixNano int64
EEInfo uint32
EEData uint32
}
type observedTXTimestampEvent struct {
Phase string
ReadIndex int
Event txTimestampEvent
}
type txTimestampSelection struct {
SelectedID uint32
Timestamps map[string]int64
HasEvent bool
}
type socketExtendedErrInfo struct {
Info uint32
Data uint32
}
// 拿到底层 fd并打开 Linux timestamping。 // 拿到底层 fd并打开 Linux timestamping。
func (c *TCPConn) initLinuxTimestamping() error { func (c *TCPConn) initLinuxTimestamping() error {
sysConn, ok := c.conn.(interface { sysConn, ok := c.conn.(interface {
@@ -116,62 +152,56 @@ func (c *TCPConn) sendMessageLinux(msg protocol.Message) error {
binary.BigEndian.PutUint32(frame[:4], uint32(len(payload))) binary.BigEndian.PutUint32(frame[:4], uint32(len(payload)))
copy(frame[4:], payload) copy(frame[4:], payload)
if err := c.writeFrameLinux(frame); err != nil { readIndex := 0
c.drainPendingTXTimestampEvents(msg, nil, linuxTXTimestampPhasePreSendDrain, &readIndex)
chunks, err := c.writeFrameLinux(frame, msg)
if err != nil {
return fmt.Errorf("protocol: write frame: %w", err) return fmt.Errorf("protocol: write frame: %w", err)
} }
//记录发送延时日志 //记录发送延时日志
c.logTXTimestampEvents(msg) c.logTXTimestampEvents(msg, chunks, &readIndex)
return nil return nil
} }
// writeFrameLinux 用 sendmsg 写完整帧。 // writeFrameLinux 用 sendmsg 写完整帧。
func (c *TCPConn) writeFrameLinux(frame []byte) error { func (c *TCPConn) writeFrameLinux(frame []byte, msg protocol.Message) ([]txSendChunk, error) {
written := 0 written := 0
var opErr error sendCallIndex := 0
chunks := make([]txSendChunk, 0, 1)
err := c.raw.Write(func(fd uintptr) bool { for written < len(frame) {
if written >= len(frame) { n, err := c.sendmsgDataOnce(frame[written:])
return true
}
n, sendErr := syscall.SendmsgN(int(fd), frame[written:], nil, nil, 0)
switch { switch {
case sendErr == nil: case err == nil:
if n <= 0 { if n <= 0 {
opErr = io.ErrShortWrite return nil, io.ErrShortWrite
return true
} }
chunk := txSendChunk{
SendCallIndex: sendCallIndex,
FrameOffsetStart: written,
FrameOffsetEnd: written + n - 1,
BytesWritten: n,
ExpectedTXID: c.txWriteSeq + uint32(n) - 1,
}
c.txWriteSeq += uint32(n)
chunks = append(chunks, chunk)
c.logTXSendChunkDebugRecord(msg, chunk)
sendCallIndex++
written += n written += n
return written >= len(frame) case isWouldBlock(err):
case errors.Is(sendErr, syscall.EAGAIN), errors.Is(sendErr, syscall.EWOULDBLOCK): time.Sleep(linuxDataPollInterval)
return false
default: default:
opErr = sendErr return nil, err
return true
} }
})
if err != nil {
if isRawConnNotPollable(err) {
return writeFullFallback(c.conn, frame[written:])
}
return err
}
if opErr != nil {
if isRawConnNotPollable(opErr) {
return writeFullFallback(c.conn, frame[written:])
}
return opErr
}
if written != len(frame) {
return io.ErrShortWrite
} }
return nil return chunks, nil
} }
// 把 A_TX_SCHED / A_TX_SOFTWARE 写入日志。(发送过程中) // 把 A_TX_SCHED / A_TX_SOFTWARE 写入日志。(发送过程中)
func (c *TCPConn) logTXTimestampEvents(msg protocol.Message) { func (c *TCPConn) logTXTimestampEvents(msg protocol.Message, chunks []txSendChunk, readIndex *int) {
timestamps := c.collectTXTimestampEvents() timestamps := c.collectTXTimestampEvents(msg, chunks, readIndex)
if ts, ok := timestamps[latencylog.EventATXSched]; ok { if ts, ok := timestamps[latencylog.EventATXSched]; ok {
latencylog.LogMessageEventAt(c.logger, c.nodeRole, c.nodeID, latencylog.EventATXSched, ts, msg) latencylog.LogMessageEventAt(c.logger, c.nodeRole, c.nodeID, latencylog.EventATXSched, ts, msg)
@@ -182,14 +212,14 @@ func (c *TCPConn) logTXTimestampEvents(msg protocol.Message) {
} }
// 在 errqueue 里等两类 TX 时间戳。 // 在 errqueue 里等两类 TX 时间戳。
func (c *TCPConn) collectTXTimestampEvents() map[string]int64 { func (c *TCPConn) collectTXTimestampEvents(msg protocol.Message, chunks []txSendChunk, readIndex *int) map[string]int64 {
timestamps := make(map[string]int64, 2)
//设置合理等待上限
deadline := time.Now().Add(linuxTXTimestampWaitTimeout) deadline := time.Now().Add(linuxTXTimestampWaitTimeout)
observed := make([]observedTXTimestampEvent, 0, 4)
finalChunk, hasFinalChunk := finalTXSendChunk(chunks)
//轮询 errqueue 直到拿到两类时间戳,或超时,或遇到非 EAGAIN 错误 // 轮询 errqueue,优先等待本次消息最后一个 sendmsg chunk 的 sched/software 两类时间戳
for len(timestamps) < 2 && time.Now().Before(deadline) { for time.Now().Before(deadline) {
eventName, ts, err := c.recvTXTimestampOnce() event, err := c.recvTXTimestampOnce()
if err != nil { if err != nil {
if isWouldBlock(err) { if isWouldBlock(err) {
time.Sleep(linuxTXTimestampPollInterval) time.Sleep(linuxTXTimestampPollInterval)
@@ -197,23 +227,33 @@ func (c *TCPConn) collectTXTimestampEvents() map[string]int64 {
} }
break break
} }
if eventName == "" || ts <= 0 { if event.EventName == "" || event.TSUnixNano <= 0 {
continue continue
} }
if _, exists := timestamps[eventName]; !exists { observed = append(observed, observedTXTimestampEvent{
timestamps[eventName] = ts Phase: linuxTXTimestampPhasePostSendCollect,
ReadIndex: *readIndex,
Event: event,
})
*readIndex = *readIndex + 1
selection := selectTXTimestampEvents(observed, finalChunk.ExpectedTXID, hasFinalChunk)
if hasFinalChunk && selection.HasEvent && selection.SelectedID == finalChunk.ExpectedTXID && hasCompleteTXTimestampPair(selection.Timestamps) {
break
} }
} }
return timestamps selection := selectTXTimestampEvents(observed, finalChunk.ExpectedTXID, hasFinalChunk)
c.logObservedTXTimestampEvents(msg, chunks, observed, selection)
c.drainPendingTXTimestampEvents(msg, chunks, linuxTXTimestampPhasePostSelectDrain, readIndex)
return selection.Timestamps
} }
// recvTXTimestampOnce 从 errqueue 读一次时间戳事件。 // recvTXTimestampOnce 从 errqueue 读一次时间戳事件。
func (c *TCPConn) recvTXTimestampOnce() (string, int64, error) { func (c *TCPConn) recvTXTimestampOnce() (txTimestampEvent, error) {
var ( var (
eventName string // 事件名,例如 A_TX_SCHED 或 A_TX_SOFTWARE。 event txTimestampEvent
tsUnixNS int64 // 时间戳的 UnixNano 表示。 opErr error
opErr error
) )
err := c.raw.Control(func(fd uintptr) { err := c.raw.Control(func(fd uintptr) {
@@ -226,63 +266,61 @@ func (c *TCPConn) recvTXTimestampOnce() (string, int64, error) {
return return
} }
//解析控制消息,看看是不是我们关心的 TX 时间戳事件,如果是就拿到事件名和时间戳。 //解析控制消息,看看是不是我们关心的 TX 时间戳事件,如果是就拿到事件名和时间戳。
eventName, tsUnixNS = parseTXTimestampControlMessages(oob[:oobn]) event, _ = parseTXTimestampControlMessages(oob[:oobn])
}) })
if err != nil { if err != nil {
return "", 0, err return txTimestampEvent{}, err
} }
if opErr != nil { if opErr != nil {
return "", 0, opErr return txTimestampEvent{}, opErr
} }
return eventName, tsUnixNS, nil //如果成功拿到时间戳事件eventName 会是 A_TX_SCHED 或 A_TX_SOFTWARE 之一tsUnixNS 是对应的时间戳如果没有拿到事件或时间戳无效eventName 会是空字符串tsUnixNS 会是 0。 return event, nil
} }
// 把底层时间戳映射成日志事件名。 // 把底层时间戳映射成日志事件名。
func parseTXTimestampControlMessages(oob []byte) (string, int64) { func parseTXTimestampControlMessages(oob []byte) (txTimestampEvent, bool) {
if len(oob) == 0 { if len(oob) == 0 {
return "", 0 return txTimestampEvent{}, false
} }
//解析控制消息,看看是不是我们关心的 TX 时间戳事件,如果是就拿到事件名和时间戳。 //解析控制消息,看看是不是我们关心的 TX 时间戳事件,如果是就拿到事件名和时间戳。
controlMessages, err := syscall.ParseSocketControlMessage(oob) controlMessages, err := syscall.ParseSocketControlMessage(oob)
if err != nil { if err != nil {
return "", 0 return txTimestampEvent{}, false
} }
return parseTXTimestampSocketControlMessages(controlMessages)
}
func parseTXTimestampSocketControlMessages(controlMessages []syscall.SocketControlMessage) (txTimestampEvent, bool) {
var ( var (
tsUnixNS int64 //时间戳的 UnixNano 表示。 event txTimestampEvent
tsKind uint32 //extended err里告诉我们这个时间戳是 sched 还是 software。 hasTS bool
hasTS bool // 是否拿到时间戳了。 hasKind bool
hasKind bool // 是否拿到时间戳类型了。
) )
//一个 recvmsg 可能会收到多个控制消息,循环找我们关心的时间戳事件,拿到时间戳和事件类型。
for _, controlMessage := range controlMessages { for _, controlMessage := range controlMessages {
switch { switch {
case controlMessage.Header.Level == syscall.SOL_SOCKET && controlMessage.Header.Type == linuxSCMTimestampingNew: case controlMessage.Header.Level == syscall.SOL_SOCKET && controlMessage.Header.Type == linuxSCMTimestampingNew:
if ts := parseSCMTimestampingData(controlMessage.Data); ts > 0 { if ts := parseSCMTimestampingData(controlMessage.Data); ts > 0 {
tsUnixNS = ts event.TSUnixNano = ts
hasTS = true hasTS = true
} }
case isSocketExtendedErr(controlMessage): //判断时间戳是否进入了errqueue case isSocketExtendedErr(controlMessage): //判断时间戳是否进入了errqueue
if info, ok := parseSocketExtendedErrInfo(controlMessage.Data); ok { if info, ok := parseSocketExtendedErrInfo(controlMessage.Data); ok {
tsKind = info //时间戳类型被内核放在 extended err 的附加信息里,解析出来。 event.EEInfo = info.Info
event.EEData = info.Data
event.EventName = mapLinuxTXTimestampEventName(info.Info)
hasKind = true hasKind = true
} }
} }
} }
if !hasTS || !hasKind { if !hasTS || !hasKind || event.EventName == "" {
return "", 0 return txTimestampEvent{}, false
} }
switch tsKind { //把内核的时间戳类型映射成日志事件名。(记录时只关心 sched 和 software 两类时间戳) return event, true
case linuxSCMTstampSched:
return latencylog.EventATXSched, tsUnixNS
case linuxSCMTstampSnd:
return latencylog.EventATXSoftware, tsUnixNS
default:
return "", 0
}
} }
// 判断控制消息是否来自 socket extended err。 // 判断控制消息是否来自 socket extended err。
@@ -299,15 +337,217 @@ func isSocketExtendedErr(controlMessage syscall.SocketControlMessage) bool {
} }
// 从 socket extended err 的数据里取 origin timestamping 信息。 // 从 socket extended err 的数据里取 origin timestamping 信息。
func parseSocketExtendedErrInfo(data []byte) (uint32, bool) { func parseSocketExtendedErrInfo(data []byte) (socketExtendedErrInfo, bool) {
if len(data) < 16 { if len(data) < 16 {
return 0, false return socketExtendedErrInfo{}, false
} }
if data[4] != linuxSOEEOriginTimestamping { if data[4] != linuxSOEEOriginTimestamping {
return 0, false return socketExtendedErrInfo{}, false
} }
return binary.NativeEndian.Uint32(data[8:12]), true return socketExtendedErrInfo{
Info: binary.NativeEndian.Uint32(data[8:12]),
Data: binary.NativeEndian.Uint32(data[12:16]),
}, true
}
func mapLinuxTXTimestampEventName(tsKind uint32) string {
switch tsKind {
case linuxSCMTstampSched:
return latencylog.EventATXSched
case linuxSCMTstampSnd:
return latencylog.EventATXSoftware
default:
return fmt.Sprintf("TX_TIMESTAMP_KIND_%d", tsKind)
}
}
func finalTXSendChunk(chunks []txSendChunk) (txSendChunk, bool) {
if len(chunks) == 0 {
return txSendChunk{}, false
}
return chunks[len(chunks)-1], true
}
func isBusinessTXTimestampEventName(eventName string) bool {
return eventName == latencylog.EventATXSched || eventName == latencylog.EventATXSoftware
}
func hasCompleteTXTimestampPair(timestamps map[string]int64) bool {
if len(timestamps) == 0 {
return false
}
return timestamps[latencylog.EventATXSched] > 0 && timestamps[latencylog.EventATXSoftware] > 0
}
func selectTXTimestampEvents(events []observedTXTimestampEvent, expectedFinalTXID uint32, hasExpectedFinalTXID bool) txTimestampSelection {
byID := make(map[uint32]map[string]int64)
selection := txTimestampSelection{
Timestamps: make(map[string]int64, 2),
}
var (
highestID uint32
haveHighest bool
)
for _, observed := range events {
event := observed.Event
selection.HasEvent = true
if !haveHighest || event.EEData > highestID {
highestID = event.EEData
haveHighest = true
}
if !isBusinessTXTimestampEventName(event.EventName) {
continue
}
if _, ok := byID[event.EEData]; !ok {
byID[event.EEData] = make(map[string]int64, 2)
}
if existing, exists := byID[event.EEData][event.EventName]; !exists || event.TSUnixNano < existing {
byID[event.EEData][event.EventName] = event.TSUnixNano
}
}
if !selection.HasEvent {
return selection
}
switch {
case hasExpectedFinalTXID && len(byID[expectedFinalTXID]) > 0:
selection.SelectedID = expectedFinalTXID
selection.Timestamps = copyTXTimestampMap(byID[expectedFinalTXID])
case len(byID[highestID]) > 0:
selection.SelectedID = highestID
selection.Timestamps = copyTXTimestampMap(byID[highestID])
default:
selection.SelectedID = highestID
}
return selection
}
func copyTXTimestampMap(src map[string]int64) map[string]int64 {
dst := make(map[string]int64, len(src))
for key, value := range src {
dst[key] = value
}
return dst
}
func (c *TCPConn) drainPendingTXTimestampEvents(msg protocol.Message, chunks []txSendChunk, phase string, readIndex *int) {
for {
event, err := c.recvTXTimestampOnce()
if err != nil {
if isWouldBlock(err) {
return
}
return
}
if event.EventName == "" || event.TSUnixNano <= 0 {
continue
}
c.logTXErrqueueDebugRecord(msg, chunks, observedTXTimestampEvent{
Phase: phase,
ReadIndex: *readIndex,
Event: event,
}, false)
*readIndex = *readIndex + 1
}
}
func (c *TCPConn) logObservedTXTimestampEvents(msg protocol.Message, chunks []txSendChunk, observed []observedTXTimestampEvent, selection txTimestampSelection) {
if len(observed) == 0 {
return
}
for _, entry := range observed {
selected := selection.HasEvent &&
entry.Event.EEData == selection.SelectedID &&
isBusinessTXTimestampEventName(entry.Event.EventName) &&
selection.Timestamps[entry.Event.EventName] == entry.Event.TSUnixNano
c.logTXErrqueueDebugRecord(msg, chunks, entry, selected)
}
}
func (c *TCPConn) logTXSendChunkDebugRecord(msg protocol.Message, chunk txSendChunk) {
if c.txTimestampDebugLogger == nil {
return
}
sendCallIndex := chunk.SendCallIndex
frameOffsetStart := chunk.FrameOffsetStart
frameOffsetEnd := chunk.FrameOffsetEnd
bytesWritten := chunk.BytesWritten
expectedTXID := chunk.ExpectedTXID
record := c.newTXTimestampDebugRecord(msg)
record.RecordType = txTimestampDebugRecordTypeSendChunk
record.SendCallIndex = &sendCallIndex
record.FrameOffsetStart = &frameOffsetStart
record.FrameOffsetEnd = &frameOffsetEnd
record.BytesWritten = &bytesWritten
record.ExpectedTXID = &expectedTXID
c.logTXTimestampDebugRecord(record)
}
func (c *TCPConn) logTXErrqueueDebugRecord(msg protocol.Message, chunks []txSendChunk, observed observedTXTimestampEvent, selected bool) {
if c.txTimestampDebugLogger == nil {
return
}
readIndex := observed.ReadIndex
tsUnixNano := observed.Event.TSUnixNano
eeInfo := observed.Event.EEInfo
eeData := observed.Event.EEData
selectedForLatency := selected
record := c.newTXTimestampDebugRecord(msg)
record.RecordType = txTimestampDebugRecordTypeErrqueueEvent
record.Phase = observed.Phase
record.ReadIndex = &readIndex
record.EventName = observed.Event.EventName
record.TSUnixNano = &tsUnixNano
record.EEInfo = &eeInfo
record.EEData = &eeData
record.SelectedForLatency = &selectedForLatency
if matchedSendCallIndex, ok := matchTXTimestampEventToSendChunk(observed.Event.EEData, chunks); ok {
record.MatchedSendCallIndex = &matchedSendCallIndex
}
c.logTXTimestampDebugRecord(record)
}
func matchTXTimestampEventToSendChunk(txID uint32, chunks []txSendChunk) (int, bool) {
for _, chunk := range chunks {
if chunk.ExpectedTXID == txID {
return chunk.SendCallIndex, true
}
}
return 0, false
}
func (c *TCPConn) newTXTimestampDebugRecord(msg protocol.Message) TXTimestampDebugRecord {
return TXTimestampDebugRecord{
NodeRole: c.nodeRole,
NodeID: c.nodeID,
MessageType: msg.Type,
MessageID: msg.ID,
From: msg.From,
To: msg.To,
FileName: msg.FileName,
BodySize: len(msg.Body),
}
}
func (c *TCPConn) logTXTimestampDebugRecord(record TXTimestampDebugRecord) {
if c.txTimestampDebugLogger == nil {
return
}
_ = c.txTimestampDebugLogger.LogTXTimestampDebugRecord(record)
} }
// 读一条完整消息,并记录 B_RX_SOFTWARE。 // 读一条完整消息,并记录 B_RX_SOFTWARE。
@@ -374,15 +614,6 @@ func (c *TCPConn) readFullLinux(buf []byte) (int64, error) {
firstRXTime = rxTimestamp firstRXTime = rxTimestamp
} }
if err != nil { if err != nil {
if isRawConnNotPollable(err) {
if fallbackErr := readFullFallback(c.conn, buf[offset:]); fallbackErr != nil {
if errors.Is(fallbackErr, io.EOF) && offset > 0 {
return firstRXTime, io.ErrUnexpectedEOF
}
return firstRXTime, fallbackErr
}
return firstRXTime, nil
}
if errors.Is(err, io.EOF) && offset > 0 { if errors.Is(err, io.EOF) && offset > 0 {
return firstRXTime, io.ErrUnexpectedEOF return firstRXTime, io.ErrUnexpectedEOF
} }
@@ -397,39 +628,20 @@ func (c *TCPConn) readFullLinux(buf []byte) (int64, error) {
// recvmsgLinux 用 recvmsg 同时读取数据和控制消息。 // recvmsgLinux 用 recvmsg 同时读取数据和控制消息。
func (c *TCPConn) recvmsgLinux(buf []byte) (int, int64, error) { func (c *TCPConn) recvmsgLinux(buf []byte) (int, int64, error) {
var ( for {
n int n, rxTimeNS, err := c.recvmsgDataOnce(buf)
rxTimeNS int64
opErr error
)
err := c.raw.Read(func(fd uintptr) bool {
oob := make([]byte, linuxTimestampControlBufferSize)
readN, oobN, _, _, recvErr := syscall.Recvmsg(int(fd), buf, oob, 0)
switch { switch {
case recvErr == nil: case err == nil:
if readN == 0 { if n == 0 {
opErr = io.EOF return 0, 0, io.EOF
return true
} }
n = readN return n, rxTimeNS, nil
rxTimeNS = parseRXTimestampControlMessages(oob[:oobN]) case isWouldBlock(err):
return true time.Sleep(linuxDataPollInterval)
case errors.Is(recvErr, syscall.EAGAIN), errors.Is(recvErr, syscall.EWOULDBLOCK):
return false
default: default:
opErr = recvErr return 0, 0, err
return true
} }
})
if err != nil {
return 0, 0, err
} }
if opErr != nil {
return 0, 0, opErr
}
return n, rxTimeNS, nil
} }
// 从控制消息里取 RX_SOFTWARE。 // 从控制消息里取 RX_SOFTWARE。
@@ -477,26 +689,42 @@ func isWouldBlock(err error) bool {
return errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EWOULDBLOCK) return errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EWOULDBLOCK)
} }
func isRawConnNotPollable(err error) bool { func (c *TCPConn) sendmsgDataOnce(buf []byte) (int, error) {
return err != nil && strings.Contains(err.Error(), "not pollable") var (
} n int
opErr error
)
func writeFullFallback(w io.Writer, buf []byte) error { err := c.raw.Control(func(fd uintptr) {
for len(buf) > 0 { n, opErr = syscall.SendmsgN(int(fd), buf, nil, nil, 0)
n, err := w.Write(buf) })
if err != nil { if err != nil {
return err return 0, err
}
if n <= 0 {
return io.ErrShortWrite
}
buf = buf[n:]
} }
return nil return n, opErr
} }
func readFullFallback(r io.Reader, buf []byte) error { func (c *TCPConn) recvmsgDataOnce(buf []byte) (int, int64, error) {
_, err := io.ReadFull(r, buf) var (
return err n int
rxTimeNS int64
opErr error
)
err := c.raw.Control(func(fd uintptr) {
oob := make([]byte, linuxTimestampControlBufferSize)
readN, oobN, _, _, recvErr := syscall.Recvmsg(int(fd), buf, oob, 0)
if recvErr != nil {
opErr = recvErr
return
}
n = readN
rxTimeNS = parseRXTimestampControlMessages(oob[:oobN])
})
if err != nil {
return 0, 0, err
}
return n, rxTimeNS, opErr
} }

View File

@@ -3,8 +3,10 @@
package transport package transport
import ( import (
"encoding/binary"
"net" "net"
"reflect" "reflect"
"syscall"
"testing" "testing"
"omnisocketgo/cmd/internal/latencylog" "omnisocketgo/cmd/internal/latencylog"
@@ -87,6 +89,235 @@ func TestLinuxTimestampingRecordsKernelEvents(t *testing.T) {
} }
} }
func TestParseTXTimestampSocketControlMessagesExtractsEventKindAndID(t *testing.T) {
tests := []struct {
name string
kind uint32
wantEvent string
}{
{
name: "sched",
kind: linuxSCMTstampSched,
wantEvent: latencylog.EventATXSched,
},
{
name: "software",
kind: linuxSCMTstampSnd,
wantEvent: latencylog.EventATXSoftware,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
controlMessages := []syscall.SocketControlMessage{
makeTimestampSocketControlMessage(1_700_000_000_123_456_789),
makeSocketExtendedErrControlMessage(tt.kind, 42),
}
event, ok := parseTXTimestampSocketControlMessages(controlMessages)
if !ok {
t.Fatal("parseTXTimestampSocketControlMessages() ok = false, want true")
}
if event.EventName != tt.wantEvent {
t.Fatalf("event name = %q, want %q", event.EventName, tt.wantEvent)
}
if event.TSUnixNano != 1_700_000_000_123_456_789 {
t.Fatalf("timestamp = %d, want %d", event.TSUnixNano, int64(1_700_000_000_123_456_789))
}
if event.EEInfo != tt.kind {
t.Fatalf("ee_info = %d, want %d", event.EEInfo, tt.kind)
}
if event.EEData != 42 {
t.Fatalf("ee_data = %d, want 42", event.EEData)
}
})
}
}
func TestSelectTXTimestampEventsPrefersExactFinalChunkID(t *testing.T) {
events := []observedTXTimestampEvent{
{Event: txTimestampEvent{EventName: latencylog.EventATXSched, TSUnixNano: 100, EEData: 7}},
{Event: txTimestampEvent{EventName: latencylog.EventATXSoftware, TSUnixNano: 110, EEData: 7}},
{Event: txTimestampEvent{EventName: latencylog.EventATXSched, TSUnixNano: 200, EEData: 9}},
{Event: txTimestampEvent{EventName: latencylog.EventATXSoftware, TSUnixNano: 210, EEData: 9}},
}
selection := selectTXTimestampEvents(events, 9, true)
if !selection.HasEvent {
t.Fatal("selection.HasEvent = false, want true")
}
if selection.SelectedID != 9 {
t.Fatalf("SelectedID = %d, want 9", selection.SelectedID)
}
if got := selection.Timestamps[latencylog.EventATXSched]; got != 200 {
t.Fatalf("selected sched = %d, want 200", got)
}
if got := selection.Timestamps[latencylog.EventATXSoftware]; got != 210 {
t.Fatalf("selected software = %d, want 210", got)
}
}
func TestSelectTXTimestampEventsFallsBackToHighestObservedID(t *testing.T) {
events := []observedTXTimestampEvent{
{Event: txTimestampEvent{EventName: latencylog.EventATXSched, TSUnixNano: 100, EEData: 11}},
{Event: txTimestampEvent{EventName: latencylog.EventATXSoftware, TSUnixNano: 120, EEData: 11}},
{Event: txTimestampEvent{EventName: latencylog.EventATXSched, TSUnixNano: 200, EEData: 15}},
{Event: txTimestampEvent{EventName: latencylog.EventATXSoftware, TSUnixNano: 220, EEData: 15}},
}
selection := selectTXTimestampEvents(events, 20, true)
if !selection.HasEvent {
t.Fatal("selection.HasEvent = false, want true")
}
if selection.SelectedID != 15 {
t.Fatalf("SelectedID = %d, want 15", selection.SelectedID)
}
if got := selection.Timestamps[latencylog.EventATXSched]; got != 200 {
t.Fatalf("selected sched = %d, want 200", got)
}
if got := selection.Timestamps[latencylog.EventATXSoftware]; got != 220 {
t.Fatalf("selected software = %d, want 220", got)
}
}
func TestLinuxTimestampingDebugLoggerCapturesChunkAndErrqueueEvents(t *testing.T) {
clientConn, serverConn := newTCPPair(t)
setTCPWriteBuffer(t, clientConn, 4096)
debugLogger := &recordingTXTimestampDebugLogger{}
senderLogger := &recordingLogger{}
receiverLogger := &recordingLogger{}
sender, err := NewTCPConn(
clientConn,
WithLogger(senderLogger, latencylog.NodeRolePeer, "peer-a"),
WithTXTimestampDebugLogger(debugLogger),
)
if err != nil {
t.Fatalf("NewTCPConn(sender) error = %v", err)
}
receiver, err := NewTCPConn(
serverConn,
WithLogger(receiverLogger, latencylog.NodeRolePeer, "peer-b"),
)
if err != nil {
t.Fatalf("NewTCPConn(receiver) error = %v", err)
}
t.Cleanup(func() {
_ = sender.Close()
_ = receiver.Close()
})
body := make([]byte, 1<<20)
for i := range body {
body[i] = byte(i % 251)
}
msg := protocol.Message{
Type: protocol.MessageTypeFile,
ID: 99,
From: "peer-a",
To: "peer-b",
FileName: "payload.bin",
Body: body,
}
sendErr := make(chan error, 1)
go func() {
sendErr <- sender.Send(msg)
}()
got, err := receiver.Receive()
if err != nil {
t.Fatalf("Receive() error = %v", err)
}
if err := <-sendErr; err != nil {
t.Fatalf("Send() error = %v", err)
}
if !reflect.DeepEqual(got, msg) {
t.Fatalf("message mismatch: got %+v want %+v", got, msg)
}
assertHasEvent(t, senderLogger.Events(), latencylog.EventATXSched, msg.ID)
assertHasEvent(t, senderLogger.Events(), latencylog.EventATXSoftware, msg.ID)
assertHasEvent(t, receiverLogger.Events(), latencylog.EventBRXSoftware, msg.ID)
sendChunkRecords := debugRecordsByType(debugLogger.Records(), txTimestampDebugRecordTypeSendChunk)
errqueueRecords := debugRecordsByType(debugLogger.Records(), txTimestampDebugRecordTypeErrqueueEvent)
if len(sendChunkRecords) == 0 {
t.Fatal("send_chunk debug records = 0, want at least 1")
}
if len(errqueueRecords) == 0 {
t.Fatal("errqueue_event debug records = 0, want at least 1")
}
finalChunkRecord := sendChunkRecords[len(sendChunkRecords)-1]
if finalChunkRecord.ExpectedTXID == nil {
t.Fatal("final send_chunk expected_tx_id = nil, want non-nil")
}
finalExpectedTXID := *finalChunkRecord.ExpectedTXID
selectedRecords := selectedErrqueueRecords(errqueueRecords)
if len(selectedRecords) == 0 {
t.Fatal("selected errqueue debug records = 0, want at least 1")
}
highestObservedID := uint32(0)
haveHighestObservedID := false
haveExactFinalID := false
for _, record := range errqueueRecords {
if record.EEData == nil {
continue
}
if !haveHighestObservedID || *record.EEData > highestObservedID {
highestObservedID = *record.EEData
haveHighestObservedID = true
}
if *record.EEData == finalExpectedTXID && isBusinessTXTimestampRecord(record) {
haveExactFinalID = true
}
}
if !haveHighestObservedID {
t.Fatal("highestObservedID missing, want at least one ee_data")
}
wantSelectedID := highestObservedID
if haveExactFinalID {
wantSelectedID = finalExpectedTXID
}
for _, record := range selectedRecords {
if record.EEData == nil {
t.Fatalf("selected record missing ee_data: %+v", record)
}
if *record.EEData != wantSelectedID {
t.Fatalf("selected ee_data = %d, want %d", *record.EEData, wantSelectedID)
}
}
selectedByEventName := make(map[string]int64, len(selectedRecords))
for _, record := range selectedRecords {
if record.TSUnixNano == nil {
t.Fatalf("selected record missing timestamp: %+v", record)
}
selectedByEventName[record.EventName] = *record.TSUnixNano
}
senderEventsByName := make(map[string]int64)
for _, event := range senderLogger.Events() {
if event.MessageID != msg.ID {
continue
}
if !isBusinessTXTimestampEventName(event.Event) {
continue
}
senderEventsByName[event.Event] = event.TsUnixNano
}
for eventName, selectedTS := range selectedByEventName {
if senderEventsByName[eventName] != selectedTS {
t.Fatalf("sender latency event %s = %d, want %d from selected debug record", eventName, senderEventsByName[eventName], selectedTS)
}
}
}
func newTCPPair(t *testing.T) (net.Conn, net.Conn) { func newTCPPair(t *testing.T) (net.Conn, net.Conn) {
t.Helper() t.Helper()
@@ -138,3 +369,74 @@ func assertHasEvent(t *testing.T, events []latencylog.Event, wantEvent string, w
t.Fatalf("missing event %s for message %d in %+v", wantEvent, wantMessageID, events) t.Fatalf("missing event %s for message %d in %+v", wantEvent, wantMessageID, events)
} }
func makeTimestampSocketControlMessage(tsUnixNano int64) syscall.SocketControlMessage {
const timespec64Size = 16
data := make([]byte, timespec64Size*3)
sec := uint64(tsUnixNano / int64(1e9))
nsec := uint64(tsUnixNano % int64(1e9))
binary.NativeEndian.PutUint64(data[:8], sec)
binary.NativeEndian.PutUint64(data[8:16], nsec)
return syscall.SocketControlMessage{
Header: syscall.Cmsghdr{
Level: syscall.SOL_SOCKET,
Type: linuxSCMTimestampingNew,
},
Data: data,
}
}
func makeSocketExtendedErrControlMessage(kind, id uint32) syscall.SocketControlMessage {
data := make([]byte, 16)
data[4] = linuxSOEEOriginTimestamping
binary.NativeEndian.PutUint32(data[8:12], kind)
binary.NativeEndian.PutUint32(data[12:16], id)
return syscall.SocketControlMessage{
Header: syscall.Cmsghdr{
Level: syscall.SOL_IP,
Type: syscall.IP_RECVERR,
},
Data: data,
}
}
func setTCPWriteBuffer(t *testing.T, conn net.Conn, size int) {
t.Helper()
tcpConn, ok := conn.(*net.TCPConn)
if !ok {
t.Fatalf("conn type %T does not support SetWriteBuffer", conn)
}
if err := tcpConn.SetWriteBuffer(size); err != nil {
t.Fatalf("SetWriteBuffer(%d) error = %v", size, err)
}
}
func debugRecordsByType(records []TXTimestampDebugRecord, recordType string) []TXTimestampDebugRecord {
filtered := make([]TXTimestampDebugRecord, 0, len(records))
for _, record := range records {
if record.RecordType != recordType {
continue
}
filtered = append(filtered, record)
}
return filtered
}
func selectedErrqueueRecords(records []TXTimestampDebugRecord) []TXTimestampDebugRecord {
selected := make([]TXTimestampDebugRecord, 0, len(records))
for _, record := range records {
if record.SelectedForLatency == nil || !*record.SelectedForLatency {
continue
}
selected = append(selected, record)
}
return selected
}
func isBusinessTXTimestampRecord(record TXTimestampDebugRecord) bool {
return record.EventName == latencylog.EventATXSched || record.EventName == latencylog.EventATXSoftware
}

View File

@@ -38,6 +38,26 @@ func (failingLogger) LogEvent(latencylog.Event) error {
return errors.New("log failed") return errors.New("log failed")
} }
type recordingTXTimestampDebugLogger struct {
mu sync.Mutex
records []TXTimestampDebugRecord
}
func (l *recordingTXTimestampDebugLogger) LogTXTimestampDebugRecord(record TXTimestampDebugRecord) error {
l.mu.Lock()
defer l.mu.Unlock()
l.records = append(l.records, record)
return nil
}
func (l *recordingTXTimestampDebugLogger) Records() []TXTimestampDebugRecord {
l.mu.Lock()
defer l.mu.Unlock()
return append([]TXTimestampDebugRecord(nil), l.records...)
}
// TestSendReceiveMessage 验证 transport 可以在单条连接上正常收发 text 和 file 消息。 // TestSendReceiveMessage 验证 transport 可以在单条连接上正常收发 text 和 file 消息。
func TestSendReceiveMessage(t *testing.T) { func TestSendReceiveMessage(t *testing.T) {
tests := []struct { tests := []struct {

View File

@@ -0,0 +1,97 @@
package transport
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"omnisocketgo/cmd/internal/protocol"
)
const (
txTimestampDebugRecordTypeSendChunk = "send_chunk"
txTimestampDebugRecordTypeErrqueueEvent = "errqueue_event"
)
// TXTimestampDebugRecord 是 TX errqueue 调试日志的一条 JSONL 记录。
type TXTimestampDebugRecord struct {
RecordType string `json:"record_type"`
NodeRole string `json:"node_role,omitempty"`
NodeID string `json:"node_id,omitempty"`
MessageType protocol.MessageType `json:"message_type"`
MessageID uint64 `json:"message_id"`
From string `json:"from"`
To string `json:"to"`
FileName string `json:"file_name,omitempty"`
BodySize int `json:"body_size"`
Phase string `json:"phase,omitempty"`
SendCallIndex *int `json:"send_call_index,omitempty"`
FrameOffsetStart *int `json:"frame_offset_start,omitempty"`
FrameOffsetEnd *int `json:"frame_offset_end,omitempty"`
BytesWritten *int `json:"bytes_written,omitempty"`
ExpectedTXID *uint32 `json:"expected_tx_id,omitempty"`
ReadIndex *int `json:"read_index,omitempty"`
EventName string `json:"event_name,omitempty"`
TSUnixNano *int64 `json:"ts_unix_nano,omitempty"`
EEInfo *uint32 `json:"ee_info,omitempty"`
EEData *uint32 `json:"ee_data,omitempty"`
MatchedSendCallIndex *int `json:"matched_send_call_index,omitempty"`
SelectedForLatency *bool `json:"selected_for_latency,omitempty"`
}
// TXTimestampDebugLogger 接收 TX errqueue 调试记录。
type TXTimestampDebugLogger interface {
LogTXTimestampDebugRecord(record TXTimestampDebugRecord) error
}
// JSONLTXTimestampDebugLogger 以 JSONL 形式追加写 TX errqueue 调试日志。
type JSONLTXTimestampDebugLogger struct {
mu sync.Mutex
closeOnce sync.Once
closeErr error
file *os.File
}
// NewJSONLTXTimestampDebugLogger 创建一个线程安全的 TX errqueue JSONL 日志器。
func NewJSONLTXTimestampDebugLogger(path string) (*JSONLTXTimestampDebugLogger, error) {
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, fmt.Errorf("transport: create tx timestamp debug log dir %s: %w", dir, err)
}
file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
if err != nil {
return nil, fmt.Errorf("transport: open tx timestamp debug log %s: %w", path, err)
}
return &JSONLTXTimestampDebugLogger{file: file}, nil
}
// LogTXTimestampDebugRecord 以单行 JSON 的形式追加一条调试记录。
func (l *JSONLTXTimestampDebugLogger) LogTXTimestampDebugRecord(record TXTimestampDebugRecord) error {
line, err := json.Marshal(record)
if err != nil {
return err
}
l.mu.Lock()
defer l.mu.Unlock()
if _, err := l.file.Write(append(line, '\n')); err != nil {
return err
}
return nil
}
// Close 关闭底层文件;重复调用是安全的。
func (l *JSONLTXTimestampDebugLogger) Close() error {
l.closeOnce.Do(func() {
l.closeErr = l.file.Close()
})
return l.closeErr
}

View File

@@ -11,6 +11,7 @@ import (
"omnisocketgo/cmd/internal/latencylog" "omnisocketgo/cmd/internal/latencylog"
peerpkg "omnisocketgo/cmd/internal/peer" peerpkg "omnisocketgo/cmd/internal/peer"
"omnisocketgo/cmd/internal/protocol" "omnisocketgo/cmd/internal/protocol"
"omnisocketgo/cmd/internal/transport"
) )
func main() { func main() {
@@ -23,10 +24,11 @@ func main() {
bindDevice := flag.String("bind-device", "", "optional Linux network device used when dialing the server") bindDevice := flag.String("bind-device", "", "optional Linux network device used when dialing the server")
inboxDir := flag.String("inbox-dir", "inbox", "directory used to persist received text and file messages") inboxDir := flag.String("inbox-dir", "inbox", "directory used to persist received text and file messages")
logPath := flag.String("latency-log", "", "optional JSONL file path for latency timestamp logs") logPath := flag.String("latency-log", "", "optional JSONL file path for latency timestamp logs")
txTimestampDebugLogPath := flag.String("tx-ts-debug-log", "", "optional JSONL file path for TX errqueue debug records")
interactive := flag.Bool("interactive", true, "enable interactive REPL for repeated text/file sends on the same connection") interactive := flag.Bool("interactive", true, "enable interactive REPL for repeated text/file sends on the same connection")
flag.Parse() flag.Parse()
clientOptions := make([]peerpkg.Option, 0, 3) clientOptions := make([]peerpkg.Option, 0, 4)
if *logPath != "" { if *logPath != "" {
logger, err := latencylog.NewJSONLLogger(*logPath) logger, err := latencylog.NewJSONLLogger(*logPath)
if err != nil { if err != nil {
@@ -35,6 +37,14 @@ func main() {
defer logger.Close() defer logger.Close()
clientOptions = append(clientOptions, peerpkg.WithLogger(logger)) clientOptions = append(clientOptions, peerpkg.WithLogger(logger))
} }
if *txTimestampDebugLogPath != "" {
logger, err := transport.NewJSONLTXTimestampDebugLogger(*txTimestampDebugLogPath)
if err != nil {
log.Fatalf("create tx timestamp debug logger %s: %v", *txTimestampDebugLogPath, err)
}
defer logger.Close()
clientOptions = append(clientOptions, peerpkg.WithTXTimestampDebugLogger(logger))
}
if *bindIP != "" { if *bindIP != "" {
clientOptions = append(clientOptions, peerpkg.WithBindIP(*bindIP)) clientOptions = append(clientOptions, peerpkg.WithBindIP(*bindIP))
} }