Files
OmniSocketGo/cmd/internal/transport/kcp_session_stats_test.go

410 lines
13 KiB
Go

package transport
import (
"encoding/binary"
"net"
"sync"
"testing"
"time"
kcp "github.com/xtaci/kcp-go/v5"
"omnisocketgo/cmd/internal/latencylog"
"omnisocketgo/cmd/internal/protocol"
)
type recordingKCPSessionStatsLogger struct {
mu sync.Mutex
records []KCPSessionStatsRecord
}
func (l *recordingKCPSessionStatsLogger) LogKCPSessionStatsRecord(record KCPSessionStatsRecord) error {
l.mu.Lock()
defer l.mu.Unlock()
l.records = append(l.records, record)
return nil
}
func (l *recordingKCPSessionStatsLogger) Records() []KCPSessionStatsRecord {
l.mu.Lock()
defer l.mu.Unlock()
return append([]KCPSessionStatsRecord(nil), l.records...)
}
type stubKCPSessionStatsSource struct {
conv uint32
rto uint32
srtt int32
srttVar int32
local net.Addr
remote net.Addr
}
func (s stubKCPSessionStatsSource) GetConv() uint32 { return s.conv }
func (s stubKCPSessionStatsSource) GetRTO() uint32 { return s.rto }
func (s stubKCPSessionStatsSource) GetSRTT() int32 { return s.srtt }
func (s stubKCPSessionStatsSource) GetSRTTVar() int32 { return s.srttVar }
func (s stubKCPSessionStatsSource) LocalAddr() net.Addr { return s.local }
func (s stubKCPSessionStatsSource) RemoteAddr() net.Addr { return s.remote }
func TestParseKCPPacketMetadataSingleSegment(t *testing.T) {
packet := buildTestKCPDatagram(42, []testKCPSegment{
{cmd: 81, sn: 7, una: 3, frg: 1, wnd: 128, length: 5},
})
conv, segments := parseKCPPacketMetadata(packet)
if conv == nil || *conv != 42 {
t.Fatalf("conv = %v, want 42", conv)
}
if len(segments) != 1 {
t.Fatalf("segment count = %d, want 1", len(segments))
}
if segments[0].Cmd != 81 || segments[0].SN != 7 || segments[0].UNA != 3 || segments[0].Frg != 1 || segments[0].Wnd != 128 || segments[0].Len != 5 {
t.Fatalf("segment = %+v, want expected header values", segments[0])
}
}
func TestParseKCPPacketMetadataMultiSegment(t *testing.T) {
packet := buildTestKCPDatagram(99, []testKCPSegment{
{cmd: 82, sn: 10, una: 5, frg: 2, wnd: 64, length: 3},
{cmd: 83, sn: 11, una: 6, frg: 0, wnd: 96, length: 0},
})
conv, segments := parseKCPPacketMetadata(packet)
if conv == nil || *conv != 99 {
t.Fatalf("conv = %v, want 99", conv)
}
if len(segments) != 2 {
t.Fatalf("segment count = %d, want 2", len(segments))
}
if segments[0].SN != 10 || segments[1].SN != 11 {
t.Fatalf("segments = %+v, want in-order sequence numbers", segments)
}
}
func TestParseKCPPacketMetadataReturnsNoSegmentsForTruncatedPacket(t *testing.T) {
packet := buildTestKCPDatagram(7, []testKCPSegment{
{cmd: 81, sn: 1, una: 0, frg: 0, wnd: 32, length: 4},
})
packet = packet[:len(packet)-1]
conv, segments := parseKCPPacketMetadata(packet)
if conv == nil || *conv != 7 {
t.Fatalf("conv = %v, want 7", conv)
}
if len(segments) != 0 {
t.Fatalf("segments = %+v, want empty on truncated packet", segments)
}
}
func TestParseKCPSessionStatsInterval(t *testing.T) {
tests := []struct {
name string
raw string
want time.Duration
wantErr bool
}{
{name: "default", raw: "", want: DefaultKCPSessionStatsInterval},
{name: "valid", raw: "250ms", want: 250 * time.Millisecond},
{name: "invalid format", raw: "soon", wantErr: true},
{name: "invalid zero", raw: "0s", wantErr: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseKCPSessionStatsInterval(tt.raw)
if tt.wantErr {
if err == nil {
t.Fatalf("ParseKCPSessionStatsInterval(%q) error = nil, want non-nil", tt.raw)
}
return
}
if err != nil {
t.Fatalf("ParseKCPSessionStatsInterval(%q) error = %v", tt.raw, err)
}
if got != tt.want {
t.Fatalf("ParseKCPSessionStatsInterval(%q) = %v, want %v", tt.raw, got, tt.want)
}
})
}
}
func TestKCPProcessSNMPSampleRecordUsesCounterDeltasAndGaugeSnapshots(t *testing.T) {
previous := &kcp.Snmp{
BytesSent: 10,
BytesReceived: 20,
InPkts: 30,
OutPkts: 40,
InSegs: 50,
OutSegs: 60,
RetransSegs: 70,
FastRetransSegs: 80,
EarlyRetransSegs: 90,
LostSegs: 100,
RepeatSegs: 110,
InErrs: 120,
KCPInErrors: 130,
RingBufferSndQueue: 4,
RingBufferRcvQueue: 5,
RingBufferSndBuffer: 6,
CurrEstab: 1,
}
current := &kcp.Snmp{
BytesSent: 15,
BytesReceived: 22,
InPkts: 31,
OutPkts: 44,
InSegs: 55,
OutSegs: 61,
RetransSegs: 79,
FastRetransSegs: 81,
EarlyRetransSegs: 95,
LostSegs: 101,
RepeatSegs: 115,
InErrs: 121,
KCPInErrors: 135,
RingBufferSndQueue: 9,
RingBufferRcvQueue: 8,
RingBufferSndBuffer: 7,
CurrEstab: 3,
}
record := newKCPProcessSNMPSampleRecord(latencylog.NodeRoleServer, "hub", kcpStatsSampleReasonReceive, previous, current)
if record.RecordType != kcpSessionStatsRecordTypeProcessSNMPSample {
t.Fatalf("record type = %q, want %q", record.RecordType, kcpSessionStatsRecordTypeProcessSNMPSample)
}
if record.Conv != nil {
t.Fatalf("process sample conv = %v, want nil", record.Conv)
}
if record.BytesSent == nil || *record.BytesSent != 5 {
t.Fatalf("bytes_sent = %v, want 5", record.BytesSent)
}
if record.KCPInErrs == nil || *record.KCPInErrs != 5 {
t.Fatalf("kcp_in_errs = %v, want 5", record.KCPInErrs)
}
if record.RingBufferSndQueue == nil || *record.RingBufferSndQueue != 9 {
t.Fatalf("ring_buffer_snd_queue = %v, want 9", record.RingBufferSndQueue)
}
if record.CurrEstab == nil || *record.CurrEstab != 3 {
t.Fatalf("curr_estab = %v, want 3", record.CurrEstab)
}
}
func TestKCPProcessStatsSamplerIsSharedPerLoggerRoleNodeAndInterval(t *testing.T) {
logger := &recordingKCPSessionStatsLogger{}
first := acquireKCPProcessStatsSampler(logger, latencylog.NodeRoleServer, "hub", 10*time.Millisecond)
second := acquireKCPProcessStatsSampler(logger, latencylog.NodeRoleServer, "hub", 10*time.Millisecond)
if first != second {
t.Fatal("expected acquireKCPProcessStatsSampler to reuse sampler for same logger/role/node/interval")
}
releaseKCPProcessStatsSampler(first)
releaseKCPProcessStatsSampler(second)
}
func TestKCPSessionStatsSamplerRecordsEventAndPeriodicSamples(t *testing.T) {
kcp.DefaultSnmp.Reset()
logger := &recordingKCPSessionStatsLogger{}
source := stubKCPSessionStatsSource{
conv: 77,
rto: 25,
srtt: 10,
srttVar: 3,
local: &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 9001},
remote: &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 9002},
}
sampler := newKCPSessionStatsSampler(source, logger, latencylog.NodeRolePeer, "peer-a", 10*time.Millisecond)
waitForKCPSessionStatsRecords(t, logger, func(records []KCPSessionStatsRecord) bool {
return hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonPeriodic)
}, "periodic session sample")
sampler.SampleEvent(kcpStatsSampleReasonSendHandoffBegin)
sampler.SampleEvent(kcpStatsSampleReasonSendHandoffEnd)
sampler.SampleEvent(kcpStatsSampleReasonReceive)
waitForKCPSessionStatsRecords(t, logger, func(records []KCPSessionStatsRecord) bool {
return hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonSendHandoffBegin) &&
hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonSendHandoffEnd) &&
hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonReceive)
}, "event-driven session samples")
sampler.Close()
waitForKCPSessionStatsRecords(t, logger, func(records []KCPSessionStatsRecord) bool {
return hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonClose) &&
hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeProcessSNMPSample, kcpStatsSampleReasonClose)
}, "close samples")
recordCount := len(logger.Records())
time.Sleep(30 * time.Millisecond)
if got := len(logger.Records()); got != recordCount {
t.Fatalf("record count after Close() = %d, want %d", got, recordCount)
}
}
func TestKCPConnSessionStatsLoggingSparseTraffic(t *testing.T) {
senderLogger := &recordingKCPSessionStatsLogger{}
receiverLogger := &recordingKCPSessionStatsLogger{}
sender, accepted, cleanup := newKCPConnPair(
t,
[]KCPOption{
WithKCPLogger(latencylog.NoopLogger{}, latencylog.NodeRolePeer, "peer-a"),
WithKCPSessionStatsLogger(senderLogger, 10*time.Millisecond),
},
[]KCPOption{
WithKCPLogger(latencylog.NoopLogger{}, latencylog.NodeRolePeer, "peer-b"),
WithKCPSessionStatsLogger(receiverLogger, 10*time.Millisecond),
},
nil,
nil,
)
defer cleanup()
msg := protocol.Message{
Type: protocol.MessageTypeText,
ID: 1,
From: "peer-a",
To: "peer-b",
Body: []byte("hello sparse"),
}
sendErr := make(chan error, 1)
go func() {
sendErr <- sender.Send(msg)
}()
receiver := awaitAcceptedKCPConn(t, accepted)
if _, err := receiver.Receive(); err != nil {
t.Fatalf("receiver.Receive() error = %v", err)
}
if err := <-sendErr; err != nil {
t.Fatalf("sender.Send() error = %v", err)
}
if err := sender.Close(); err != nil {
t.Fatalf("sender.Close() error = %v", err)
}
if err := receiver.Close(); err != nil {
t.Fatalf("receiver.Close() error = %v", err)
}
waitForKCPSessionStatsRecords(t, senderLogger, func(records []KCPSessionStatsRecord) bool {
return hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonSendHandoffBegin) &&
hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonSendHandoffEnd) &&
hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonClose) &&
hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeProcessSNMPSample, kcpStatsSampleReasonClose)
}, "sender sparse session stats")
waitForKCPSessionStatsRecords(t, receiverLogger, func(records []KCPSessionStatsRecord) bool {
return hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonReceive) &&
hasKCPSessionStatsRecord(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonClose)
}, "receiver sparse session stats")
}
func TestKCPConnSessionStatsLoggingContinuousTraffic(t *testing.T) {
logger := &recordingKCPSessionStatsLogger{}
sender, accepted, cleanup := newKCPConnPair(
t,
[]KCPOption{
WithKCPLogger(latencylog.NoopLogger{}, latencylog.NodeRolePeer, "peer-a"),
WithKCPSessionStatsLogger(logger, 20*time.Millisecond),
},
nil,
nil,
nil,
)
defer cleanup()
receiver := awaitAcceptedKCPConn(t, accepted)
receiveErr := make(chan error, 1)
go func() {
for i := 0; i < 12; i++ {
if _, err := receiver.Receive(); err != nil {
receiveErr <- err
return
}
}
receiveErr <- nil
}()
for i := 0; i < 12; i++ {
msg := protocol.Message{
Type: protocol.MessageTypeText,
ID: uint64(i + 1),
From: "peer-a",
To: "peer-b",
Body: []byte("hello continuous"),
}
if err := sender.Send(msg); err != nil {
t.Fatalf("sender.Send(message %d) error = %v", i+1, err)
}
time.Sleep(15 * time.Millisecond)
}
if err := <-receiveErr; err != nil {
t.Fatalf("receiver.Receive() error = %v", err)
}
waitForKCPSessionStatsRecords(t, logger, func(records []KCPSessionStatsRecord) bool {
return countKCPSessionStatsRecords(records, kcpSessionStatsRecordTypeSessionSample, kcpStatsSampleReasonPeriodic) >= 2 &&
countKCPSessionStatsRecords(records, kcpSessionStatsRecordTypeProcessSNMPSample, kcpStatsSampleReasonPeriodic) >= 2
}, "continuous periodic session stats")
}
type testKCPSegment struct {
cmd uint8
sn uint32
una uint32
frg uint8
wnd uint16
length uint32
}
func buildTestKCPDatagram(conv uint32, segments []testKCPSegment) []byte {
packet := make([]byte, 0)
for _, segment := range segments {
entry := make([]byte, kcpPacketHeaderSize+int(segment.length))
binary.LittleEndian.PutUint32(entry[0:4], conv)
entry[4] = segment.cmd
entry[5] = segment.frg
binary.LittleEndian.PutUint16(entry[6:8], segment.wnd)
binary.LittleEndian.PutUint32(entry[12:16], segment.sn)
binary.LittleEndian.PutUint32(entry[16:20], segment.una)
binary.LittleEndian.PutUint32(entry[20:24], segment.length)
packet = append(packet, entry...)
}
return packet
}
func hasKCPSessionStatsRecord(records []KCPSessionStatsRecord, recordType, reason string) bool {
return countKCPSessionStatsRecords(records, recordType, reason) > 0
}
func countKCPSessionStatsRecords(records []KCPSessionStatsRecord, recordType, reason string) int {
count := 0
for _, record := range records {
if record.RecordType == recordType && record.SampleReason == reason {
count++
}
}
return count
}
func waitForKCPSessionStatsRecords(t *testing.T, logger *recordingKCPSessionStatsLogger, condition func([]KCPSessionStatsRecord) bool, description string) {
t.Helper()
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
records := logger.Records()
if condition(records) {
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("timed out waiting for %s", description)
}