diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 3a6bd95..7db5d03 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -3,7 +3,12 @@ "allow": [ "Bash(go vet:*)", "Bash(go build:*)", - "Bash(go test:*)" + "Bash(go test:*)", + "Bash(xargs grep:*)", + "Bash(find /c/Users/64187/Desktop/Workspace/OmniSocketGo -type f -name *.go)", + "Bash(git status:*)", + "Bash(git fetch:*)", + "Bash(git pull:*)" ] } } diff --git a/cmd/internal/peer/client.go b/cmd/internal/peer/client.go index 814b7a7..2500667 100644 --- a/cmd/internal/peer/client.go +++ b/cmd/internal/peer/client.go @@ -7,6 +7,7 @@ import ( "path/filepath" "sync/atomic" "syscall" + "time" "omnisocketgo/cmd/internal/latencylog" "omnisocketgo/cmd/internal/protocol" @@ -16,11 +17,13 @@ import ( var dialServer = dialServerWithOptions type clientOptions struct { - logger latencylog.Logger - txTimestampDebugLogger transport.TXTimestampDebugLogger - kcpPacketDebugLogger transport.KCPPacketDebugLogger - bindIP string - bindDevice string + logger latencylog.Logger + txTimestampDebugLogger transport.TXTimestampDebugLogger + kcpPacketDebugLogger transport.KCPPacketDebugLogger + kcpSessionStatsLogger transport.KCPSessionStatsLogger + kcpSessionStatsInterval time.Duration + bindIP string + bindDevice string } // Option 用于配置 Client 的可选行为,例如时延日志。 @@ -47,6 +50,14 @@ func WithKCPPacketDebugLogger(logger transport.KCPPacketDebugLogger) Option { } } +// WithKCPSessionStatsLogger 为 KCP 会话统计日志注入记录器与采样间隔。 +func WithKCPSessionStatsLogger(logger transport.KCPSessionStatsLogger, interval time.Duration) Option { + return func(options *clientOptions) { + options.kcpSessionStatsLogger = logger + options.kcpSessionStatsInterval = interval + } +} + // WithBindIP 指定拨号时使用的本地源 IP。 func WithBindIP(ip string) Option { return func(options *clientOptions) { diff --git a/cmd/internal/peer/kcp_client.go b/cmd/internal/peer/kcp_client.go index 3433446..130294e 100644 --- a/cmd/internal/peer/kcp_client.go +++ b/cmd/internal/peer/kcp_client.go @@ -47,6 +47,7 @@ func DialKCP(serverAddr, peerID string, opts ...Option) (*KCPClient, error) { conn, err := transport.NewKCPConn( session, transport.WithKCPLogger(options.logger, latencylog.NodeRolePeer, peerID), + transport.WithKCPSessionStatsLogger(options.kcpSessionStatsLogger, options.kcpSessionStatsInterval), ) if err != nil { _ = session.Close() diff --git a/cmd/internal/server/kcp_hub.go b/cmd/internal/server/kcp_hub.go index 789c6a7..96f7b1d 100644 --- a/cmd/internal/server/kcp_hub.go +++ b/cmd/internal/server/kcp_hub.go @@ -3,6 +3,7 @@ package server import ( "fmt" "sync" + "time" kcp "github.com/xtaci/kcp-go/v5" @@ -21,11 +22,21 @@ func WithKCPLogger(logger latencylog.Logger) KCPOption { } } +// WithKCPSessionStatsLogger 为 KCP hub 注入会话统计日志器。 +func WithKCPSessionStatsLogger(logger transport.KCPSessionStatsLogger, interval time.Duration) KCPOption { + return func(hub *KCPHub) { + hub.sessionStatsLogger = logger + hub.sessionStatsInterval = interval + } +} + // KCPHub 管理已注册 peer 的 KCP 会话,并负责在它们之间转发消息。 type KCPHub struct { - mu sync.RWMutex - peers map[string]*transport.KCPConn - logger latencylog.Logger + mu sync.RWMutex + peers map[string]*transport.KCPConn + logger latencylog.Logger + sessionStatsLogger transport.KCPSessionStatsLogger + sessionStatsInterval time.Duration } // NewKCPHub 创建一个空的 KCP 连接中心。 @@ -57,6 +68,7 @@ func (h *KCPHub) ServeSession(session *kcp.UDPSession) error { conn, err := transport.NewKCPConn( session, transport.WithKCPLogger(h.logger, latencylog.NodeRoleServer, "hub"), + transport.WithKCPSessionStatsLogger(h.sessionStatsLogger, h.sessionStatsInterval), ) if err != nil { _ = session.Close() diff --git a/cmd/internal/transport/kcp.go b/cmd/internal/transport/kcp.go index 0b1c4a2..f88a525 100644 --- a/cmd/internal/transport/kcp.go +++ b/cmd/internal/transport/kcp.go @@ -7,6 +7,7 @@ import ( "fmt" "net" "sync" + "time" kcp "github.com/xtaci/kcp-go/v5" @@ -31,6 +32,10 @@ type KCPConn struct { nodeRole string nodeID string + sessionStatsLogger KCPSessionStatsLogger + sessionStatsInterval time.Duration + sessionStatsSampler *kcpSessionStatsSampler + writeMu sync.Mutex closeOnce sync.Once closeErr error @@ -48,6 +53,14 @@ func WithKCPLogger(logger latencylog.Logger, nodeRole, nodeID string) KCPOption } } +// WithKCPSessionStatsLogger 为 KCP 连接注入会话级与进程级统计日志器。 +func WithKCPSessionStatsLogger(logger KCPSessionStatsLogger, interval time.Duration) KCPOption { + return func(conn *KCPConn) { + conn.sessionStatsLogger = logger + conn.sessionStatsInterval = interval + } +} + // NewKCPConn 用已有的 KCP 会话创建 transport 连接封装。 func NewKCPConn(session *kcp.UDPSession, opts ...KCPOption) (*KCPConn, error) { if session == nil { @@ -66,6 +79,7 @@ func NewKCPConn(session *kcp.UDPSession, opts ...KCPOption) (*KCPConn, error) { } configureKCPSession(session) + conn.sessionStatsSampler = newKCPSessionStatsSampler(session, conn.sessionStatsLogger, conn.nodeRole, conn.nodeID, conn.sessionStatsInterval) return conn, nil } @@ -75,10 +89,16 @@ func (c *KCPConn) Send(msg protocol.Message) error { defer c.writeMu.Unlock() latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffBegin, msg) + if c.sessionStatsSampler != nil { + c.sessionStatsSampler.SampleEvent(kcpStatsSampleReasonSendHandoffBegin) + } if err := protocol.WriteMessage(c.session, msg); err != nil { return fmt.Errorf("transport: kcp send message: %w", err) } latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffEnd, msg) + if c.sessionStatsSampler != nil { + c.sessionStatsSampler.SampleEvent(kcpStatsSampleReasonSendHandoffEnd) + } return nil } @@ -88,6 +108,9 @@ func (c *KCPConn) Receive() (protocol.Message, error) { if err != nil { return protocol.Message{}, fmt.Errorf("transport: kcp receive message: %w", err) } + if c.sessionStatsSampler != nil { + c.sessionStatsSampler.SampleEvent(kcpStatsSampleReasonReceive) + } return msg, nil } @@ -110,6 +133,9 @@ func (c *KCPConn) ReceiveLoop(handler func(protocol.Message) error) error { // Close 关闭底层 KCP 会话,并保证重复调用是安全的。 func (c *KCPConn) Close() error { c.closeOnce.Do(func() { + if c.sessionStatsSampler != nil { + c.sessionStatsSampler.Close() + } c.closeErr = c.session.Close() }) return c.closeErr diff --git a/cmd/internal/transport/kcp_linux_test.go b/cmd/internal/transport/kcp_linux_test.go index 2948aaf..ec1efdd 100644 --- a/cmd/internal/transport/kcp_linux_test.go +++ b/cmd/internal/transport/kcp_linux_test.go @@ -77,6 +77,9 @@ func assertKCPPacketRecord(t *testing.T, records []KCPPacketDebugRecord, wantEve if record.KCPConv == nil { t.Fatalf("record %s missing kcp_conv: %+v", wantEvent, record) } + if len(record.Segments) == 0 { + t.Fatalf("record %s missing parsed segments: %+v", wantEvent, record) + } if wantUDPTXID && record.UDPTXID == nil { t.Fatalf("record %s missing udp_tx_id: %+v", wantEvent, record) } diff --git a/cmd/internal/transport/kcp_packet_conn.go b/cmd/internal/transport/kcp_packet_conn.go index 3f64225..68908f0 100644 --- a/cmd/internal/transport/kcp_packet_conn.go +++ b/cmd/internal/transport/kcp_packet_conn.go @@ -1,7 +1,6 @@ package transport import ( - "encoding/binary" "net" "sync" "time" @@ -61,7 +60,7 @@ func (c *kcpPacketConnBase) logKCPPacketDebugRecord(record KCPPacketDebugRecord) _ = c.logger.LogKCPPacketDebugRecord(record) } -func (c *kcpPacketConnBase) newKCPPacketDebugRecord(event string, remoteAddr net.Addr, packetBytes int, tsUnixNano int64, udpTxID *uint32, kcpConv *uint32) KCPPacketDebugRecord { +func (c *kcpPacketConnBase) newKCPPacketDebugRecord(event string, remoteAddr net.Addr, packetBytes int, tsUnixNano int64, udpTxID *uint32, kcpConv *uint32, segments []KCPPacketDebugSegment) KCPPacketDebugRecord { record := KCPPacketDebugRecord{ Event: event, NodeRole: c.nodeRole, @@ -71,6 +70,7 @@ func (c *kcpPacketConnBase) newKCPPacketDebugRecord(event string, remoteAddr net PacketBytes: packetBytes, UDPTXID: udpTxID, KCPConv: kcpConv, + Segments: append([]KCPPacketDebugSegment(nil), segments...), TSUnixNano: tsUnixNano, } if localAddr := c.conn.LocalAddr(); localAddr != nil { @@ -81,11 +81,3 @@ func (c *kcpPacketConnBase) newKCPPacketDebugRecord(event string, remoteAddr net } return record } - -func parseKCPConversationID(packet []byte) *uint32 { - if len(packet) < 4 { - return nil - } - conv := binary.LittleEndian.Uint32(packet[:4]) - return &conv -} diff --git a/cmd/internal/transport/kcp_packet_conn_linux.go b/cmd/internal/transport/kcp_packet_conn_linux.go index 868c3ec..91b8a75 100644 --- a/cmd/internal/transport/kcp_packet_conn_linux.go +++ b/cmd/internal/transport/kcp_packet_conn_linux.go @@ -17,6 +17,7 @@ type kcpPendingPacketDebug struct { remoteAddr net.Addr packetBytes int kcpConv *uint32 + segments []KCPPacketDebugSegment timestamps map[string]int64 } @@ -75,13 +76,15 @@ func (c *platformKCPPacketConn) ReadFrom(p []byte) (int, net.Addr, error) { } if rxTimestamp > 0 { + kcpConv, segments := parseKCPPacketMetadata(p[:n]) c.logKCPPacketDebugRecord(c.newKCPPacketDebugRecord( latencylog.EventBRXSoftware, addr, n, rxTimestamp, nil, - parseKCPConversationID(p[:n]), + kcpConv, + segments, )) } @@ -102,7 +105,11 @@ func (c *platformKCPPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) { return 0, fmt.Errorf("transport: kcp packet write target must be UDPAddr, got %T", addr) } - expectedTXID := c.nextExpectedTXID() + // Reserve the local txID before the send so an immediately-arriving errqueue + // event can still find its pending record. If the send never succeeds, roll + // the reservation back to keep the local txID mirror aligned with the kernel. + kcpConv, segments := parseKCPPacketMetadata(p) + expectedTXID := c.reservePendingTX(udpAddr, len(p), kcpConv, segments) for { err := c.sendmsgRaw(p, udpAddr) if err != nil { @@ -110,10 +117,10 @@ func (c *platformKCPPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) { time.Sleep(linuxDataPollInterval) continue } + c.rollbackPendingTX(expectedTXID) return 0, err } - c.storePendingTX(expectedTXID, udpAddr, len(p), parseKCPConversationID(p)) return len(p), nil } } @@ -247,6 +254,7 @@ func (c *platformKCPPacketConn) collectTXErrqueueLoop() { event.TSUnixNano, &udpTxID, record.kcpConv, + record.segments, )) if complete { @@ -281,25 +289,31 @@ func (c *platformKCPPacketConn) recvTXErrqueueOnce() (txTimestampEvent, error) { return event, nil } -func (c *platformKCPPacketConn) nextExpectedTXID() uint32 { +func (c *platformKCPPacketConn) reservePendingTX(remoteAddr net.Addr, packetBytes int, kcpConv *uint32, segments []KCPPacketDebugSegment) uint32 { c.pendingMu.Lock() defer c.pendingMu.Unlock() - next := c.nextTXID + txID := c.nextTXID c.nextTXID++ - return next -} - -func (c *platformKCPPacketConn) storePendingTX(txID uint32, remoteAddr net.Addr, packetBytes int, kcpConv *uint32) { - c.pendingMu.Lock() - defer c.pendingMu.Unlock() - c.pendingTX[txID] = kcpPendingPacketDebug{ remoteAddr: remoteAddr, packetBytes: packetBytes, kcpConv: kcpConv, + segments: append([]KCPPacketDebugSegment(nil), segments...), timestamps: make(map[string]int64, 2), } + + return txID +} + +func (c *platformKCPPacketConn) rollbackPendingTX(txID uint32) { + c.pendingMu.Lock() + defer c.pendingMu.Unlock() + + delete(c.pendingTX, txID) + if c.nextTXID == txID+1 { + c.nextTXID = txID + } } func (c *platformKCPPacketConn) recordPendingTXEvent(event txTimestampEvent) (*kcpPendingPacketDebug, bool) { diff --git a/cmd/internal/transport/kcp_packet_conn_linux_test.go b/cmd/internal/transport/kcp_packet_conn_linux_test.go new file mode 100644 index 0000000..d66825c --- /dev/null +++ b/cmd/internal/transport/kcp_packet_conn_linux_test.go @@ -0,0 +1,62 @@ +//go:build linux + +package transport + +import ( + "net" + "testing" +) + +func TestKCPPendingTXReservationRollbackRestoresSequence(t *testing.T) { + conn := &platformKCPPacketConn{ + kcpPacketConnBase: &kcpPacketConnBase{ + closed: make(chan struct{}), + }, + pendingTX: make(map[uint32]kcpPendingPacketDebug), + } + + conv := uint32(42) + txID := conn.reservePendingTX(&net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 9000}, 128, &conv, nil) + if txID != 0 { + t.Fatalf("reservePendingTX() txID = %d, want 0", txID) + } + if conn.nextTXID != 1 { + t.Fatalf("nextTXID after reserve = %d, want 1", conn.nextTXID) + } + if _, ok := conn.pendingTX[txID]; !ok { + t.Fatal("pendingTX missing reserved record") + } + + conn.rollbackPendingTX(txID) + + if conn.nextTXID != 0 { + t.Fatalf("nextTXID after rollback = %d, want 0", conn.nextTXID) + } + if _, ok := conn.pendingTX[txID]; ok { + t.Fatal("pendingTX still contains rolled back record") + } +} + +func TestKCPPendingTXReservationPreservesLaterSequenceOnOutOfOrderRollback(t *testing.T) { + conn := &platformKCPPacketConn{ + kcpPacketConnBase: &kcpPacketConnBase{ + closed: make(chan struct{}), + }, + pendingTX: make(map[uint32]kcpPendingPacketDebug), + } + + first := conn.reservePendingTX(&net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 9000}, 64, nil, nil) + second := conn.reservePendingTX(&net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 9001}, 64, nil, nil) + if first != 0 || second != 1 { + t.Fatalf("reserved tx IDs = %d,%d, want 0,1", first, second) + } + + conn.rollbackPendingTX(first) + + if conn.nextTXID != 2 { + t.Fatalf("nextTXID after out-of-order rollback = %d, want 2", conn.nextTXID) + } + if _, ok := conn.pendingTX[second]; !ok { + t.Fatal("pendingTX lost later reservation after rollback") + } +} diff --git a/cmd/internal/transport/kcp_packet_debug.go b/cmd/internal/transport/kcp_packet_debug.go index 3fce529..cfc289b 100644 --- a/cmd/internal/transport/kcp_packet_debug.go +++ b/cmd/internal/transport/kcp_packet_debug.go @@ -10,15 +10,26 @@ import ( // KCPPacketDebugRecord 是 KCP 底层 UDP packet kernel timestamp 的一条 JSONL 调试记录。 type KCPPacketDebugRecord struct { - Event string `json:"event"` - NodeRole string `json:"node_role,omitempty"` - NodeID string `json:"node_id,omitempty"` - LocalAddr string `json:"local_addr,omitempty"` - RemoteAddr string `json:"remote_addr,omitempty"` - PacketBytes int `json:"packet_bytes"` - UDPTXID *uint32 `json:"udp_tx_id,omitempty"` - KCPConv *uint32 `json:"kcp_conv,omitempty"` - TSUnixNano int64 `json:"ts_unix_nano"` + Event string `json:"event"` + NodeRole string `json:"node_role,omitempty"` + NodeID string `json:"node_id,omitempty"` + LocalAddr string `json:"local_addr,omitempty"` + RemoteAddr string `json:"remote_addr,omitempty"` + PacketBytes int `json:"packet_bytes"` + UDPTXID *uint32 `json:"udp_tx_id,omitempty"` + KCPConv *uint32 `json:"kcp_conv,omitempty"` + Segments []KCPPacketDebugSegment `json:"segments,omitempty"` + TSUnixNano int64 `json:"ts_unix_nano"` +} + +// KCPPacketDebugSegment 是一个 UDP datagram 中解析出的 KCP segment 头信息。 +type KCPPacketDebugSegment struct { + Cmd uint8 `json:"cmd"` + SN uint32 `json:"sn"` + UNA uint32 `json:"una"` + Frg uint8 `json:"frg"` + Wnd uint16 `json:"wnd"` + Len uint32 `json:"len"` } // KCPPacketDebugLogger 接收 KCP packet 级调试记录。 diff --git a/cmd/internal/transport/kcp_packet_metadata.go b/cmd/internal/transport/kcp_packet_metadata.go new file mode 100644 index 0000000..7b50adc --- /dev/null +++ b/cmd/internal/transport/kcp_packet_metadata.go @@ -0,0 +1,55 @@ +package transport + +import "encoding/binary" + +const kcpPacketHeaderSize = 24 + +func parseKCPPacketMetadata(packet []byte) (*uint32, []KCPPacketDebugSegment) { + conv := parseKCPConversationID(packet) + segments, ok := parseKCPPacketSegments(packet) + if !ok { + return conv, nil + } + return conv, segments +} + +func parseKCPConversationID(packet []byte) *uint32 { + if len(packet) < 4 { + return nil + } + conv := binary.LittleEndian.Uint32(packet[:4]) + return &conv +} + +func parseKCPPacketSegments(packet []byte) ([]KCPPacketDebugSegment, bool) { + if len(packet) == 0 { + return nil, false + } + + data := packet + segments := make([]KCPPacketDebugSegment, 0, 1) + for len(data) > 0 { + if len(data) < kcpPacketHeaderSize { + return nil, false + } + + segmentLen := binary.LittleEndian.Uint32(data[20:]) + totalLen := kcpPacketHeaderSize + int(segmentLen) + if len(data) < totalLen { + return nil, false + } + + segments = append(segments, KCPPacketDebugSegment{ + Cmd: data[4], + Frg: data[5], + Wnd: binary.LittleEndian.Uint16(data[6:8]), + SN: binary.LittleEndian.Uint32(data[12:16]), + UNA: binary.LittleEndian.Uint32(data[16:20]), + Len: segmentLen, + }) + + data = data[totalLen:] + } + + return segments, true +} diff --git a/cmd/internal/transport/kcp_session_stats.go b/cmd/internal/transport/kcp_session_stats.go new file mode 100644 index 0000000..ca4311d --- /dev/null +++ b/cmd/internal/transport/kcp_session_stats.go @@ -0,0 +1,444 @@ +package transport + +import ( + "encoding/json" + "fmt" + "net" + "os" + "path/filepath" + "reflect" + "sync" + "time" + + kcp "github.com/xtaci/kcp-go/v5" +) + +const DefaultKCPSessionStatsInterval = 100 * time.Millisecond + +const ( + kcpSessionStatsRecordTypeSessionSample = "session_sample" + kcpSessionStatsRecordTypeProcessSNMPSample = "process_snmp_sample" + + kcpStatsSampleReasonPeriodic = "periodic" + kcpStatsSampleReasonSendHandoffBegin = "send_handoff_begin" + kcpStatsSampleReasonSendHandoffEnd = "send_handoff_end" + kcpStatsSampleReasonReceive = "receive" + kcpStatsSampleReasonClose = "close" +) + +// KCPSessionStatsRecord is a JSONL record for KCP session and process stats. +type KCPSessionStatsRecord struct { + RecordType string `json:"record_type"` + NodeRole string `json:"node_role,omitempty"` + NodeID string `json:"node_id,omitempty"` + LocalAddr string `json:"local_addr,omitempty"` + RemoteAddr string `json:"remote_addr,omitempty"` + Conv *uint32 `json:"conv,omitempty"` + TSUnixNano int64 `json:"ts_unix_nano"` + SampleReason string `json:"sample_reason"` + + RTOMillis *uint32 `json:"rto_ms,omitempty"` + SRTTMillis *int32 `json:"srtt_ms,omitempty"` + SRTTVarMillis *int32 `json:"srttvar_ms,omitempty"` + + BytesSent *uint64 `json:"bytes_sent,omitempty"` + BytesReceived *uint64 `json:"bytes_received,omitempty"` + InPkts *uint64 `json:"in_pkts,omitempty"` + OutPkts *uint64 `json:"out_pkts,omitempty"` + InSegs *uint64 `json:"in_segs,omitempty"` + OutSegs *uint64 `json:"out_segs,omitempty"` + RetransSegs *uint64 `json:"retrans_segs,omitempty"` + FastRetransSegs *uint64 `json:"fast_retrans_segs,omitempty"` + EarlyRetransSegs *uint64 `json:"early_retrans_segs,omitempty"` + LostSegs *uint64 `json:"lost_segs,omitempty"` + RepeatSegs *uint64 `json:"repeat_segs,omitempty"` + InErrs *uint64 `json:"in_errs,omitempty"` + KCPInErrs *uint64 `json:"kcp_in_errs,omitempty"` + + RingBufferSndQueue *uint64 `json:"ring_buffer_snd_queue,omitempty"` + RingBufferRcvQueue *uint64 `json:"ring_buffer_rcv_queue,omitempty"` + RingBufferSndBuffer *uint64 `json:"ring_buffer_snd_buffer,omitempty"` + CurrEstab *uint64 `json:"curr_estab,omitempty"` +} + +// KCPSessionStatsLogger receives KCP session stats records. +type KCPSessionStatsLogger interface { + LogKCPSessionStatsRecord(record KCPSessionStatsRecord) error +} + +// JSONLKCPSessionStatsLogger appends KCP session stats as JSONL. +type JSONLKCPSessionStatsLogger struct { + mu sync.Mutex + closeOnce sync.Once + closeErr error + file *os.File +} + +// NewJSONLKCPSessionStatsLogger creates a thread-safe JSONL stats logger. +func NewJSONLKCPSessionStatsLogger(path string) (*JSONLKCPSessionStatsLogger, error) { + dir := filepath.Dir(path) + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, fmt.Errorf("transport: create kcp session stats log dir %s: %w", dir, err) + } + + file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + return nil, fmt.Errorf("transport: open kcp session stats log %s: %w", path, err) + } + + return &JSONLKCPSessionStatsLogger{file: file}, nil +} + +// LogKCPSessionStatsRecord appends one JSONL record. +func (l *JSONLKCPSessionStatsLogger) LogKCPSessionStatsRecord(record KCPSessionStatsRecord) error { + line, err := json.Marshal(record) + if err != nil { + return err + } + + l.mu.Lock() + defer l.mu.Unlock() + + if _, err := l.file.Write(append(line, '\n')); err != nil { + return err + } + return nil +} + +// Close closes the underlying file. +func (l *JSONLKCPSessionStatsLogger) Close() error { + l.closeOnce.Do(func() { + l.closeErr = l.file.Close() + }) + return l.closeErr +} + +// ParseKCPSessionStatsInterval parses a duration string for stats sampling. +func ParseKCPSessionStatsInterval(raw string) (time.Duration, error) { + if raw == "" { + return DefaultKCPSessionStatsInterval, nil + } + + interval, err := time.ParseDuration(raw) + if err != nil { + return 0, fmt.Errorf("transport: parse kcp session stats interval %q: %w", raw, err) + } + if interval <= 0 { + return 0, fmt.Errorf("transport: kcp session stats interval must be greater than zero") + } + return interval, nil +} + +type kcpSessionStatsSource interface { + GetConv() uint32 + GetRTO() uint32 + GetSRTT() int32 + GetSRTTVar() int32 + LocalAddr() net.Addr + RemoteAddr() net.Addr +} + +type kcpSessionStatsSampler struct { + source kcpSessionStatsSource + logger KCPSessionStatsLogger + nodeRole string + nodeID string + interval time.Duration + processSampler *kcpProcessStatsSampler + + sampleMu sync.Mutex + stopOnce sync.Once + stopCh chan struct{} + stoppedCh chan struct{} +} + +func newKCPSessionStatsSampler(source kcpSessionStatsSource, logger KCPSessionStatsLogger, nodeRole, nodeID string, interval time.Duration) *kcpSessionStatsSampler { + if source == nil || logger == nil { + return nil + } + if interval <= 0 { + interval = DefaultKCPSessionStatsInterval + } + + sampler := &kcpSessionStatsSampler{ + source: source, + logger: logger, + nodeRole: nodeRole, + nodeID: nodeID, + interval: interval, + processSampler: acquireKCPProcessStatsSampler(logger, nodeRole, nodeID, interval), + stopCh: make(chan struct{}), + stoppedCh: make(chan struct{}), + } + + go sampler.run() + return sampler +} + +func (s *kcpSessionStatsSampler) run() { + ticker := time.NewTicker(s.interval) + defer ticker.Stop() + defer close(s.stoppedCh) + + for { + select { + case <-ticker.C: + s.logSessionSample(kcpStatsSampleReasonPeriodic) + case <-s.stopCh: + return + } + } +} + +func (s *kcpSessionStatsSampler) SampleEvent(reason string) { + if s == nil { + return + } + + s.logSessionSample(reason) + if s.processSampler == nil { + return + } + + if reason == kcpStatsSampleReasonClose { + s.processSampler.requestSampleAndWait(reason) + return + } + s.processSampler.requestSample(reason) +} + +func (s *kcpSessionStatsSampler) Close() { + if s == nil { + return + } + + s.stopOnce.Do(func() { + s.SampleEvent(kcpStatsSampleReasonClose) + close(s.stopCh) + <-s.stoppedCh + releaseKCPProcessStatsSampler(s.processSampler) + }) +} + +func (s *kcpSessionStatsSampler) logSessionSample(reason string) { + s.sampleMu.Lock() + defer s.sampleMu.Unlock() + + conv := s.source.GetConv() + rto := s.source.GetRTO() + srtt := s.source.GetSRTT() + srttVar := s.source.GetSRTTVar() + + record := KCPSessionStatsRecord{ + RecordType: kcpSessionStatsRecordTypeSessionSample, + NodeRole: s.nodeRole, + NodeID: s.nodeID, + LocalAddr: addrString(s.source.LocalAddr()), + RemoteAddr: addrString(s.source.RemoteAddr()), + Conv: uint32Ptr(conv), + TSUnixNano: time.Now().UTC().UnixNano(), + SampleReason: reason, + RTOMillis: uint32Ptr(rto), + SRTTMillis: int32Ptr(srtt), + SRTTVarMillis: int32Ptr(srttVar), + } + + _ = s.logger.LogKCPSessionStatsRecord(record) +} + +type kcpProcessSampleRequest struct { + reason string + done chan struct{} +} + +type kcpProcessStatsSampler struct { + key string + logger KCPSessionStatsLogger + nodeRole string + nodeID string + interval time.Duration + requestCh chan kcpProcessSampleRequest + stopCh chan struct{} + stoppedCh chan struct{} + + sampleMu sync.Mutex + previous *kcp.Snmp + refCount int +} + +var ( + kcpProcessSamplersMu sync.Mutex + kcpProcessSamplers = make(map[string]*kcpProcessStatsSampler) +) + +func acquireKCPProcessStatsSampler(logger KCPSessionStatsLogger, nodeRole, nodeID string, interval time.Duration) *kcpProcessStatsSampler { + if logger == nil { + return nil + } + if interval <= 0 { + interval = DefaultKCPSessionStatsInterval + } + + key := fmt.Sprintf("%s|%s|%s|%d", kcpStatsLoggerIdentity(logger), nodeRole, nodeID, interval) + + kcpProcessSamplersMu.Lock() + defer kcpProcessSamplersMu.Unlock() + + if sampler, ok := kcpProcessSamplers[key]; ok { + sampler.refCount++ + return sampler + } + + sampler := &kcpProcessStatsSampler{ + key: key, + logger: logger, + nodeRole: nodeRole, + nodeID: nodeID, + interval: interval, + requestCh: make(chan kcpProcessSampleRequest, 1), + stopCh: make(chan struct{}), + stoppedCh: make(chan struct{}), + previous: kcp.DefaultSnmp.Copy(), + refCount: 1, + } + kcpProcessSamplers[key] = sampler + go sampler.run() + return sampler +} + +func releaseKCPProcessStatsSampler(sampler *kcpProcessStatsSampler) { + if sampler == nil { + return + } + + kcpProcessSamplersMu.Lock() + sampler.refCount-- + if sampler.refCount > 0 { + kcpProcessSamplersMu.Unlock() + return + } + delete(kcpProcessSamplers, sampler.key) + close(sampler.stopCh) + kcpProcessSamplersMu.Unlock() + + <-sampler.stoppedCh +} + +func (s *kcpProcessStatsSampler) run() { + ticker := time.NewTicker(s.interval) + defer ticker.Stop() + defer close(s.stoppedCh) + + for { + select { + case <-ticker.C: + s.logProcessSample(kcpStatsSampleReasonPeriodic) + case req := <-s.requestCh: + s.logProcessSample(req.reason) + if req.done != nil { + close(req.done) + } + case <-s.stopCh: + return + } + } +} + +func (s *kcpProcessStatsSampler) requestSample(reason string) { + if s == nil { + return + } + + select { + case s.requestCh <- kcpProcessSampleRequest{reason: reason}: + default: + } +} + +func (s *kcpProcessStatsSampler) requestSampleAndWait(reason string) { + if s == nil { + return + } + + done := make(chan struct{}) + select { + case s.requestCh <- kcpProcessSampleRequest{reason: reason, done: done}: + <-done + default: + s.logProcessSample(reason) + } +} + +func (s *kcpProcessStatsSampler) logProcessSample(reason string) { + s.sampleMu.Lock() + defer s.sampleMu.Unlock() + + current := kcp.DefaultSnmp.Copy() + record := newKCPProcessSNMPSampleRecord(s.nodeRole, s.nodeID, reason, s.previous, current) + s.previous = current + + _ = s.logger.LogKCPSessionStatsRecord(record) +} + +func newKCPProcessSNMPSampleRecord(nodeRole, nodeID, reason string, previous, current *kcp.Snmp) KCPSessionStatsRecord { + return KCPSessionStatsRecord{ + RecordType: kcpSessionStatsRecordTypeProcessSNMPSample, + NodeRole: nodeRole, + NodeID: nodeID, + TSUnixNano: time.Now().UTC().UnixNano(), + SampleReason: reason, + BytesSent: uint64Ptr(diffUint64(previous.BytesSent, current.BytesSent)), + BytesReceived: uint64Ptr(diffUint64(previous.BytesReceived, current.BytesReceived)), + InPkts: uint64Ptr(diffUint64(previous.InPkts, current.InPkts)), + OutPkts: uint64Ptr(diffUint64(previous.OutPkts, current.OutPkts)), + InSegs: uint64Ptr(diffUint64(previous.InSegs, current.InSegs)), + OutSegs: uint64Ptr(diffUint64(previous.OutSegs, current.OutSegs)), + RetransSegs: uint64Ptr(diffUint64(previous.RetransSegs, current.RetransSegs)), + FastRetransSegs: uint64Ptr(diffUint64(previous.FastRetransSegs, current.FastRetransSegs)), + EarlyRetransSegs: uint64Ptr(diffUint64(previous.EarlyRetransSegs, current.EarlyRetransSegs)), + LostSegs: uint64Ptr(diffUint64(previous.LostSegs, current.LostSegs)), + RepeatSegs: uint64Ptr(diffUint64(previous.RepeatSegs, current.RepeatSegs)), + InErrs: uint64Ptr(diffUint64(previous.InErrs, current.InErrs)), + KCPInErrs: uint64Ptr(diffUint64(previous.KCPInErrors, current.KCPInErrors)), + RingBufferSndQueue: uint64Ptr(current.RingBufferSndQueue), + RingBufferRcvQueue: uint64Ptr(current.RingBufferRcvQueue), + RingBufferSndBuffer: uint64Ptr(current.RingBufferSndBuffer), + CurrEstab: uint64Ptr(current.CurrEstab), + } +} + +func kcpStatsLoggerIdentity(logger KCPSessionStatsLogger) string { + value := reflect.ValueOf(logger) + switch value.Kind() { + case reflect.Pointer, reflect.Map, reflect.Func, reflect.Slice, reflect.Chan, reflect.UnsafePointer: + return fmt.Sprintf("%T:%x", logger, value.Pointer()) + default: + return fmt.Sprintf("%T:%v", logger, logger) + } +} + +func diffUint64(previous, current uint64) uint64 { + if current < previous { + return 0 + } + return current - previous +} + +func addrString(addr net.Addr) string { + if addr == nil { + return "" + } + return addr.String() +} + +func uint32Ptr(value uint32) *uint32 { + return &value +} + +func uint64Ptr(value uint64) *uint64 { + return &value +} + +func int32Ptr(value int32) *int32 { + return &value +} diff --git a/cmd/internal/transport/kcp_session_stats_test.go b/cmd/internal/transport/kcp_session_stats_test.go new file mode 100644 index 0000000..c542172 --- /dev/null +++ b/cmd/internal/transport/kcp_session_stats_test.go @@ -0,0 +1,409 @@ +package transport + +import ( + "encoding/binary" + "net" + "sync" + "testing" + "time" + + kcp "github.com/xtaci/kcp-go/v5" + + "omnisocketgo/cmd/internal/latencylog" + "omnisocketgo/cmd/internal/protocol" +) + +type recordingKCPSessionStatsLogger struct { + mu sync.Mutex + records []KCPSessionStatsRecord +} + +func (l *recordingKCPSessionStatsLogger) LogKCPSessionStatsRecord(record KCPSessionStatsRecord) error { + l.mu.Lock() + defer l.mu.Unlock() + + l.records = append(l.records, record) + return nil +} + +func (l *recordingKCPSessionStatsLogger) Records() []KCPSessionStatsRecord { + l.mu.Lock() + defer l.mu.Unlock() + + return append([]KCPSessionStatsRecord(nil), l.records...) +} + +type stubKCPSessionStatsSource struct { + conv uint32 + rto uint32 + srtt int32 + srttVar int32 + local net.Addr + remote net.Addr +} + +func (s stubKCPSessionStatsSource) GetConv() uint32 { return s.conv } +func (s stubKCPSessionStatsSource) GetRTO() uint32 { return s.rto } +func (s stubKCPSessionStatsSource) GetSRTT() int32 { return s.srtt } +func (s stubKCPSessionStatsSource) GetSRTTVar() int32 { return s.srttVar } +func (s stubKCPSessionStatsSource) LocalAddr() net.Addr { return s.local } +func (s stubKCPSessionStatsSource) RemoteAddr() net.Addr { return s.remote } + +func TestParseKCPPacketMetadataSingleSegment(t *testing.T) { + packet := buildTestKCPDatagram(42, []testKCPSegment{ + {cmd: 81, sn: 7, una: 3, frg: 1, wnd: 128, length: 5}, + }) + + conv, segments := parseKCPPacketMetadata(packet) + if conv == nil || *conv != 42 { + t.Fatalf("conv = %v, want 42", conv) + } + if len(segments) != 1 { + t.Fatalf("segment count = %d, want 1", len(segments)) + } + if segments[0].Cmd != 81 || segments[0].SN != 7 || segments[0].UNA != 3 || segments[0].Frg != 1 || segments[0].Wnd != 128 || segments[0].Len != 5 { + t.Fatalf("segment = %+v, want expected header values", segments[0]) + } +} + +func TestParseKCPPacketMetadataMultiSegment(t *testing.T) { + packet := buildTestKCPDatagram(99, []testKCPSegment{ + {cmd: 82, sn: 10, una: 5, frg: 2, wnd: 64, length: 3}, + {cmd: 83, sn: 11, una: 6, frg: 0, wnd: 96, length: 0}, + }) + + conv, segments := parseKCPPacketMetadata(packet) + if conv == nil || *conv != 99 { + t.Fatalf("conv = %v, want 99", conv) + } + if len(segments) != 2 { + t.Fatalf("segment count = %d, want 2", len(segments)) + } + if segments[0].SN != 10 || segments[1].SN != 11 { + t.Fatalf("segments = %+v, want in-order sequence numbers", segments) + } +} + +func TestParseKCPPacketMetadataReturnsNoSegmentsForTruncatedPacket(t *testing.T) { + packet := buildTestKCPDatagram(7, []testKCPSegment{ + {cmd: 81, sn: 1, una: 0, frg: 0, wnd: 32, length: 4}, + }) + packet = packet[:len(packet)-1] + + conv, segments := parseKCPPacketMetadata(packet) + if conv == nil || *conv != 7 { + t.Fatalf("conv = %v, want 7", conv) + } + if len(segments) != 0 { + t.Fatalf("segments = %+v, want empty on truncated packet", segments) + } +} + +func TestParseKCPSessionStatsInterval(t *testing.T) { + tests := []struct { + name string + raw string + want time.Duration + wantErr bool + }{ + {name: "default", raw: "", want: DefaultKCPSessionStatsInterval}, + {name: "valid", raw: "250ms", want: 250 * time.Millisecond}, + {name: "invalid format", raw: "soon", wantErr: true}, + {name: "invalid zero", raw: "0s", wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ParseKCPSessionStatsInterval(tt.raw) + if tt.wantErr { + if err == nil { + t.Fatalf("ParseKCPSessionStatsInterval(%q) error = nil, want non-nil", tt.raw) + } + return + } + if err != nil { + t.Fatalf("ParseKCPSessionStatsInterval(%q) error = %v", tt.raw, err) + } + if got != tt.want { + t.Fatalf("ParseKCPSessionStatsInterval(%q) = %v, want %v", tt.raw, got, tt.want) + } + }) + } +} + +func TestKCPProcessSNMPSampleRecordUsesCounterDeltasAndGaugeSnapshots(t *testing.T) { + previous := &kcp.Snmp{ + BytesSent: 10, + BytesReceived: 20, + InPkts: 30, + OutPkts: 40, + InSegs: 50, + OutSegs: 60, + RetransSegs: 70, + FastRetransSegs: 80, + EarlyRetransSegs: 90, + LostSegs: 100, + RepeatSegs: 110, + InErrs: 120, + KCPInErrors: 130, + RingBufferSndQueue: 4, + RingBufferRcvQueue: 5, + RingBufferSndBuffer: 6, + CurrEstab: 1, + } + current := &kcp.Snmp{ + BytesSent: 15, + BytesReceived: 22, + InPkts: 31, + OutPkts: 44, + InSegs: 55, + OutSegs: 61, + RetransSegs: 79, + FastRetransSegs: 81, + EarlyRetransSegs: 95, + LostSegs: 101, + RepeatSegs: 115, + InErrs: 121, + KCPInErrors: 135, + RingBufferSndQueue: 9, + RingBufferRcvQueue: 8, + RingBufferSndBuffer: 7, + CurrEstab: 3, + } + + record := newKCPProcessSNMPSampleRecord(latencylog.NodeRoleServer, "hub", kcpStatsSampleReasonReceive, previous, current) + if record.RecordType != kcpSessionStatsRecordTypeProcessSNMPSample { + t.Fatalf("record type = %q, want %q", record.RecordType, kcpSessionStatsRecordTypeProcessSNMPSample) + } + if record.Conv != nil { + t.Fatalf("process sample conv = %v, want nil", record.Conv) + } + if record.BytesSent == nil || *record.BytesSent != 5 { + t.Fatalf("bytes_sent = %v, want 5", record.BytesSent) + } + if record.KCPInErrs == nil || *record.KCPInErrs != 5 { + t.Fatalf("kcp_in_errs = %v, want 5", record.KCPInErrs) + } + if record.RingBufferSndQueue == nil || *record.RingBufferSndQueue != 9 { + t.Fatalf("ring_buffer_snd_queue = %v, want 9", record.RingBufferSndQueue) + } + if record.CurrEstab == nil || *record.CurrEstab != 3 { + t.Fatalf("curr_estab = %v, want 3", record.CurrEstab) + } +} + +func TestKCPProcessStatsSamplerIsSharedPerLoggerRoleNodeAndInterval(t *testing.T) { + logger := &recordingKCPSessionStatsLogger{} + + first := acquireKCPProcessStatsSampler(logger, latencylog.NodeRoleServer, "hub", 10*time.Millisecond) + second := acquireKCPProcessStatsSampler(logger, latencylog.NodeRoleServer, "hub", 10*time.Millisecond) + if first != second { + t.Fatal("expected acquireKCPProcessStatsSampler to reuse sampler for same logger/role/node/interval") + } + + releaseKCPProcessStatsSampler(first) + releaseKCPProcessStatsSampler(second) +} + +func TestKCPSessionStatsSamplerRecordsEventAndPeriodicSamples(t *testing.T) { + kcp.DefaultSnmp.Reset() + + logger := &recordingKCPSessionStatsLogger{} + source := stubKCPSessionStatsSource{ + conv: 77, + rto: 25, + srtt: 10, + srttVar: 3, + local: &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 9001}, + remote: &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 9002}, + } + + sampler := newKCPSessionStatsSampler(source, logger, latencylog.NodeRolePeer, "peer-a", 10*time.Millisecond) + waitForKCPSessionStatsRecords(t, logger, func(records []KCPSessionStatsRecord) bool { + return hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonPeriodic) + }, "periodic session sample") + + sampler.SampleEvent(kcpStatsSampleReasonSendHandoffBegin) + sampler.SampleEvent(kcpStatsSampleReasonSendHandoffEnd) + sampler.SampleEvent(kcpStatsSampleReasonReceive) + waitForKCPSessionStatsRecords(t, logger, func(records []KCPSessionStatsRecord) bool { + return hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonSendHandoffBegin) && + hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonSendHandoffEnd) && + hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonReceive) + }, "event-driven session samples") + + sampler.Close() + waitForKCPSessionStatsRecords(t, logger, func(records []KCPSessionStatsRecord) bool { + return hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonClose) && + hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeProcessSNMPSample, kcpStatsSampleReasonClose) + }, "close samples") + + recordCount := len(logger.Records()) + time.Sleep(30 * time.Millisecond) + if got := len(logger.Records()); got != recordCount { + t.Fatalf("record count after Close() = %d, want %d", got, recordCount) + } +} + +func TestKCPConnSessionStatsLoggingSparseTraffic(t *testing.T) { + senderLogger := &recordingKCPSessionStatsLogger{} + receiverLogger := &recordingKCPSessionStatsLogger{} + + sender, accepted, cleanup := newKCPConnPair( + t, + []KCPOption{ + WithKCPLogger(latencylog.NoopLogger{}, latencylog.NodeRolePeer, "peer-a"), + WithKCPSessionStatsLogger(senderLogger, 10*time.Millisecond), + }, + []KCPOption{ + WithKCPLogger(latencylog.NoopLogger{}, latencylog.NodeRolePeer, "peer-b"), + WithKCPSessionStatsLogger(receiverLogger, 10*time.Millisecond), + }, + nil, + nil, + ) + defer cleanup() + + msg := protocol.Message{ + Type: protocol.MessageTypeText, + ID: 1, + From: "peer-a", + To: "peer-b", + Body: []byte("hello sparse"), + } + + sendErr := make(chan error, 1) + go func() { + sendErr <- sender.Send(msg) + }() + + receiver := awaitAcceptedKCPConn(t, accepted) + if _, err := receiver.Receive(); err != nil { + t.Fatalf("receiver.Receive() error = %v", err) + } + if err := <-sendErr; err != nil { + t.Fatalf("sender.Send() error = %v", err) + } + if err := sender.Close(); err != nil { + t.Fatalf("sender.Close() error = %v", err) + } + if err := receiver.Close(); err != nil { + t.Fatalf("receiver.Close() error = %v", err) + } + + waitForKCPSessionStatsRecords(t, senderLogger, func(records []KCPSessionStatsRecord) bool { + return hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonSendHandoffBegin) && + hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonSendHandoffEnd) && + hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonClose) && + hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeProcessSNMPSample, kcpStatsSampleReasonClose) + }, "sender sparse session stats") + waitForKCPSessionStatsRecords(t, receiverLogger, func(records []KCPSessionStatsRecord) bool { + return hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonReceive) && + hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonClose) + }, "receiver sparse session stats") +} + +func TestKCPConnSessionStatsLoggingContinuousTraffic(t *testing.T) { + logger := &recordingKCPSessionStatsLogger{} + + sender, accepted, cleanup := newKCPConnPair( + t, + []KCPOption{ + WithKCPLogger(latencylog.NoopLogger{}, latencylog.NodeRolePeer, "peer-a"), + WithKCPSessionStatsLogger(logger, 20*time.Millisecond), + }, + nil, + nil, + nil, + ) + defer cleanup() + + receiver := awaitAcceptedKCPConn(t, accepted) + receiveErr := make(chan error, 1) + go func() { + for i := 0; i < 12; i++ { + if _, err := receiver.Receive(); err != nil { + receiveErr <- err + return + } + } + receiveErr <- nil + }() + + for i := 0; i < 12; i++ { + msg := protocol.Message{ + Type: protocol.MessageTypeText, + ID: uint64(i + 1), + From: "peer-a", + To: "peer-b", + Body: []byte("hello continuous"), + } + if err := sender.Send(msg); err != nil { + t.Fatalf("sender.Send(message %d) error = %v", i+1, err) + } + time.Sleep(15 * time.Millisecond) + } + + if err := <-receiveErr; err != nil { + t.Fatalf("receiver.Receive() error = %v", err) + } + + waitForKCPSessionStatsRecords(t, logger, func(records []KCPSessionStatsRecord) bool { + return countKCPSessionStatsRecords(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonPeriodic) >= 2 && + countKCPSessionStatsRecords(records, kcpSessionStatsRecordTypeProcessSNMPSample, kcpStatsSampleReasonPeriodic) >= 2 + }, "continuous periodic session stats") +} + +type testKCPSegment struct { + cmd uint8 + sn uint32 + una uint32 + frg uint8 + wnd uint16 + length uint32 +} + +func buildTestKCPDatagram(conv uint32, segments []testKCPSegment) []byte { + packet := make([]byte, 0) + for _, segment := range segments { + entry := make([]byte, kcpPacketHeaderSize+int(segment.length)) + binary.LittleEndian.PutUint32(entry[0:4], conv) + entry[4] = segment.cmd + entry[5] = segment.frg + binary.LittleEndian.PutUint16(entry[6:8], segment.wnd) + binary.LittleEndian.PutUint32(entry[12:16], segment.sn) + binary.LittleEndian.PutUint32(entry[16:20], segment.una) + binary.LittleEndian.PutUint32(entry[20:24], segment.length) + packet = append(packet, entry...) + } + return packet +} + +func hasKCPSessionStatsRecord(records []KCPSessionStatsRecord, recordType, reason string) bool { + return countKCPSessionStatsRecords(records, recordType, reason) > 0 +} + +func countKCPSessionStatsRecords(records []KCPSessionStatsRecord, recordType, reason string) int { + count := 0 + for _, record := range records { + if record.RecordType == recordType && record.SampleReason == reason { + count++ + } + } + return count +} + +func waitForKCPSessionStatsRecords(t *testing.T, logger *recordingKCPSessionStatsLogger, condition func([]KCPSessionStatsRecord) bool, description string) { + t.Helper() + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + records := logger.Records() + if condition(records) { + return + } + time.Sleep(10 * time.Millisecond) + } + + t.Fatalf("timed out waiting for %s", description) +} diff --git a/cmd/kcppeer/main.go b/cmd/kcppeer/main.go index a1abd63..cc90d3d 100644 --- a/cmd/kcppeer/main.go +++ b/cmd/kcppeer/main.go @@ -25,10 +25,17 @@ func main() { inboxDir := flag.String("inbox-dir", "inbox", "directory used to persist received text and file messages") logPath := flag.String("latency-log", "", "optional JSONL file path for latency timestamp logs") kcpTimestampDebugLogPath := flag.String("kcp-ts-debug-log", "", "optional JSONL file path for KCP packet kernel timestamp debug records") + kcpSessionStatsLogPath := flag.String("kcp-session-stats-log", "", "optional JSONL file path for KCP session stats records") + kcpSessionStatsInterval := flag.String("kcp-session-stats-interval", transport.DefaultKCPSessionStatsInterval.String(), "sampling interval for KCP session stats, for example 100ms") interactive := flag.Bool("interactive", true, "enable interactive REPL for repeated text/file sends on the same connection") flag.Parse() - clientOptions := make([]peerpkg.Option, 0, 5) + statsInterval, err := transport.ParseKCPSessionStatsInterval(*kcpSessionStatsInterval) + if err != nil { + log.Fatalf("parse -kcp-session-stats-interval=%q: %v", *kcpSessionStatsInterval, err) + } + + clientOptions := make([]peerpkg.Option, 0, 6) if *logPath != "" { logger, err := latencylog.NewJSONLLogger(*logPath) if err != nil { @@ -45,6 +52,14 @@ func main() { defer logger.Close() clientOptions = append(clientOptions, peerpkg.WithKCPPacketDebugLogger(logger)) } + if *kcpSessionStatsLogPath != "" { + logger, err := transport.NewJSONLKCPSessionStatsLogger(*kcpSessionStatsLogPath) + if err != nil { + log.Fatalf("create kcp session stats logger %s: %v", *kcpSessionStatsLogPath, err) + } + defer logger.Close() + clientOptions = append(clientOptions, peerpkg.WithKCPSessionStatsLogger(logger, statsInterval)) + } if *bindIP != "" { clientOptions = append(clientOptions, peerpkg.WithBindIP(*bindIP)) } diff --git a/cmd/kcpserver/main.go b/cmd/kcpserver/main.go index e804f9d..d6c6b62 100644 --- a/cmd/kcpserver/main.go +++ b/cmd/kcpserver/main.go @@ -17,9 +17,16 @@ func main() { bindDevice := flag.String("bind-device", "", "optional Linux network device used when listening") logPath := flag.String("latency-log", "", "optional JSONL file path for latency timestamp logs") kcpTimestampDebugLogPath := flag.String("kcp-ts-debug-log", "", "optional JSONL file path for KCP packet kernel timestamp debug records") + kcpSessionStatsLogPath := flag.String("kcp-session-stats-log", "", "optional JSONL file path for KCP session stats records") + kcpSessionStatsInterval := flag.String("kcp-session-stats-interval", transport.DefaultKCPSessionStatsInterval.String(), "sampling interval for KCP session stats, for example 100ms") flag.Parse() - hubOptions := make([]server.KCPOption, 0, 1) + statsInterval, err := transport.ParseKCPSessionStatsInterval(*kcpSessionStatsInterval) + if err != nil { + log.Fatalf("parse -kcp-session-stats-interval=%q: %v", *kcpSessionStatsInterval, err) + } + + hubOptions := make([]server.KCPOption, 0, 2) if *logPath != "" { logger, err := latencylog.NewJSONLLogger(*logPath) if err != nil { @@ -38,6 +45,14 @@ func main() { defer logger.Close() packetLogger = logger } + if *kcpSessionStatsLogPath != "" { + logger, err := transport.NewJSONLKCPSessionStatsLogger(*kcpSessionStatsLogPath) + if err != nil { + log.Fatalf("create kcp session stats logger %s: %v", *kcpSessionStatsLogPath, err) + } + defer logger.Close() + hubOptions = append(hubOptions, server.WithKCPSessionStatsLogger(logger, statsInterval)) + } listener, packetConn, err := transport.ListenKCPSessions(*listenAddr, *bindDevice, packetLogger, latencylog.NodeRoleServer, "hub") if err != nil {