fix: kcp 协议内部日志细节

This commit is contained in:
2026-03-25 15:09:32 +08:00
parent be013b701b
commit 665a908421
15 changed files with 1117 additions and 42 deletions

View File

@@ -3,7 +3,12 @@
"allow": [ "allow": [
"Bash(go vet:*)", "Bash(go vet:*)",
"Bash(go build:*)", "Bash(go build:*)",
"Bash(go test:*)" "Bash(go test:*)",
"Bash(xargs grep:*)",
"Bash(find /c/Users/64187/Desktop/Workspace/OmniSocketGo -type f -name *.go)",
"Bash(git status:*)",
"Bash(git fetch:*)",
"Bash(git pull:*)"
] ]
} }
} }

View File

@@ -7,6 +7,7 @@ import (
"path/filepath" "path/filepath"
"sync/atomic" "sync/atomic"
"syscall" "syscall"
"time"
"omnisocketgo/cmd/internal/latencylog" "omnisocketgo/cmd/internal/latencylog"
"omnisocketgo/cmd/internal/protocol" "omnisocketgo/cmd/internal/protocol"
@@ -16,11 +17,13 @@ import (
var dialServer = dialServerWithOptions var dialServer = dialServerWithOptions
type clientOptions struct { type clientOptions struct {
logger latencylog.Logger logger latencylog.Logger
txTimestampDebugLogger transport.TXTimestampDebugLogger txTimestampDebugLogger transport.TXTimestampDebugLogger
kcpPacketDebugLogger transport.KCPPacketDebugLogger kcpPacketDebugLogger transport.KCPPacketDebugLogger
bindIP string kcpSessionStatsLogger transport.KCPSessionStatsLogger
bindDevice string kcpSessionStatsInterval time.Duration
bindIP string
bindDevice string
} }
// Option 用于配置 Client 的可选行为,例如时延日志。 // Option 用于配置 Client 的可选行为,例如时延日志。
@@ -47,6 +50,14 @@ func WithKCPPacketDebugLogger(logger transport.KCPPacketDebugLogger) Option {
} }
} }
// WithKCPSessionStatsLogger returns an Option that records the KCP session
// stats logger and its sampling interval on the client options.
func WithKCPSessionStatsLogger(logger transport.KCPSessionStatsLogger, interval time.Duration) Option {
	return func(cfg *clientOptions) {
		cfg.kcpSessionStatsInterval = interval
		cfg.kcpSessionStatsLogger = logger
	}
}
// WithBindIP 指定拨号时使用的本地源 IP。 // WithBindIP 指定拨号时使用的本地源 IP。
func WithBindIP(ip string) Option { func WithBindIP(ip string) Option {
return func(options *clientOptions) { return func(options *clientOptions) {

View File

@@ -47,6 +47,7 @@ func DialKCP(serverAddr, peerID string, opts ...Option) (*KCPClient, error) {
conn, err := transport.NewKCPConn( conn, err := transport.NewKCPConn(
session, session,
transport.WithKCPLogger(options.logger, latencylog.NodeRolePeer, peerID), transport.WithKCPLogger(options.logger, latencylog.NodeRolePeer, peerID),
transport.WithKCPSessionStatsLogger(options.kcpSessionStatsLogger, options.kcpSessionStatsInterval),
) )
if err != nil { if err != nil {
_ = session.Close() _ = session.Close()

View File

@@ -3,6 +3,7 @@ package server
import ( import (
"fmt" "fmt"
"sync" "sync"
"time"
kcp "github.com/xtaci/kcp-go/v5" kcp "github.com/xtaci/kcp-go/v5"
@@ -21,11 +22,21 @@ func WithKCPLogger(logger latencylog.Logger) KCPOption {
} }
} }
// WithKCPSessionStatsLogger returns a KCPOption that stores the session stats
// logger and its sampling interval on the KCP hub.
func WithKCPSessionStatsLogger(logger transport.KCPSessionStatsLogger, interval time.Duration) KCPOption {
	return func(target *KCPHub) {
		target.sessionStatsInterval = interval
		target.sessionStatsLogger = logger
	}
}
// KCPHub 管理已注册 peer 的 KCP 会话,并负责在它们之间转发消息。 // KCPHub 管理已注册 peer 的 KCP 会话,并负责在它们之间转发消息。
type KCPHub struct { type KCPHub struct {
mu sync.RWMutex mu sync.RWMutex
peers map[string]*transport.KCPConn peers map[string]*transport.KCPConn
logger latencylog.Logger logger latencylog.Logger
sessionStatsLogger transport.KCPSessionStatsLogger
sessionStatsInterval time.Duration
} }
// NewKCPHub 创建一个空的 KCP 连接中心。 // NewKCPHub 创建一个空的 KCP 连接中心。
@@ -57,6 +68,7 @@ func (h *KCPHub) ServeSession(session *kcp.UDPSession) error {
conn, err := transport.NewKCPConn( conn, err := transport.NewKCPConn(
session, session,
transport.WithKCPLogger(h.logger, latencylog.NodeRoleServer, "hub"), transport.WithKCPLogger(h.logger, latencylog.NodeRoleServer, "hub"),
transport.WithKCPSessionStatsLogger(h.sessionStatsLogger, h.sessionStatsInterval),
) )
if err != nil { if err != nil {
_ = session.Close() _ = session.Close()

View File

@@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"net" "net"
"sync" "sync"
"time"
kcp "github.com/xtaci/kcp-go/v5" kcp "github.com/xtaci/kcp-go/v5"
@@ -31,6 +32,10 @@ type KCPConn struct {
nodeRole string nodeRole string
nodeID string nodeID string
sessionStatsLogger KCPSessionStatsLogger
sessionStatsInterval time.Duration
sessionStatsSampler *kcpSessionStatsSampler
writeMu sync.Mutex writeMu sync.Mutex
closeOnce sync.Once closeOnce sync.Once
closeErr error closeErr error
@@ -48,6 +53,14 @@ func WithKCPLogger(logger latencylog.Logger, nodeRole, nodeID string) KCPOption
} }
} }
// WithKCPSessionStatsLogger returns a KCPOption that stores the logger used
// for session-level and process-level stats, together with the sampling
// interval, on the KCP connection.
func WithKCPSessionStatsLogger(logger KCPSessionStatsLogger, interval time.Duration) KCPOption {
	return func(target *KCPConn) {
		target.sessionStatsInterval = interval
		target.sessionStatsLogger = logger
	}
}
// NewKCPConn 用已有的 KCP 会话创建 transport 连接封装。 // NewKCPConn 用已有的 KCP 会话创建 transport 连接封装。
func NewKCPConn(session *kcp.UDPSession, opts ...KCPOption) (*KCPConn, error) { func NewKCPConn(session *kcp.UDPSession, opts ...KCPOption) (*KCPConn, error) {
if session == nil { if session == nil {
@@ -66,6 +79,7 @@ func NewKCPConn(session *kcp.UDPSession, opts ...KCPOption) (*KCPConn, error) {
} }
configureKCPSession(session) configureKCPSession(session)
conn.sessionStatsSampler = newKCPSessionStatsSampler(session, conn.sessionStatsLogger, conn.nodeRole, conn.nodeID, conn.sessionStatsInterval)
return conn, nil return conn, nil
} }
@@ -75,10 +89,16 @@ func (c *KCPConn) Send(msg protocol.Message) error {
defer c.writeMu.Unlock() defer c.writeMu.Unlock()
latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffBegin, msg) latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffBegin, msg)
if c.sessionStatsSampler != nil {
c.sessionStatsSampler.SampleEvent(kcpStatsSampleReasonSendHandoffBegin)
}
if err := protocol.WriteMessage(c.session, msg); err != nil { if err := protocol.WriteMessage(c.session, msg); err != nil {
return fmt.Errorf("transport: kcp send message: %w", err) return fmt.Errorf("transport: kcp send message: %w", err)
} }
latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffEnd, msg) latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffEnd, msg)
if c.sessionStatsSampler != nil {
c.sessionStatsSampler.SampleEvent(kcpStatsSampleReasonSendHandoffEnd)
}
return nil return nil
} }
@@ -88,6 +108,9 @@ func (c *KCPConn) Receive() (protocol.Message, error) {
if err != nil { if err != nil {
return protocol.Message{}, fmt.Errorf("transport: kcp receive message: %w", err) return protocol.Message{}, fmt.Errorf("transport: kcp receive message: %w", err)
} }
if c.sessionStatsSampler != nil {
c.sessionStatsSampler.SampleEvent(kcpStatsSampleReasonReceive)
}
return msg, nil return msg, nil
} }
@@ -110,6 +133,9 @@ func (c *KCPConn) ReceiveLoop(handler func(protocol.Message) error) error {
// Close 关闭底层 KCP 会话,并保证重复调用是安全的。 // Close 关闭底层 KCP 会话,并保证重复调用是安全的。
func (c *KCPConn) Close() error { func (c *KCPConn) Close() error {
c.closeOnce.Do(func() { c.closeOnce.Do(func() {
if c.sessionStatsSampler != nil {
c.sessionStatsSampler.Close()
}
c.closeErr = c.session.Close() c.closeErr = c.session.Close()
}) })
return c.closeErr return c.closeErr

View File

@@ -77,6 +77,9 @@ func assertKCPPacketRecord(t *testing.T, records []KCPPacketDebugRecord, wantEve
if record.KCPConv == nil { if record.KCPConv == nil {
t.Fatalf("record %s missing kcp_conv: %+v", wantEvent, record) t.Fatalf("record %s missing kcp_conv: %+v", wantEvent, record)
} }
if len(record.Segments) == 0 {
t.Fatalf("record %s missing parsed segments: %+v", wantEvent, record)
}
if wantUDPTXID && record.UDPTXID == nil { if wantUDPTXID && record.UDPTXID == nil {
t.Fatalf("record %s missing udp_tx_id: %+v", wantEvent, record) t.Fatalf("record %s missing udp_tx_id: %+v", wantEvent, record)
} }

View File

@@ -1,7 +1,6 @@
package transport package transport
import ( import (
"encoding/binary"
"net" "net"
"sync" "sync"
"time" "time"
@@ -61,7 +60,7 @@ func (c *kcpPacketConnBase) logKCPPacketDebugRecord(record KCPPacketDebugRecord)
_ = c.logger.LogKCPPacketDebugRecord(record) _ = c.logger.LogKCPPacketDebugRecord(record)
} }
func (c *kcpPacketConnBase) newKCPPacketDebugRecord(event string, remoteAddr net.Addr, packetBytes int, tsUnixNano int64, udpTxID *uint32, kcpConv *uint32) KCPPacketDebugRecord { func (c *kcpPacketConnBase) newKCPPacketDebugRecord(event string, remoteAddr net.Addr, packetBytes int, tsUnixNano int64, udpTxID *uint32, kcpConv *uint32, segments []KCPPacketDebugSegment) KCPPacketDebugRecord {
record := KCPPacketDebugRecord{ record := KCPPacketDebugRecord{
Event: event, Event: event,
NodeRole: c.nodeRole, NodeRole: c.nodeRole,
@@ -71,6 +70,7 @@ func (c *kcpPacketConnBase) newKCPPacketDebugRecord(event string, remoteAddr net
PacketBytes: packetBytes, PacketBytes: packetBytes,
UDPTXID: udpTxID, UDPTXID: udpTxID,
KCPConv: kcpConv, KCPConv: kcpConv,
Segments: append([]KCPPacketDebugSegment(nil), segments...),
TSUnixNano: tsUnixNano, TSUnixNano: tsUnixNano,
} }
if localAddr := c.conn.LocalAddr(); localAddr != nil { if localAddr := c.conn.LocalAddr(); localAddr != nil {
@@ -81,11 +81,3 @@ func (c *kcpPacketConnBase) newKCPPacketDebugRecord(event string, remoteAddr net
} }
return record return record
} }
func parseKCPConversationID(packet []byte) *uint32 {
if len(packet) < 4 {
return nil
}
conv := binary.LittleEndian.Uint32(packet[:4])
return &conv
}

View File

@@ -17,6 +17,7 @@ type kcpPendingPacketDebug struct {
remoteAddr net.Addr remoteAddr net.Addr
packetBytes int packetBytes int
kcpConv *uint32 kcpConv *uint32
segments []KCPPacketDebugSegment
timestamps map[string]int64 timestamps map[string]int64
} }
@@ -75,13 +76,15 @@ func (c *platformKCPPacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
} }
if rxTimestamp > 0 { if rxTimestamp > 0 {
kcpConv, segments := parseKCPPacketMetadata(p[:n])
c.logKCPPacketDebugRecord(c.newKCPPacketDebugRecord( c.logKCPPacketDebugRecord(c.newKCPPacketDebugRecord(
latencylog.EventBRXSoftware, latencylog.EventBRXSoftware,
addr, addr,
n, n,
rxTimestamp, rxTimestamp,
nil, nil,
parseKCPConversationID(p[:n]), kcpConv,
segments,
)) ))
} }
@@ -102,7 +105,11 @@ func (c *platformKCPPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
return 0, fmt.Errorf("transport: kcp packet write target must be UDPAddr, got %T", addr) return 0, fmt.Errorf("transport: kcp packet write target must be UDPAddr, got %T", addr)
} }
expectedTXID := c.nextExpectedTXID() // Reserve the local txID before the send so an immediately-arriving errqueue
// event can still find its pending record. If the send never succeeds, roll
// the reservation back to keep the local txID mirror aligned with the kernel.
kcpConv, segments := parseKCPPacketMetadata(p)
expectedTXID := c.reservePendingTX(udpAddr, len(p), kcpConv, segments)
for { for {
err := c.sendmsgRaw(p, udpAddr) err := c.sendmsgRaw(p, udpAddr)
if err != nil { if err != nil {
@@ -110,10 +117,10 @@ func (c *platformKCPPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
time.Sleep(linuxDataPollInterval) time.Sleep(linuxDataPollInterval)
continue continue
} }
c.rollbackPendingTX(expectedTXID)
return 0, err return 0, err
} }
c.storePendingTX(expectedTXID, udpAddr, len(p), parseKCPConversationID(p))
return len(p), nil return len(p), nil
} }
} }
@@ -247,6 +254,7 @@ func (c *platformKCPPacketConn) collectTXErrqueueLoop() {
event.TSUnixNano, event.TSUnixNano,
&udpTxID, &udpTxID,
record.kcpConv, record.kcpConv,
record.segments,
)) ))
if complete { if complete {
@@ -281,25 +289,31 @@ func (c *platformKCPPacketConn) recvTXErrqueueOnce() (txTimestampEvent, error) {
return event, nil return event, nil
} }
func (c *platformKCPPacketConn) nextExpectedTXID() uint32 { func (c *platformKCPPacketConn) reservePendingTX(remoteAddr net.Addr, packetBytes int, kcpConv *uint32, segments []KCPPacketDebugSegment) uint32 {
c.pendingMu.Lock() c.pendingMu.Lock()
defer c.pendingMu.Unlock() defer c.pendingMu.Unlock()
next := c.nextTXID txID := c.nextTXID
c.nextTXID++ c.nextTXID++
return next
}
func (c *platformKCPPacketConn) storePendingTX(txID uint32, remoteAddr net.Addr, packetBytes int, kcpConv *uint32) {
c.pendingMu.Lock()
defer c.pendingMu.Unlock()
c.pendingTX[txID] = kcpPendingPacketDebug{ c.pendingTX[txID] = kcpPendingPacketDebug{
remoteAddr: remoteAddr, remoteAddr: remoteAddr,
packetBytes: packetBytes, packetBytes: packetBytes,
kcpConv: kcpConv, kcpConv: kcpConv,
segments: append([]KCPPacketDebugSegment(nil), segments...),
timestamps: make(map[string]int64, 2), timestamps: make(map[string]int64, 2),
} }
return txID
}
func (c *platformKCPPacketConn) rollbackPendingTX(txID uint32) {
c.pendingMu.Lock()
defer c.pendingMu.Unlock()
delete(c.pendingTX, txID)
if c.nextTXID == txID+1 {
c.nextTXID = txID
}
} }
func (c *platformKCPPacketConn) recordPendingTXEvent(event txTimestampEvent) (*kcpPendingPacketDebug, bool) { func (c *platformKCPPacketConn) recordPendingTXEvent(event txTimestampEvent) (*kcpPendingPacketDebug, bool) {

View File

@@ -0,0 +1,62 @@
//go:build linux
package transport
import (
"net"
"testing"
)
func TestKCPPendingTXReservationRollbackRestoresSequence(t *testing.T) {
conn := &platformKCPPacketConn{
kcpPacketConnBase: &kcpPacketConnBase{
closed: make(chan struct{}),
},
pendingTX: make(map[uint32]kcpPendingPacketDebug),
}
conv := uint32(42)
txID := conn.reservePendingTX(&net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 9000}, 128, &conv, nil)
if txID != 0 {
t.Fatalf("reservePendingTX() txID = %d, want 0", txID)
}
if conn.nextTXID != 1 {
t.Fatalf("nextTXID after reserve = %d, want 1", conn.nextTXID)
}
if _, ok := conn.pendingTX[txID]; !ok {
t.Fatal("pendingTX missing reserved record")
}
conn.rollbackPendingTX(txID)
if conn.nextTXID != 0 {
t.Fatalf("nextTXID after rollback = %d, want 0", conn.nextTXID)
}
if _, ok := conn.pendingTX[txID]; ok {
t.Fatal("pendingTX still contains rolled back record")
}
}
func TestKCPPendingTXReservationPreservesLaterSequenceOnOutOfOrderRollback(t *testing.T) {
conn := &platformKCPPacketConn{
kcpPacketConnBase: &kcpPacketConnBase{
closed: make(chan struct{}),
},
pendingTX: make(map[uint32]kcpPendingPacketDebug),
}
first := conn.reservePendingTX(&net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 9000}, 64, nil, nil)
second := conn.reservePendingTX(&net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 9001}, 64, nil, nil)
if first != 0 || second != 1 {
t.Fatalf("reserved tx IDs = %d,%d, want 0,1", first, second)
}
conn.rollbackPendingTX(first)
if conn.nextTXID != 2 {
t.Fatalf("nextTXID after out-of-order rollback = %d, want 2", conn.nextTXID)
}
if _, ok := conn.pendingTX[second]; !ok {
t.Fatal("pendingTX lost later reservation after rollback")
}
}

View File

@@ -10,15 +10,26 @@ import (
// KCPPacketDebugRecord 是 KCP 底层 UDP packet kernel timestamp 的一条 JSONL 调试记录。 // KCPPacketDebugRecord 是 KCP 底层 UDP packet kernel timestamp 的一条 JSONL 调试记录。
type KCPPacketDebugRecord struct { type KCPPacketDebugRecord struct {
Event string `json:"event"` Event string `json:"event"`
NodeRole string `json:"node_role,omitempty"` NodeRole string `json:"node_role,omitempty"`
NodeID string `json:"node_id,omitempty"` NodeID string `json:"node_id,omitempty"`
LocalAddr string `json:"local_addr,omitempty"` LocalAddr string `json:"local_addr,omitempty"`
RemoteAddr string `json:"remote_addr,omitempty"` RemoteAddr string `json:"remote_addr,omitempty"`
PacketBytes int `json:"packet_bytes"` PacketBytes int `json:"packet_bytes"`
UDPTXID *uint32 `json:"udp_tx_id,omitempty"` UDPTXID *uint32 `json:"udp_tx_id,omitempty"`
KCPConv *uint32 `json:"kcp_conv,omitempty"` KCPConv *uint32 `json:"kcp_conv,omitempty"`
TSUnixNano int64 `json:"ts_unix_nano"` Segments []KCPPacketDebugSegment `json:"segments,omitempty"`
TSUnixNano int64 `json:"ts_unix_nano"`
}
// KCPPacketDebugSegment 是一个 UDP datagram 中解析出的 KCP segment 头信息。
type KCPPacketDebugSegment struct {
Cmd uint8 `json:"cmd"`
SN uint32 `json:"sn"`
UNA uint32 `json:"una"`
Frg uint8 `json:"frg"`
Wnd uint16 `json:"wnd"`
Len uint32 `json:"len"`
} }
// KCPPacketDebugLogger 接收 KCP packet 级调试记录。 // KCPPacketDebugLogger 接收 KCP packet 级调试记录。

View File

@@ -0,0 +1,55 @@
package transport
import "encoding/binary"
// kcpPacketHeaderSize is the number of header bytes parseKCPPacketSegments
// requires in front of every segment payload; the segment length field is read
// from the last four of these bytes.
const kcpPacketHeaderSize = 24
// parseKCPPacketMetadata extracts the conversation ID and the per-segment
// headers from one datagram. The conversation ID is returned whenever the
// packet has at least four bytes; the segment slice is nil when the packet
// cannot be parsed in full.
func parseKCPPacketMetadata(packet []byte) (*uint32, []KCPPacketDebugSegment) {
	var segments []KCPPacketDebugSegment
	if parsed, ok := parseKCPPacketSegments(packet); ok {
		segments = parsed
	}
	return parseKCPConversationID(packet), segments
}
// parseKCPConversationID decodes the little-endian uint32 stored in the first
// four bytes of packet. It returns nil when the packet is shorter than that.
func parseKCPConversationID(packet []byte) *uint32 {
	const convFieldSize = 4
	if len(packet) >= convFieldSize {
		id := binary.LittleEndian.Uint32(packet)
		return &id
	}
	return nil
}
// parseKCPPacketSegments walks every KCP segment packed into one datagram and
// returns the header fields of each. It reports false (with a nil slice) when
// the packet is empty, when a header is truncated, or when a segment's
// declared payload length exceeds the bytes that remain.
//
// Header bytes read per segment: cmd at offset 4, frg at 5, wnd at 6..8,
// sn at 12..16, una at 16..20 and len at 20..24 (all little-endian).
func parseKCPPacketSegments(packet []byte) ([]KCPPacketDebugSegment, bool) {
	if len(packet) == 0 {
		return nil, false
	}

	segments := make([]KCPPacketDebugSegment, 0, 1)
	for data := packet; len(data) > 0; {
		if len(data) < kcpPacketHeaderSize {
			return nil, false
		}

		segmentLen := binary.LittleEndian.Uint32(data[20:24])
		payload := data[kcpPacketHeaderSize:]
		// Compare in unsigned 64-bit space: the length comes straight off the
		// wire, and converting it to int first would wrap negative on 32-bit
		// platforms, slip past the bounds check and panic on the slice below.
		if uint64(segmentLen) > uint64(len(payload)) {
			return nil, false
		}

		segments = append(segments, KCPPacketDebugSegment{
			Cmd: data[4],
			Frg: data[5],
			Wnd: binary.LittleEndian.Uint16(data[6:8]),
			SN:  binary.LittleEndian.Uint32(data[12:16]),
			UNA: binary.LittleEndian.Uint32(data[16:20]),
			Len: segmentLen,
		})
		data = payload[segmentLen:]
	}
	return segments, true
}

View File

@@ -0,0 +1,444 @@
package transport
import (
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"reflect"
"sync"
"time"
kcp "github.com/xtaci/kcp-go/v5"
)
// DefaultKCPSessionStatsInterval is the sampling period used when no interval
// is supplied: ParseKCPSessionStatsInterval returns it for an empty string, and
// the samplers fall back to it for non-positive intervals.
const DefaultKCPSessionStatsInterval = 100 * time.Millisecond
// Values written to the RecordType and SampleReason fields of
// KCPSessionStatsRecord.
const (
// Record type of a per-session sample produced by logSessionSample.
kcpSessionStatsRecordTypeSessionSample = "session_sample"
// Record type of a process-wide SNMP sample produced by
// newKCPProcessSNMPSampleRecord.
kcpSessionStatsRecordTypeProcessSNMPSample = "process_snmp_sample"
// Sample taken by a ticker in one of the sampler run loops.
kcpStatsSampleReasonPeriodic = "periodic"
// Samples taken by KCPConn.Send before and after the message is written.
kcpStatsSampleReasonSendHandoffBegin = "send_handoff_begin"
kcpStatsSampleReasonSendHandoffEnd = "send_handoff_end"
// Sample taken by KCPConn.Receive after a message was read successfully.
kcpStatsSampleReasonReceive = "receive"
// Final sample taken by kcpSessionStatsSampler.Close.
kcpStatsSampleReasonClose = "close"
)
// KCPSessionStatsRecord is a JSONL record for KCP session and process stats.
//
// One struct serves both record types. Session samples fill the address,
// Conv and RTO/SRTT fields; process SNMP samples fill the counter and gauge
// fields. Pointer fields left nil are omitted from the JSON output.
type KCPSessionStatsRecord struct {
// Common to both record types.
RecordType string `json:"record_type"`
NodeRole string `json:"node_role,omitempty"`
NodeID string `json:"node_id,omitempty"`
// Session samples only: endpoint addresses and conversation ID.
LocalAddr string `json:"local_addr,omitempty"`
RemoteAddr string `json:"remote_addr,omitempty"`
Conv *uint32 `json:"conv,omitempty"`
// Wall-clock time of the sample in Unix nanoseconds, and what triggered it.
TSUnixNano int64 `json:"ts_unix_nano"`
SampleReason string `json:"sample_reason"`
// Session samples only: values of the source's GetRTO, GetSRTT and
// GetSRTTVar (presumably milliseconds, per the field names — confirm
// against kcp-go).
RTOMillis *uint32 `json:"rto_ms,omitempty"`
SRTTMillis *int32 `json:"srtt_ms,omitempty"`
SRTTVarMillis *int32 `json:"srttvar_ms,omitempty"`
// Process samples only: counter increases since the previous process
// sample (clamped to zero if a counter went backwards).
BytesSent *uint64 `json:"bytes_sent,omitempty"`
BytesReceived *uint64 `json:"bytes_received,omitempty"`
InPkts *uint64 `json:"in_pkts,omitempty"`
OutPkts *uint64 `json:"out_pkts,omitempty"`
InSegs *uint64 `json:"in_segs,omitempty"`
OutSegs *uint64 `json:"out_segs,omitempty"`
RetransSegs *uint64 `json:"retrans_segs,omitempty"`
FastRetransSegs *uint64 `json:"fast_retrans_segs,omitempty"`
EarlyRetransSegs *uint64 `json:"early_retrans_segs,omitempty"`
LostSegs *uint64 `json:"lost_segs,omitempty"`
RepeatSegs *uint64 `json:"repeat_segs,omitempty"`
InErrs *uint64 `json:"in_errs,omitempty"`
KCPInErrs *uint64 `json:"kcp_in_errs,omitempty"`
// Process samples only: values copied as-is from the current snapshot
// rather than reported as deltas.
RingBufferSndQueue *uint64 `json:"ring_buffer_snd_queue,omitempty"`
RingBufferRcvQueue *uint64 `json:"ring_buffer_rcv_queue,omitempty"`
RingBufferSndBuffer *uint64 `json:"ring_buffer_snd_buffer,omitempty"`
CurrEstab *uint64 `json:"curr_estab,omitempty"`
}
// KCPSessionStatsLogger receives KCP session stats records.
//
// The samplers in this file call it from several goroutines (their run loops
// and the callers of SampleEvent) and discard the returned error.
type KCPSessionStatsLogger interface {
LogKCPSessionStatsRecord(record KCPSessionStatsRecord) error
}
// JSONLKCPSessionStatsLogger appends KCP session stats as JSONL.
type JSONLKCPSessionStatsLogger struct {
// mu serializes writes to file.
mu sync.Mutex
// closeOnce and closeErr make Close run once and return the same result
// on every call.
closeOnce sync.Once
closeErr error
file *os.File
}
// NewJSONLKCPSessionStatsLogger opens (creating if necessary) the JSONL file at
// path in append mode, creating its parent directory first, and returns a
// logger that writes to it.
func NewJSONLKCPSessionStatsLogger(path string) (*JSONLKCPSessionStatsLogger, error) {
	parent := filepath.Dir(path)
	if mkErr := os.MkdirAll(parent, 0o755); mkErr != nil {
		return nil, fmt.Errorf("transport: create kcp session stats log dir %s: %w", parent, mkErr)
	}

	flags := os.O_CREATE | os.O_WRONLY | os.O_APPEND
	f, openErr := os.OpenFile(path, flags, 0o644)
	if openErr != nil {
		return nil, fmt.Errorf("transport: open kcp session stats log %s: %w", path, openErr)
	}

	logger := &JSONLKCPSessionStatsLogger{file: f}
	return logger, nil
}
// LogKCPSessionStatsRecord marshals record to JSON and appends it to the file
// as a single newline-terminated line. It returns the marshal or write error,
// if any.
func (l *JSONLKCPSessionStatsLogger) LogKCPSessionStatsRecord(record KCPSessionStatsRecord) error {
	encoded, err := json.Marshal(record)
	if err != nil {
		return err
	}
	encoded = append(encoded, '\n')

	// Encode outside the lock; only the write itself is serialized.
	l.mu.Lock()
	defer l.mu.Unlock()
	_, err = l.file.Write(encoded)
	return err
}
// Close closes the underlying file. The file is closed only on the first call;
// every call returns the result of that first close.
func (l *JSONLKCPSessionStatsLogger) Close() error {
	closeFile := func() {
		l.closeErr = l.file.Close()
	}
	l.closeOnce.Do(closeFile)
	return l.closeErr
}
// ParseKCPSessionStatsInterval converts a duration string into a sampling
// interval. An empty string yields DefaultKCPSessionStatsInterval; a string
// that does not parse, or that parses to zero or a negative duration, yields
// an error.
func ParseKCPSessionStatsInterval(raw string) (time.Duration, error) {
	if raw == "" {
		return DefaultKCPSessionStatsInterval, nil
	}

	interval, err := time.ParseDuration(raw)
	switch {
	case err != nil:
		return 0, fmt.Errorf("transport: parse kcp session stats interval %q: %w", raw, err)
	case interval <= 0:
		return 0, fmt.Errorf("transport: kcp session stats interval must be greater than zero")
	}
	return interval, nil
}
// kcpSessionStatsSource is the subset of a KCP session that logSessionSample
// reads. NewKCPConn passes its *kcp.UDPSession here; tests substitute a stub.
type kcpSessionStatsSource interface {
GetConv() uint32
GetRTO() uint32
GetSRTT() int32
GetSRTTVar() int32
LocalAddr() net.Addr
RemoteAddr() net.Addr
}
// kcpSessionStatsSampler logs session samples for one source, both on a ticker
// and on demand through SampleEvent, and forwards on-demand events to a
// process-level sampler.
type kcpSessionStatsSampler struct {
source kcpSessionStatsSource
logger KCPSessionStatsLogger
nodeRole string
nodeID string
interval time.Duration
// processSampler is obtained from acquireKCPProcessStatsSampler and handed
// back in Close.
processSampler *kcpProcessStatsSampler
// sampleMu serializes logSessionSample.
sampleMu sync.Mutex
// stopOnce guards Close; stopCh tells run to exit and stoppedCh is closed
// by run once it has returned.
stopOnce sync.Once
stopCh chan struct{}
stoppedCh chan struct{}
}
// newKCPSessionStatsSampler builds a sampler for source and starts its periodic
// goroutine. It returns nil when source or logger is nil, and substitutes
// DefaultKCPSessionStatsInterval for a non-positive interval.
func newKCPSessionStatsSampler(source kcpSessionStatsSource, logger KCPSessionStatsLogger, nodeRole, nodeID string, interval time.Duration) *kcpSessionStatsSampler {
	switch {
	case source == nil, logger == nil:
		return nil
	case interval <= 0:
		interval = DefaultKCPSessionStatsInterval
	}

	sampler := &kcpSessionStatsSampler{
		source:    source,
		logger:    logger,
		nodeRole:  nodeRole,
		nodeID:    nodeID,
		interval:  interval,
		stopCh:    make(chan struct{}),
		stoppedCh: make(chan struct{}),
	}
	sampler.processSampler = acquireKCPProcessStatsSampler(logger, nodeRole, nodeID, interval)

	go sampler.run()
	return sampler
}
// run logs a periodic session sample on every tick until stopCh is closed, then
// closes stoppedCh to signal that the goroutine has finished.
func (s *kcpSessionStatsSampler) run() {
	tick := time.NewTicker(s.interval)
	defer tick.Stop()
	defer close(s.stoppedCh)

	for {
		select {
		case <-s.stopCh:
			return
		case <-tick.C:
			s.logSessionSample(kcpStatsSampleReasonPeriodic)
		}
	}
}
// SampleEvent logs a session sample for reason and, when a process sampler is
// attached, asks it for a process sample as well. The close reason waits for
// the process sample to be taken; every other reason is fire-and-forget. A nil
// receiver is a no-op.
func (s *kcpSessionStatsSampler) SampleEvent(reason string) {
	if s == nil {
		return
	}
	s.logSessionSample(reason)

	process := s.processSampler
	switch {
	case process == nil:
		// Nothing to forward to.
	case reason == kcpStatsSampleReasonClose:
		process.requestSampleAndWait(reason)
	default:
		process.requestSample(reason)
	}
}
// Close takes a final "close" sample, stops the periodic goroutine, waits for
// it to exit, and releases the process sampler. Only the first call does any
// work; a nil receiver is a no-op.
func (s *kcpSessionStatsSampler) Close() {
	if s == nil {
		return
	}

	shutdown := func() {
		// The final sample is taken before the loop is stopped.
		s.SampleEvent(kcpStatsSampleReasonClose)
		close(s.stopCh)
		<-s.stoppedCh
		releaseKCPProcessStatsSampler(s.processSampler)
	}
	s.stopOnce.Do(shutdown)
}
// logSessionSample reads the current conv, RTO, SRTT, SRTT variance and
// endpoint addresses from the source and logs them as one session_sample
// record tagged with reason. The logger's error is deliberately ignored.
func (s *kcpSessionStatsSampler) logSessionSample(reason string) {
	s.sampleMu.Lock()
	defer s.sampleMu.Unlock()

	src := s.source
	record := KCPSessionStatsRecord{
		RecordType:   kcpSessionStatsRecordTypeSessionSample,
		NodeRole:     s.nodeRole,
		NodeID:       s.nodeID,
		SampleReason: reason,
	}
	record.Conv = uint32Ptr(src.GetConv())
	record.RTOMillis = uint32Ptr(src.GetRTO())
	record.SRTTMillis = int32Ptr(src.GetSRTT())
	record.SRTTVarMillis = int32Ptr(src.GetSRTTVar())
	record.LocalAddr = addrString(src.LocalAddr())
	record.RemoteAddr = addrString(src.RemoteAddr())
	record.TSUnixNano = time.Now().UTC().UnixNano()

	_ = s.logger.LogKCPSessionStatsRecord(record)
}
// kcpProcessSampleRequest asks the process sampler's run loop to take a sample.
type kcpProcessSampleRequest struct {
reason string
// done, when non-nil, is closed by the run loop after the sample is logged.
done chan struct{}
}
// kcpProcessStatsSampler logs samples derived from kcp.DefaultSnmp. Instances
// are shared through the kcpProcessSamplers registry and reference-counted.
type kcpProcessStatsSampler struct {
// key is this sampler's entry in kcpProcessSamplers.
key string
logger KCPSessionStatsLogger
nodeRole string
nodeID string
interval time.Duration
// requestCh carries on-demand sample requests to run (created with a
// buffer of one).
requestCh chan kcpProcessSampleRequest
// stopCh tells run to exit; stoppedCh is closed by run once it has returned.
stopCh chan struct{}
stoppedCh chan struct{}
// sampleMu serializes logProcessSample and protects previous.
sampleMu sync.Mutex
// previous is the snapshot the next sample's counter deltas are measured
// against.
previous *kcp.Snmp
// refCount is modified only while kcpProcessSamplersMu is held.
refCount int
}
// Registry of live process samplers, keyed by the string built in
// acquireKCPProcessStatsSampler (logger identity, node role, node ID and
// interval). kcpProcessSamplersMu guards the map and each sampler's refCount.
var (
kcpProcessSamplersMu sync.Mutex
kcpProcessSamplers = make(map[string]*kcpProcessStatsSampler)
)
// acquireKCPProcessStatsSampler returns the process sampler shared by all
// callers with the same logger identity, node role, node ID and interval,
// creating and starting it on first use. Every successful call takes one
// reference, to be dropped with releaseKCPProcessStatsSampler. It returns nil
// for a nil logger and substitutes DefaultKCPSessionStatsInterval for a
// non-positive interval.
func acquireKCPProcessStatsSampler(logger KCPSessionStatsLogger, nodeRole, nodeID string, interval time.Duration) *kcpProcessStatsSampler {
	if logger == nil {
		return nil
	}
	if interval <= 0 {
		interval = DefaultKCPSessionStatsInterval
	}
	key := fmt.Sprintf("%s|%s|%s|%d", kcpStatsLoggerIdentity(logger), nodeRole, nodeID, interval)

	kcpProcessSamplersMu.Lock()
	defer kcpProcessSamplersMu.Unlock()

	shared, found := kcpProcessSamplers[key]
	if !found {
		shared = &kcpProcessStatsSampler{
			key:       key,
			logger:    logger,
			nodeRole:  nodeRole,
			nodeID:    nodeID,
			interval:  interval,
			requestCh: make(chan kcpProcessSampleRequest, 1),
			stopCh:    make(chan struct{}),
			stoppedCh: make(chan struct{}),
			// Start deltas from the counters as they stand right now.
			previous: kcp.DefaultSnmp.Copy(),
		}
		kcpProcessSamplers[key] = shared
		go shared.run()
	}
	shared.refCount++
	return shared
}
// releaseKCPProcessStatsSampler drops one reference to sampler. When the last
// reference goes away the sampler is removed from the registry, told to stop,
// and waited for. A nil sampler is a no-op.
func releaseKCPProcessStatsSampler(sampler *kcpProcessStatsSampler) {
	if sampler == nil {
		return
	}

	wasLast := func() bool {
		kcpProcessSamplersMu.Lock()
		defer kcpProcessSamplersMu.Unlock()

		sampler.refCount--
		if sampler.refCount > 0 {
			return false
		}
		delete(kcpProcessSamplers, sampler.key)
		close(sampler.stopCh)
		return true
	}()

	// Wait for the run loop outside the registry lock.
	if wasLast {
		<-sampler.stoppedCh
	}
}
// run logs a process sample on every tick and for every queued request,
// closing a request's done channel (if it has one) once its sample is logged.
// It exits when stopCh is closed and then closes stoppedCh.
func (s *kcpProcessStatsSampler) run() {
	tick := time.NewTicker(s.interval)
	defer tick.Stop()
	defer close(s.stoppedCh)

	for {
		select {
		case <-s.stopCh:
			return
		case request := <-s.requestCh:
			s.logProcessSample(request.reason)
			if request.done != nil {
				close(request.done)
			}
		case <-tick.C:
			s.logProcessSample(kcpStatsSampleReasonPeriodic)
		}
	}
}
// requestSample queues an on-demand process sample without blocking. The
// request is dropped if the queue is already full. A nil receiver is a no-op.
func (s *kcpProcessStatsSampler) requestSample(reason string) {
	if s == nil {
		return
	}

	request := kcpProcessSampleRequest{reason: reason}
	select {
	case s.requestCh <- request:
	default:
		// Queue full: skip this sample rather than block the caller.
	}
}
// requestSampleAndWait takes an on-demand process sample and returns only after
// it has been logged. If the request can be queued it waits for the run loop to
// handle it; otherwise it logs the sample on the calling goroutine. A nil
// receiver is a no-op.
func (s *kcpProcessStatsSampler) requestSampleAndWait(reason string) {
	if s == nil {
		return
	}

	request := kcpProcessSampleRequest{reason: reason, done: make(chan struct{})}
	select {
	case s.requestCh <- request:
		<-request.done
	default:
		s.logProcessSample(reason)
	}
}
// logProcessSample snapshots kcp.DefaultSnmp, logs a record built from the
// previous and the new snapshot, and keeps the new snapshot as the baseline for
// the next call. The logger's error is deliberately ignored.
func (s *kcpProcessStatsSampler) logProcessSample(reason string) {
	s.sampleMu.Lock()
	defer s.sampleMu.Unlock()

	snapshot := kcp.DefaultSnmp.Copy()
	baseline := s.previous
	record := newKCPProcessSNMPSampleRecord(s.nodeRole, s.nodeID, reason, baseline, snapshot)
	s.previous = snapshot

	_ = s.logger.LogKCPSessionStatsRecord(record)
}
// newKCPProcessSNMPSampleRecord builds a process_snmp_sample record from two
// SNMP snapshots. Counter fields carry the increase from previous to current
// (zero if the counter went backwards); the ring-buffer and CurrEstab fields
// carry the value in current unchanged.
func newKCPProcessSNMPSampleRecord(nodeRole, nodeID, reason string, previous, current *kcp.Snmp) KCPSessionStatsRecord {
	delta := func(before, after uint64) *uint64 {
		return uint64Ptr(diffUint64(before, after))
	}

	return KCPSessionStatsRecord{
		RecordType:   kcpSessionStatsRecordTypeProcessSNMPSample,
		NodeRole:     nodeRole,
		NodeID:       nodeID,
		TSUnixNano:   time.Now().UTC().UnixNano(),
		SampleReason: reason,

		// Counters: reported as deltas.
		BytesSent:        delta(previous.BytesSent, current.BytesSent),
		BytesReceived:    delta(previous.BytesReceived, current.BytesReceived),
		InPkts:           delta(previous.InPkts, current.InPkts),
		OutPkts:          delta(previous.OutPkts, current.OutPkts),
		InSegs:           delta(previous.InSegs, current.InSegs),
		OutSegs:          delta(previous.OutSegs, current.OutSegs),
		RetransSegs:      delta(previous.RetransSegs, current.RetransSegs),
		FastRetransSegs:  delta(previous.FastRetransSegs, current.FastRetransSegs),
		EarlyRetransSegs: delta(previous.EarlyRetransSegs, current.EarlyRetransSegs),
		LostSegs:         delta(previous.LostSegs, current.LostSegs),
		RepeatSegs:       delta(previous.RepeatSegs, current.RepeatSegs),
		InErrs:           delta(previous.InErrs, current.InErrs),
		KCPInErrs:        delta(previous.KCPInErrors, current.KCPInErrors),

		// Gauges: reported as the current value.
		RingBufferSndQueue:  uint64Ptr(current.RingBufferSndQueue),
		RingBufferRcvQueue:  uint64Ptr(current.RingBufferRcvQueue),
		RingBufferSndBuffer: uint64Ptr(current.RingBufferSndBuffer),
		CurrEstab:           uint64Ptr(current.CurrEstab),
	}
}
// kcpStatsLoggerIdentity returns a string used to tell logger instances apart
// when building a registry key. For pointer-like kinds it combines the dynamic
// type with the pointer value; for every other kind it combines the dynamic
// type with the value's default formatting.
func kcpStatsLoggerIdentity(logger KCPSessionStatsLogger) string {
	v := reflect.ValueOf(logger)
	switch v.Kind() {
	case reflect.Pointer, reflect.Map, reflect.Func, reflect.Slice, reflect.Chan, reflect.UnsafePointer:
		return fmt.Sprintf("%T:%x", logger, v.Pointer())
	}
	return fmt.Sprintf("%T:%v", logger, logger)
}
// diffUint64 returns current minus previous, or zero when current is smaller
// than previous so the subtraction never wraps around.
func diffUint64(previous, current uint64) uint64 {
	if current >= previous {
		return current - previous
	}
	return 0
}
// addrString returns addr.String(), or the empty string for a nil address.
func addrString(addr net.Addr) string {
	if addr != nil {
		return addr.String()
	}
	return ""
}
// uint32Ptr returns a pointer to a fresh copy of value.
func uint32Ptr(value uint32) *uint32 {
	boxed := new(uint32)
	*boxed = value
	return boxed
}
// uint64Ptr returns a pointer to a fresh copy of value.
func uint64Ptr(value uint64) *uint64 {
	boxed := new(uint64)
	*boxed = value
	return boxed
}
// int32Ptr returns a pointer to a fresh copy of value.
func int32Ptr(value int32) *int32 {
	boxed := new(int32)
	*boxed = value
	return boxed
}

View File

@@ -0,0 +1,409 @@
package transport
import (
"encoding/binary"
"net"
"sync"
"testing"
"time"
kcp "github.com/xtaci/kcp-go/v5"
"omnisocketgo/cmd/internal/latencylog"
"omnisocketgo/cmd/internal/protocol"
)
// recordingKCPSessionStatsLogger is a test logger that keeps every record it
// receives in memory.
type recordingKCPSessionStatsLogger struct {
	mu      sync.Mutex
	records []KCPSessionStatsRecord
}

// LogKCPSessionStatsRecord stores record and always returns nil.
func (l *recordingKCPSessionStatsLogger) LogKCPSessionStatsRecord(record KCPSessionStatsRecord) error {
	l.mu.Lock()
	l.records = append(l.records, record)
	l.mu.Unlock()
	return nil
}

// Records returns a copy of everything logged so far.
func (l *recordingKCPSessionStatsLogger) Records() []KCPSessionStatsRecord {
	l.mu.Lock()
	defer l.mu.Unlock()

	var snapshot []KCPSessionStatsRecord
	snapshot = append(snapshot, l.records...)
	return snapshot
}
// stubKCPSessionStatsSource is a test source whose getters return the fixed
// values stored in its fields.
type stubKCPSessionStatsSource struct {
	conv    uint32
	rto     uint32
	srtt    int32
	srttVar int32
	local   net.Addr
	remote  net.Addr
}

// GetConv returns the configured conversation ID.
func (src stubKCPSessionStatsSource) GetConv() uint32 {
	return src.conv
}

// GetRTO returns the configured RTO.
func (src stubKCPSessionStatsSource) GetRTO() uint32 {
	return src.rto
}

// GetSRTT returns the configured SRTT.
func (src stubKCPSessionStatsSource) GetSRTT() int32 {
	return src.srtt
}

// GetSRTTVar returns the configured SRTT variance.
func (src stubKCPSessionStatsSource) GetSRTTVar() int32 {
	return src.srttVar
}

// LocalAddr returns the configured local address.
func (src stubKCPSessionStatsSource) LocalAddr() net.Addr {
	return src.local
}

// RemoteAddr returns the configured remote address.
func (src stubKCPSessionStatsSource) RemoteAddr() net.Addr {
	return src.remote
}
func TestParseKCPPacketMetadataSingleSegment(t *testing.T) {
packet := buildTestKCPDatagram(42, []testKCPSegment{
{cmd: 81, sn: 7, una: 3, frg: 1, wnd: 128, length: 5},
})
conv, segments := parseKCPPacketMetadata(packet)
if conv == nil || *conv != 42 {
t.Fatalf("conv = %v, want 42", conv)
}
if len(segments) != 1 {
t.Fatalf("segment count = %d, want 1", len(segments))
}
if segments[0].Cmd != 81 || segments[0].SN != 7 || segments[0].UNA != 3 || segments[0].Frg != 1 || segments[0].Wnd != 128 || segments[0].Len != 5 {
t.Fatalf("segment = %+v, want expected header values", segments[0])
}
}
func TestParseKCPPacketMetadataMultiSegment(t *testing.T) {
packet := buildTestKCPDatagram(99, []testKCPSegment{
{cmd: 82, sn: 10, una: 5, frg: 2, wnd: 64, length: 3},
{cmd: 83, sn: 11, una: 6, frg: 0, wnd: 96, length: 0},
})
conv, segments := parseKCPPacketMetadata(packet)
if conv == nil || *conv != 99 {
t.Fatalf("conv = %v, want 99", conv)
}
if len(segments) != 2 {
t.Fatalf("segment count = %d, want 2", len(segments))
}
if segments[0].SN != 10 || segments[1].SN != 11 {
t.Fatalf("segments = %+v, want in-order sequence numbers", segments)
}
}
// TestParseKCPPacketMetadataReturnsNoSegmentsForTruncatedPacket drops the last
// payload byte of a datagram and expects the conversation ID to survive while
// no segments are reported.
func TestParseKCPPacketMetadataReturnsNoSegmentsForTruncatedPacket(t *testing.T) {
	full := buildTestKCPDatagram(7, []testKCPSegment{
		{cmd: 81, sn: 1, una: 0, frg: 0, wnd: 32, length: 4},
	})
	truncated := full[:len(full)-1]

	conv, segments := parseKCPPacketMetadata(truncated)
	if conv == nil || *conv != 7 {
		t.Fatalf("conv = %v, want 7", conv)
	}
	if len(segments) > 0 {
		t.Fatalf("segments = %+v, want empty on truncated packet", segments)
	}
}
// TestParseKCPSessionStatsInterval covers the default, a valid duration, an
// unparsable string and a zero duration.
func TestParseKCPSessionStatsInterval(t *testing.T) {
	type testCase struct {
		name    string
		raw     string
		want    time.Duration
		wantErr bool
	}
	cases := []testCase{
		{name: "default", raw: "", want: DefaultKCPSessionStatsInterval},
		{name: "valid", raw: "250ms", want: 250 * time.Millisecond},
		{name: "invalid format", raw: "soon", wantErr: true},
		{name: "invalid zero", raw: "0s", wantErr: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got, err := ParseKCPSessionStatsInterval(tc.raw)
			switch {
			case tc.wantErr && err == nil:
				t.Fatalf("ParseKCPSessionStatsInterval(%q) error = nil, want non-nil", tc.raw)
			case tc.wantErr:
				// Error expected and received; the value is not inspected.
				return
			case err != nil:
				t.Fatalf("ParseKCPSessionStatsInterval(%q) error = %v", tc.raw, err)
			case got != tc.want:
				t.Fatalf("ParseKCPSessionStatsInterval(%q) = %v, want %v", tc.raw, got, tc.want)
			}
		})
	}
}
// TestKCPProcessSNMPSampleRecordUsesCounterDeltasAndGaugeSnapshots builds a
// process-level SNMP sample from two snapshots and checks that the sampled
// counters are reported as the difference between the snapshots while the
// sampled gauges carry the value of the later snapshot.
func TestKCPProcessSNMPSampleRecordUsesCounterDeltasAndGaugeSnapshots(t *testing.T) {
	baseline := &kcp.Snmp{
		BytesSent:           10,
		BytesReceived:       20,
		InPkts:              30,
		OutPkts:             40,
		InSegs:              50,
		OutSegs:             60,
		RetransSegs:         70,
		FastRetransSegs:     80,
		EarlyRetransSegs:    90,
		LostSegs:            100,
		RepeatSegs:          110,
		InErrs:              120,
		KCPInErrors:         130,
		RingBufferSndQueue:  4,
		RingBufferRcvQueue:  5,
		RingBufferSndBuffer: 6,
		CurrEstab:           1,
	}
	latest := &kcp.Snmp{
		BytesSent:           15,
		BytesReceived:       22,
		InPkts:              31,
		OutPkts:             44,
		InSegs:              55,
		OutSegs:             61,
		RetransSegs:         79,
		FastRetransSegs:     81,
		EarlyRetransSegs:    95,
		LostSegs:            101,
		RepeatSegs:          115,
		InErrs:              121,
		KCPInErrors:         135,
		RingBufferSndQueue:  9,
		RingBufferRcvQueue:  8,
		RingBufferSndBuffer: 7,
		CurrEstab:           3,
	}

	sample := newKCPProcessSNMPSampleRecord(latencylog.NodeRoleServer, "hub", kcpStatsSampleReasonReceive, baseline, latest)

	// Cases are evaluated top to bottom, so the first violated expectation is
	// the one reported.
	switch {
	case sample.RecordType != kcpSessionStatsRecordTypeProcessSNMPSample:
		t.Fatalf("record type = %q, want %q", sample.RecordType, kcpSessionStatsRecordTypeProcessSNMPSample)
	case sample.Conv != nil:
		// A process-wide sample is not tied to a single conversation.
		t.Fatalf("process sample conv = %v, want nil", sample.Conv)
	case sample.BytesSent == nil || *sample.BytesSent != 5:
		// Delta: 15 - 10.
		t.Fatalf("bytes_sent = %v, want 5", sample.BytesSent)
	case sample.KCPInErrs == nil || *sample.KCPInErrs != 5:
		// Delta: 135 - 130.
		t.Fatalf("kcp_in_errs = %v, want 5", sample.KCPInErrs)
	case sample.RingBufferSndQueue == nil || *sample.RingBufferSndQueue != 9:
		// Snapshot of the later value, not 9 - 4.
		t.Fatalf("ring_buffer_snd_queue = %v, want 9", sample.RingBufferSndQueue)
	case sample.CurrEstab == nil || *sample.CurrEstab != 3:
		// Snapshot of the later value, not 3 - 1.
		t.Fatalf("curr_estab = %v, want 3", sample.CurrEstab)
	}
}
// TestKCPProcessStatsSamplerIsSharedPerLoggerRoleNodeAndInterval checks that
// two acquisitions with the same logger, role, node ID and interval return the
// same sampler instance.
func TestKCPProcessStatsSamplerIsSharedPerLoggerRoleNodeAndInterval(t *testing.T) {
	logger := &recordingKCPSessionStatsLogger{}

	// Every acquire is paired with a deferred release. t.Fatal terminates the
	// test via runtime.Goexit, which still runs deferred calls, so a failed
	// assertion no longer leaves acquired samplers behind for later tests.
	first := acquireKCPProcessStatsSampler(logger, latencylog.NodeRoleServer, "hub", 10*time.Millisecond)
	defer releaseKCPProcessStatsSampler(first)
	second := acquireKCPProcessStatsSampler(logger, latencylog.NodeRoleServer, "hub", 10*time.Millisecond)
	defer releaseKCPProcessStatsSampler(second)

	if first != second {
		t.Fatal("expected acquireKCPProcessStatsSampler to reuse sampler for same logger/role/node/interval")
	}
}
// TestKCPSessionStatsSamplerRecordsEventAndPeriodicSamples drives a session
// stats sampler against a stub source and checks, in order, that:
//   - a periodic session sample is emitted without any explicit trigger,
//   - each SampleEvent call produces a session sample with the given reason,
//   - Close emits a final session sample and a process SNMP sample,
//   - no further records are appended once Close has been called.
func TestKCPSessionStatsSamplerRecordsEventAndPeriodicSamples(t *testing.T) {
	// Start from reset package-level kcp counters.
	kcp.DefaultSnmp.Reset()
	logger := &recordingKCPSessionStatsLogger{}
	source := stubKCPSessionStatsSource{
		conv:    77,
		rto:     25,
		srtt:    10,
		srttVar: 3,
		local:   &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 9001},
		remote:  &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 9002},
	}
	sampler := newKCPSessionStatsSampler(source, logger, latencylog.NodeRolePeer, "peer-a", 10*time.Millisecond)

	// If one of the waits below times out, t.Fatalf ends the test before the
	// explicit Close is reached. Close the sampler on that path too so it does
	// not keep sampling while later tests run. The flag avoids a second Close
	// on the success path, so this does not rely on Close being idempotent.
	closed := false
	defer func() {
		if !closed {
			sampler.Close()
		}
	}()

	waitForKCPSessionStatsRecords(t, logger, func(records []KCPSessionStatsRecord) bool {
		return hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonPeriodic)
	}, "periodic session sample")

	sampler.SampleEvent(kcpStatsSampleReasonSendHandoffBegin)
	sampler.SampleEvent(kcpStatsSampleReasonSendHandoffEnd)
	sampler.SampleEvent(kcpStatsSampleReasonReceive)
	waitForKCPSessionStatsRecords(t, logger, func(records []KCPSessionStatsRecord) bool {
		return hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonSendHandoffBegin) &&
			hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonSendHandoffEnd) &&
			hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonReceive)
	}, "event-driven session samples")

	closed = true
	sampler.Close()
	waitForKCPSessionStatsRecords(t, logger, func(records []KCPSessionStatsRecord) bool {
		return hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonClose) &&
			hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeProcessSNMPSample, kcpStatsSampleReasonClose)
	}, "close samples")

	// 30ms covers several 10ms sampling intervals; any record appended in that
	// window means the sampler kept running after Close.
	recordCount := len(logger.Records())
	time.Sleep(30 * time.Millisecond)
	if got := len(logger.Records()); got != recordCount {
		t.Fatalf("record count after Close() = %d, want %d", got, recordCount)
	}
}
// TestKCPConnSessionStatsLoggingSparseTraffic sends a single message over a
// KCP connection pair, closes both ends, and checks that each side's stats
// logger received the event-driven samples for what that side did: send
// handoff begin/end and close (session and process SNMP) on the sender;
// receive and close on the receiver.
func TestKCPConnSessionStatsLoggingSparseTraffic(t *testing.T) {
	senderLogger := &recordingKCPSessionStatsLogger{}
	receiverLogger := &recordingKCPSessionStatsLogger{}

	senderOptions := []KCPOption{
		WithKCPLogger(latencylog.NoopLogger{}, latencylog.NodeRolePeer, "peer-a"),
		WithKCPSessionStatsLogger(senderLogger, 10*time.Millisecond),
	}
	receiverOptions := []KCPOption{
		WithKCPLogger(latencylog.NoopLogger{}, latencylog.NodeRolePeer, "peer-b"),
		WithKCPSessionStatsLogger(receiverLogger, 10*time.Millisecond),
	}
	sender, accepted, cleanup := newKCPConnPair(t, senderOptions, receiverOptions, nil, nil)
	defer cleanup()

	msg := protocol.Message{
		Type: protocol.MessageTypeText,
		ID:   1,
		From: "peer-a",
		To:   "peer-b",
		Body: []byte("hello sparse"),
	}

	// Send from a separate goroutine so the test goroutine can pick up the
	// accepted connection and read from it while the send is in flight.
	sendErr := make(chan error, 1)
	go func() {
		sendErr <- sender.Send(msg)
	}()

	receiver := awaitAcceptedKCPConn(t, accepted)
	if _, err := receiver.Receive(); err != nil {
		t.Fatalf("receiver.Receive() error = %v", err)
	}
	if err := <-sendErr; err != nil {
		t.Fatalf("sender.Send() error = %v", err)
	}
	if err := sender.Close(); err != nil {
		t.Fatalf("sender.Close() error = %v", err)
	}
	if err := receiver.Close(); err != nil {
		t.Fatalf("receiver.Close() error = %v", err)
	}

	// hasSessionSamples reports whether a session sample exists for every
	// listed reason.
	hasSessionSamples := func(records []KCPSessionStatsRecord, reasons ...string) bool {
		for _, reason := range reasons {
			if !hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, reason) {
				return false
			}
		}
		return true
	}

	senderDone := func(records []KCPSessionStatsRecord) bool {
		if !hasSessionSamples(records, kcpStatsSampleReasonSendHandoffBegin, kcpStatsSampleReasonSendHandoffEnd, kcpStatsSampleReasonClose) {
			return false
		}
		return hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeProcessSNMPSample, kcpStatsSampleReasonClose)
	}
	waitForKCPSessionStatsRecords(t, senderLogger, senderDone, "sender sparse session stats")

	receiverDone := func(records []KCPSessionStatsRecord) bool {
		return hasSessionSamples(records, kcpStatsSampleReasonReceive, kcpStatsSampleReasonClose)
	}
	waitForKCPSessionStatsRecords(t, receiverLogger, receiverDone, "receiver sparse session stats")
}
// TestKCPConnSessionStatsLoggingContinuousTraffic streams twelve messages,
// spaced 15ms apart, over a KCP connection whose sender samples stats every
// 20ms, and checks that at least two periodic session samples and at least two
// periodic process SNMP samples were logged on the sender side.
func TestKCPConnSessionStatsLoggingContinuousTraffic(t *testing.T) {
	const messageCount = 12

	logger := &recordingKCPSessionStatsLogger{}
	senderOptions := []KCPOption{
		WithKCPLogger(latencylog.NoopLogger{}, latencylog.NodeRolePeer, "peer-a"),
		WithKCPSessionStatsLogger(logger, 20*time.Millisecond),
	}
	sender, accepted, cleanup := newKCPConnPair(t, senderOptions, nil, nil, nil)
	defer cleanup()

	receiver := awaitAcceptedKCPConn(t, accepted)

	// drain reads exactly messageCount messages and stops at the first error.
	drain := func() error {
		for received := 0; received < messageCount; received++ {
			if _, err := receiver.Receive(); err != nil {
				return err
			}
		}
		return nil
	}
	receiveErr := make(chan error, 1)
	go func() {
		receiveErr <- drain()
	}()

	for id := uint64(1); id <= messageCount; id++ {
		err := sender.Send(protocol.Message{
			Type: protocol.MessageTypeText,
			ID:   id,
			From: "peer-a",
			To:   "peer-b",
			Body: []byte("hello continuous"),
		})
		if err != nil {
			t.Fatalf("sender.Send(message %d) error = %v", id, err)
		}
		// Pace the sends so the run spans several sampling intervals.
		time.Sleep(15 * time.Millisecond)
	}

	if err := <-receiveErr; err != nil {
		t.Fatalf("receiver.Receive() error = %v", err)
	}

	enoughPeriodicSamples := func(records []KCPSessionStatsRecord) bool {
		sessionSamples := countKCPSessionStatsRecords(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonPeriodic)
		if sessionSamples < 2 {
			return false
		}
		processSamples := countKCPSessionStatsRecords(records, kcpSessionStatsRecordTypeProcessSNMPSample, kcpStatsSampleReasonPeriodic)
		return processSamples >= 2
	}
	waitForKCPSessionStatsRecords(t, logger, enoughPeriodicSamples, "continuous periodic session stats")
}
// testKCPSegment holds the per-segment header values that
// buildTestKCPDatagram encodes into a test datagram.
type testKCPSegment struct {
// cmd is written to header byte 4.
cmd uint8
// sn is written little-endian to header bytes 12-15.
sn uint32
// una is written little-endian to header bytes 16-19.
una uint32
// frg is written to header byte 5.
frg uint8
// wnd is written little-endian to header bytes 6-7.
wnd uint16
// length is written little-endian to header bytes 20-23 and is also the
// number of zero-filled payload bytes appended after the header.
length uint32
}
// buildTestKCPDatagram encodes segments back to back into one datagram. Each
// segment is a kcpPacketHeaderSize-byte header followed by segment.length
// zero-filled payload bytes. All multi-byte fields are little-endian and every
// header carries the same conv. Header bytes 8-11 are never written and stay
// zero (presumably the KCP timestamp field; confirm against the parser).
func buildTestKCPDatagram(conv uint32, segments []testKCPSegment) []byte {
	total := 0
	for i := range segments {
		total += kcpPacketHeaderSize + int(segments[i].length)
	}

	packet := make([]byte, total)
	offset := 0
	for i := range segments {
		seg := segments[i]
		end := offset + kcpPacketHeaderSize + int(seg.length)
		// Cap the window at the segment boundary so header writes can never
		// spill into the next segment.
		entry := packet[offset:end:end]

		binary.LittleEndian.PutUint32(entry[0:4], conv)
		entry[4] = seg.cmd
		entry[5] = seg.frg
		binary.LittleEndian.PutUint16(entry[6:8], seg.wnd)
		binary.LittleEndian.PutUint32(entry[12:16], seg.sn)
		binary.LittleEndian.PutUint32(entry[16:20], seg.una)
		binary.LittleEndian.PutUint32(entry[20:24], seg.length)

		offset = end
	}
	return packet
}
// hasKCPSessionStatsRecord reports whether records contains at least one entry
// whose RecordType equals recordType and whose SampleReason equals reason.
func hasKCPSessionStatsRecord(records []KCPSessionStatsRecord, recordType, reason string) bool {
	for i := range records {
		if records[i].RecordType == recordType && records[i].SampleReason == reason {
			return true
		}
	}
	return false
}
// countKCPSessionStatsRecords returns how many entries in records have both
// the given RecordType and the given SampleReason.
func countKCPSessionStatsRecords(records []KCPSessionStatsRecord, recordType, reason string) int {
	matches := 0
	for i := range records {
		if records[i].RecordType != recordType {
			continue
		}
		if records[i].SampleReason != reason {
			continue
		}
		matches++
	}
	return matches
}
// waitForKCPSessionStatsRecords polls logger.Records() every 10ms until
// condition returns true for the current records, and fails the test with
// "timed out waiting for <description>" if that has not happened within two
// seconds.
//
// The condition is evaluated before each deadline check, so it always runs at
// least once and is re-evaluated after the final sleep. A record that arrives
// during the last poll interval is therefore not reported as a timeout.
func waitForKCPSessionStatsRecords(t *testing.T, logger *recordingKCPSessionStatsLogger, condition func([]KCPSessionStatsRecord) bool, description string) {
	t.Helper()

	const (
		timeout      = 2 * time.Second
		pollInterval = 10 * time.Millisecond
	)

	deadline := time.Now().Add(timeout)
	for {
		if condition(logger.Records()) {
			return
		}
		if !time.Now().Before(deadline) {
			break
		}
		time.Sleep(pollInterval)
	}
	t.Fatalf("timed out waiting for %s", description)
}

View File

@@ -25,10 +25,17 @@ func main() {
inboxDir := flag.String("inbox-dir", "inbox", "directory used to persist received text and file messages") inboxDir := flag.String("inbox-dir", "inbox", "directory used to persist received text and file messages")
logPath := flag.String("latency-log", "", "optional JSONL file path for latency timestamp logs") logPath := flag.String("latency-log", "", "optional JSONL file path for latency timestamp logs")
kcpTimestampDebugLogPath := flag.String("kcp-ts-debug-log", "", "optional JSONL file path for KCP packet kernel timestamp debug records") kcpTimestampDebugLogPath := flag.String("kcp-ts-debug-log", "", "optional JSONL file path for KCP packet kernel timestamp debug records")
kcpSessionStatsLogPath := flag.String("kcp-session-stats-log", "", "optional JSONL file path for KCP session stats records")
kcpSessionStatsInterval := flag.String("kcp-session-stats-interval", transport.DefaultKCPSessionStatsInterval.String(), "sampling interval for KCP session stats, for example 100ms")
interactive := flag.Bool("interactive", true, "enable interactive REPL for repeated text/file sends on the same connection") interactive := flag.Bool("interactive", true, "enable interactive REPL for repeated text/file sends on the same connection")
flag.Parse() flag.Parse()
clientOptions := make([]peerpkg.Option, 0, 5) statsInterval, err := transport.ParseKCPSessionStatsInterval(*kcpSessionStatsInterval)
if err != nil {
log.Fatalf("parse -kcp-session-stats-interval=%q: %v", *kcpSessionStatsInterval, err)
}
clientOptions := make([]peerpkg.Option, 0, 6)
if *logPath != "" { if *logPath != "" {
logger, err := latencylog.NewJSONLLogger(*logPath) logger, err := latencylog.NewJSONLLogger(*logPath)
if err != nil { if err != nil {
@@ -45,6 +52,14 @@ func main() {
defer logger.Close() defer logger.Close()
clientOptions = append(clientOptions, peerpkg.WithKCPPacketDebugLogger(logger)) clientOptions = append(clientOptions, peerpkg.WithKCPPacketDebugLogger(logger))
} }
if *kcpSessionStatsLogPath != "" {
logger, err := transport.NewJSONLKCPSessionStatsLogger(*kcpSessionStatsLogPath)
if err != nil {
log.Fatalf("create kcp session stats logger %s: %v", *kcpSessionStatsLogPath, err)
}
defer logger.Close()
clientOptions = append(clientOptions, peerpkg.WithKCPSessionStatsLogger(logger, statsInterval))
}
if *bindIP != "" { if *bindIP != "" {
clientOptions = append(clientOptions, peerpkg.WithBindIP(*bindIP)) clientOptions = append(clientOptions, peerpkg.WithBindIP(*bindIP))
} }

View File

@@ -17,9 +17,16 @@ func main() {
bindDevice := flag.String("bind-device", "", "optional Linux network device used when listening") bindDevice := flag.String("bind-device", "", "optional Linux network device used when listening")
logPath := flag.String("latency-log", "", "optional JSONL file path for latency timestamp logs") logPath := flag.String("latency-log", "", "optional JSONL file path for latency timestamp logs")
kcpTimestampDebugLogPath := flag.String("kcp-ts-debug-log", "", "optional JSONL file path for KCP packet kernel timestamp debug records") kcpTimestampDebugLogPath := flag.String("kcp-ts-debug-log", "", "optional JSONL file path for KCP packet kernel timestamp debug records")
kcpSessionStatsLogPath := flag.String("kcp-session-stats-log", "", "optional JSONL file path for KCP session stats records")
kcpSessionStatsInterval := flag.String("kcp-session-stats-interval", transport.DefaultKCPSessionStatsInterval.String(), "sampling interval for KCP session stats, for example 100ms")
flag.Parse() flag.Parse()
hubOptions := make([]server.KCPOption, 0, 1) statsInterval, err := transport.ParseKCPSessionStatsInterval(*kcpSessionStatsInterval)
if err != nil {
log.Fatalf("parse -kcp-session-stats-interval=%q: %v", *kcpSessionStatsInterval, err)
}
hubOptions := make([]server.KCPOption, 0, 2)
if *logPath != "" { if *logPath != "" {
logger, err := latencylog.NewJSONLLogger(*logPath) logger, err := latencylog.NewJSONLLogger(*logPath)
if err != nil { if err != nil {
@@ -38,6 +45,14 @@ func main() {
defer logger.Close() defer logger.Close()
packetLogger = logger packetLogger = logger
} }
if *kcpSessionStatsLogPath != "" {
logger, err := transport.NewJSONLKCPSessionStatsLogger(*kcpSessionStatsLogPath)
if err != nil {
log.Fatalf("create kcp session stats logger %s: %v", *kcpSessionStatsLogPath, err)
}
defer logger.Close()
hubOptions = append(hubOptions, server.WithKCPSessionStatsLogger(logger, statsInterval))
}
listener, packetConn, err := transport.ListenKCPSessions(*listenAddr, *bindDevice, packetLogger, latencylog.NodeRoleServer, "hub") listener, packetConn, err := transport.ListenKCPSessions(*listenAddr, *bindDevice, packetLogger, latencylog.NodeRoleServer, "hub")
if err != nil { if err != nil {