package transport

import (
	"encoding/json"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"reflect"
	"sync"
	"time"

	kcp "github.com/xtaci/kcp-go/v5"
)

// DefaultKCPSessionStatsInterval is the sampling period used when the caller
// supplies no interval (empty string or a non-positive duration).
const DefaultKCPSessionStatsInterval = 100 * time.Millisecond

const (
	// Values written to KCPSessionStatsRecord.RecordType.

	kcpSessionStatsRecordTypeSessionSample     = "session_sample"
	kcpSessionStatsRecordTypeProcessSNMPSample = "process_snmp_sample"

	// Values written to KCPSessionStatsRecord.SampleReason.

	kcpStatsSampleReasonPeriodic         = "periodic"
	kcpStatsSampleReasonSendHandoffBegin = "send_handoff_begin"
	kcpStatsSampleReasonSendHandoffEnd   = "send_handoff_end"
	kcpStatsSampleReasonReceive          = "receive"
	kcpStatsSampleReasonClose            = "close"
)

// KCPSessionStatsRecord is a JSONL record for KCP session and process stats.
//
// One record type is shared by both kinds of sample. RecordType tells them
// apart, and every optional metric is a pointer tagged omitempty, so a metric
// that was not sampled is omitted from the JSON while a sampled zero is still
// emitted. Session samples fill the address, Conv, RTO and SRTT fields;
// process SNMP samples fill the byte, packet, segment, error, ring-buffer and
// CurrEstab fields.
type KCPSessionStatsRecord struct {
	RecordType          string  `json:"record_type"`
	NodeRole            string  `json:"node_role,omitempty"`
	NodeID              string  `json:"node_id,omitempty"`
	LocalAddr           string  `json:"local_addr,omitempty"`
	RemoteAddr          string  `json:"remote_addr,omitempty"`
	Conv                *uint32 `json:"conv,omitempty"`
	TSUnixNano          int64   `json:"ts_unix_nano"`
	SampleReason        string  `json:"sample_reason"`
	RTOMillis           *uint32 `json:"rto_ms,omitempty"`
	SRTTMillis          *int32  `json:"srtt_ms,omitempty"`
	SRTTVarMillis       *int32  `json:"srttvar_ms,omitempty"`
	BytesSent           *uint64 `json:"bytes_sent,omitempty"`
	BytesReceived       *uint64 `json:"bytes_received,omitempty"`
	InPkts              *uint64 `json:"in_pkts,omitempty"`
	OutPkts             *uint64 `json:"out_pkts,omitempty"`
	InSegs              *uint64 `json:"in_segs,omitempty"`
	OutSegs             *uint64 `json:"out_segs,omitempty"`
	RetransSegs         *uint64 `json:"retrans_segs,omitempty"`
	FastRetransSegs     *uint64 `json:"fast_retrans_segs,omitempty"`
	EarlyRetransSegs    *uint64 `json:"early_retrans_segs,omitempty"`
	LostSegs            *uint64 `json:"lost_segs,omitempty"`
	RepeatSegs          *uint64 `json:"repeat_segs,omitempty"`
	InErrs              *uint64 `json:"in_errs,omitempty"`
	KCPInErrs           *uint64 `json:"kcp_in_errs,omitempty"`
	RingBufferSndQueue  *uint64 `json:"ring_buffer_snd_queue,omitempty"`
	RingBufferRcvQueue  *uint64 `json:"ring_buffer_rcv_queue,omitempty"`
	RingBufferSndBuffer *uint64 `json:"ring_buffer_snd_buffer,omitempty"`
	CurrEstab           *uint64 `json:"curr_estab,omitempty"`
}

// KCPSessionStatsLogger receives KCP session stats records.
type KCPSessionStatsLogger interface {
	LogKCPSessionStatsRecord(record KCPSessionStatsRecord) error
}

// Compile-time check that the JSONL logger satisfies the logger interface.
var _ KCPSessionStatsLogger = (*JSONLKCPSessionStatsLogger)(nil)

// JSONLKCPSessionStatsLogger appends KCP session stats as JSONL.
type JSONLKCPSessionStatsLogger struct {
	mu        sync.Mutex // serializes writes so records never interleave
	closeOnce sync.Once  // makes Close idempotent
	closeErr  error      // result of the first Close, returned by later calls
	file      *os.File   // destination, opened in append mode
}

// NewJSONLKCPSessionStatsLogger creates a thread-safe JSONL stats logger.
//
// The parent directory of path is created if needed (mode 0o755) and the file
// is opened for appending, being created with mode 0o644 when absent. The
// caller owns the returned logger and should Close it when done.
func NewJSONLKCPSessionStatsLogger(path string) (*JSONLKCPSessionStatsLogger, error) {
	dir := filepath.Dir(path)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return nil, fmt.Errorf("transport: create kcp session stats log dir %s: %w", dir, err)
	}
	file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return nil, fmt.Errorf("transport: open kcp session stats log %s: %w", path, err)
	}
	return &JSONLKCPSessionStatsLogger{file: file}, nil
}

// LogKCPSessionStatsRecord appends one JSONL record.
//
// The record is marshalled outside the lock and written, newline included,
// with a single Write call under the lock. A nil logger (or one without a
// file) yields an error wrapping os.ErrInvalid instead of panicking: a
// typed-nil logger stored in a KCPSessionStatsLogger interface passes a plain
// `logger == nil` check, so the call can arrive from a background goroutine.
func (l *JSONLKCPSessionStatsLogger) LogKCPSessionStatsRecord(record KCPSessionStatsRecord) error {
	if l == nil || l.file == nil {
		return fmt.Errorf("transport: log kcp session stats record: %w", os.ErrInvalid)
	}
	line, err := json.Marshal(record)
	if err != nil {
		return fmt.Errorf("transport: marshal kcp session stats record: %w", err)
	}
	line = append(line, '\n')

	l.mu.Lock()
	defer l.mu.Unlock()
	if _, err := l.file.Write(line); err != nil {
		return fmt.Errorf("transport: write kcp session stats record: %w", err)
	}
	return nil
}

// Close closes the underlying file.
//
// It is safe to call more than once; every call returns the result of the
// first close. Closing a nil logger is a no-op.
func (l *JSONLKCPSessionStatsLogger) Close() error {
	if l == nil || l.file == nil {
		return nil
	}
	l.closeOnce.Do(func() {
		l.closeErr = l.file.Close()
	})
	return l.closeErr
}

// ParseKCPSessionStatsInterval parses a duration string for stats sampling.
func ParseKCPSessionStatsInterval(raw string) (time.Duration, error) { if raw == "" { return DefaultKCPSessionStatsInterval, nil } interval, err := time.ParseDuration(raw) if err != nil { return 0, fmt.Errorf("transport: parse kcp session stats interval %q: %w", raw, err) } if interval <= 0 { return 0, fmt.Errorf("transport: kcp session stats interval must be greater than zero") } return interval, nil } type kcpSessionStatsSource interface { GetConv() uint32 GetRTO() uint32 GetSRTT() int32 GetSRTTVar() int32 LocalAddr() net.Addr RemoteAddr() net.Addr } type kcpSessionStatsSampler struct { source kcpSessionStatsSource logger KCPSessionStatsLogger nodeRole string nodeID string interval time.Duration processSampler *kcpProcessStatsSampler sampleMu sync.Mutex stopOnce sync.Once stopCh chan struct{} stoppedCh chan struct{} } func newKCPSessionStatsSampler(source kcpSessionStatsSource, logger KCPSessionStatsLogger, nodeRole, nodeID string, interval time.Duration) *kcpSessionStatsSampler { if source == nil || logger == nil { return nil } if interval <= 0 { interval = DefaultKCPSessionStatsInterval } sampler := &kcpSessionStatsSampler{ source: source, logger: logger, nodeRole: nodeRole, nodeID: nodeID, interval: interval, processSampler: acquireKCPProcessStatsSampler(logger, nodeRole, nodeID, interval), stopCh: make(chan struct{}), stoppedCh: make(chan struct{}), } go sampler.run() return sampler } func (s *kcpSessionStatsSampler) run() { ticker := time.NewTicker(s.interval) defer ticker.Stop() defer close(s.stoppedCh) for { select { case <-ticker.C: s.logSessionSample(kcpStatsSampleReasonPeriodic) case <-s.stopCh: return } } } func (s *kcpSessionStatsSampler) SampleEvent(reason string) { if s == nil { return } s.logSessionSample(reason) if s.processSampler == nil { return } if reason == kcpStatsSampleReasonClose { s.processSampler.requestSampleAndWait(reason) return } s.processSampler.requestSample(reason) } func (s *kcpSessionStatsSampler) Close() { if s == nil { return 
} s.stopOnce.Do(func() { s.SampleEvent(kcpStatsSampleReasonClose) close(s.stopCh) <-s.stoppedCh releaseKCPProcessStatsSampler(s.processSampler) }) } func (s *kcpSessionStatsSampler) logSessionSample(reason string) { s.sampleMu.Lock() defer s.sampleMu.Unlock() conv := s.source.GetConv() rto := s.source.GetRTO() srtt := s.source.GetSRTT() srttVar := s.source.GetSRTTVar() record := KCPSessionStatsRecord{ RecordType: kcpSessionStatsRecordTypeSessionSample, NodeRole: s.nodeRole, NodeID: s.nodeID, LocalAddr: addrString(s.source.LocalAddr()), RemoteAddr: addrString(s.source.RemoteAddr()), Conv: uint32Ptr(conv), TSUnixNano: time.Now().UTC().UnixNano(), SampleReason: reason, RTOMillis: uint32Ptr(rto), SRTTMillis: int32Ptr(srtt), SRTTVarMillis: int32Ptr(srttVar), } _ = s.logger.LogKCPSessionStatsRecord(record) } type kcpProcessSampleRequest struct { reason string done chan struct{} } type kcpProcessStatsSampler struct { key string logger KCPSessionStatsLogger nodeRole string nodeID string interval time.Duration requestCh chan kcpProcessSampleRequest stopCh chan struct{} stoppedCh chan struct{} sampleMu sync.Mutex previous *kcp.Snmp refCount int } var ( kcpProcessSamplersMu sync.Mutex kcpProcessSamplers = make(map[string]*kcpProcessStatsSampler) ) func acquireKCPProcessStatsSampler(logger KCPSessionStatsLogger, nodeRole, nodeID string, interval time.Duration) *kcpProcessStatsSampler { if logger == nil { return nil } if interval <= 0 { interval = DefaultKCPSessionStatsInterval } key := fmt.Sprintf("%s|%s|%s|%d", kcpStatsLoggerIdentity(logger), nodeRole, nodeID, interval) kcpProcessSamplersMu.Lock() defer kcpProcessSamplersMu.Unlock() if sampler, ok := kcpProcessSamplers[key]; ok { sampler.refCount++ return sampler } sampler := &kcpProcessStatsSampler{ key: key, logger: logger, nodeRole: nodeRole, nodeID: nodeID, interval: interval, requestCh: make(chan kcpProcessSampleRequest, 1), stopCh: make(chan struct{}), stoppedCh: make(chan struct{}), previous: kcp.DefaultSnmp.Copy(), 
refCount: 1, } kcpProcessSamplers[key] = sampler go sampler.run() return sampler } func releaseKCPProcessStatsSampler(sampler *kcpProcessStatsSampler) { if sampler == nil { return } kcpProcessSamplersMu.Lock() sampler.refCount-- if sampler.refCount > 0 { kcpProcessSamplersMu.Unlock() return } delete(kcpProcessSamplers, sampler.key) close(sampler.stopCh) kcpProcessSamplersMu.Unlock() <-sampler.stoppedCh } func (s *kcpProcessStatsSampler) run() { ticker := time.NewTicker(s.interval) defer ticker.Stop() defer close(s.stoppedCh) for { select { case <-ticker.C: s.logProcessSample(kcpStatsSampleReasonPeriodic) case req := <-s.requestCh: s.logProcessSample(req.reason) if req.done != nil { close(req.done) } case <-s.stopCh: return } } } func (s *kcpProcessStatsSampler) requestSample(reason string) { if s == nil { return } select { case s.requestCh <- kcpProcessSampleRequest{reason: reason}: default: } } func (s *kcpProcessStatsSampler) requestSampleAndWait(reason string) { if s == nil { return } done := make(chan struct{}) select { case s.requestCh <- kcpProcessSampleRequest{reason: reason, done: done}: <-done default: s.logProcessSample(reason) } } func (s *kcpProcessStatsSampler) logProcessSample(reason string) { s.sampleMu.Lock() defer s.sampleMu.Unlock() current := kcp.DefaultSnmp.Copy() record := newKCPProcessSNMPSampleRecord(s.nodeRole, s.nodeID, reason, s.previous, current) s.previous = current _ = s.logger.LogKCPSessionStatsRecord(record) } func newKCPProcessSNMPSampleRecord(nodeRole, nodeID, reason string, previous, current *kcp.Snmp) KCPSessionStatsRecord { return KCPSessionStatsRecord{ RecordType: kcpSessionStatsRecordTypeProcessSNMPSample, NodeRole: nodeRole, NodeID: nodeID, TSUnixNano: time.Now().UTC().UnixNano(), SampleReason: reason, BytesSent: uint64Ptr(diffUint64(previous.BytesSent, current.BytesSent)), BytesReceived: uint64Ptr(diffUint64(previous.BytesReceived, current.BytesReceived)), InPkts: uint64Ptr(diffUint64(previous.InPkts, current.InPkts)), 
OutPkts: uint64Ptr(diffUint64(previous.OutPkts, current.OutPkts)), InSegs: uint64Ptr(diffUint64(previous.InSegs, current.InSegs)), OutSegs: uint64Ptr(diffUint64(previous.OutSegs, current.OutSegs)), RetransSegs: uint64Ptr(diffUint64(previous.RetransSegs, current.RetransSegs)), FastRetransSegs: uint64Ptr(diffUint64(previous.FastRetransSegs, current.FastRetransSegs)), EarlyRetransSegs: uint64Ptr(diffUint64(previous.EarlyRetransSegs, current.EarlyRetransSegs)), LostSegs: uint64Ptr(diffUint64(previous.LostSegs, current.LostSegs)), RepeatSegs: uint64Ptr(diffUint64(previous.RepeatSegs, current.RepeatSegs)), InErrs: uint64Ptr(diffUint64(previous.InErrs, current.InErrs)), KCPInErrs: uint64Ptr(diffUint64(previous.KCPInErrors, current.KCPInErrors)), RingBufferSndQueue: uint64Ptr(current.RingBufferSndQueue), RingBufferRcvQueue: uint64Ptr(current.RingBufferRcvQueue), RingBufferSndBuffer: uint64Ptr(current.RingBufferSndBuffer), CurrEstab: uint64Ptr(current.CurrEstab), } } func kcpStatsLoggerIdentity(logger KCPSessionStatsLogger) string { value := reflect.ValueOf(logger) switch value.Kind() { case reflect.Pointer, reflect.Map, reflect.Func, reflect.Slice, reflect.Chan, reflect.UnsafePointer: return fmt.Sprintf("%T:%x", logger, value.Pointer()) default: return fmt.Sprintf("%T:%v", logger, logger) } } func diffUint64(previous, current uint64) uint64 { if current < previous { return 0 } return current - previous } func addrString(addr net.Addr) string { if addr == nil { return "" } return addr.String() } func uint32Ptr(value uint32) *uint32 { return &value } func uint64Ptr(value uint64) *uint64 { return &value } func int32Ptr(value int32) *int32 { return &value }