Files
OmniSocketGo/cmd/internal/transport/kcp_session_stats.go

445 lines
12 KiB
Go

package transport
import (
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"reflect"
"sync"
"time"
kcp "github.com/xtaci/kcp-go/v5"
)
// DefaultKCPSessionStatsInterval is the sampling period applied when a caller
// supplies an empty, zero, or negative interval.
const DefaultKCPSessionStatsInterval = 100 * time.Millisecond

// Values written to the "record_type" field of a stats record.
const (
	kcpSessionStatsRecordTypeSessionSample     = "session_sample"
	kcpSessionStatsRecordTypeProcessSNMPSample = "process_snmp_sample"
)

// Values written to the "sample_reason" field; they name the event that
// triggered a sample.
const (
	kcpStatsSampleReasonPeriodic         = "periodic"
	kcpStatsSampleReasonSendHandoffBegin = "send_handoff_begin"
	kcpStatsSampleReasonSendHandoffEnd   = "send_handoff_end"
	kcpStatsSampleReasonReceive          = "receive"
	kcpStatsSampleReasonClose            = "close"
)
// KCPSessionStatsRecord is one line of the KCP stats JSONL log. The same
// struct carries both per-session samples and process-wide SNMP samples;
// RecordType tells them apart. Optional values are pointers so that an unset
// field is omitted from the JSON while a genuine zero is still written.
//
// Field order is significant: it determines the key order of the encoded JSON.
type KCPSessionStatsRecord struct {
	// Identification of the record and of the session it describes.
	RecordType string  `json:"record_type"`
	NodeRole   string  `json:"node_role,omitempty"`
	NodeID     string  `json:"node_id,omitempty"`
	LocalAddr  string  `json:"local_addr,omitempty"`
	RemoteAddr string  `json:"remote_addr,omitempty"`
	Conv       *uint32 `json:"conv,omitempty"`

	// When the sample was taken (Unix nanoseconds) and what triggered it.
	TSUnixNano   int64  `json:"ts_unix_nano"`
	SampleReason string `json:"sample_reason"`

	// Retransmission-timer values read from the session.
	RTOMillis     *uint32 `json:"rto_ms,omitempty"`
	SRTTMillis    *int32  `json:"srtt_ms,omitempty"`
	SRTTVarMillis *int32  `json:"srttvar_ms,omitempty"`

	// SNMP counters. In this file they are filled with the amount each
	// counter advanced since the previous process sample.
	BytesSent        *uint64 `json:"bytes_sent,omitempty"`
	BytesReceived    *uint64 `json:"bytes_received,omitempty"`
	InPkts           *uint64 `json:"in_pkts,omitempty"`
	OutPkts          *uint64 `json:"out_pkts,omitempty"`
	InSegs           *uint64 `json:"in_segs,omitempty"`
	OutSegs          *uint64 `json:"out_segs,omitempty"`
	RetransSegs      *uint64 `json:"retrans_segs,omitempty"`
	FastRetransSegs  *uint64 `json:"fast_retrans_segs,omitempty"`
	EarlyRetransSegs *uint64 `json:"early_retrans_segs,omitempty"`
	LostSegs         *uint64 `json:"lost_segs,omitempty"`
	RepeatSegs       *uint64 `json:"repeat_segs,omitempty"`
	InErrs           *uint64 `json:"in_errs,omitempty"`
	KCPInErrs        *uint64 `json:"kcp_in_errs,omitempty"`

	// SNMP values copied as-is from the current snapshot (not differenced).
	RingBufferSndQueue  *uint64 `json:"ring_buffer_snd_queue,omitempty"`
	RingBufferRcvQueue  *uint64 `json:"ring_buffer_rcv_queue,omitempty"`
	RingBufferSndBuffer *uint64 `json:"ring_buffer_snd_buffer,omitempty"`
	CurrEstab           *uint64 `json:"curr_estab,omitempty"`
}
// KCPSessionStatsLogger is the sink for KCP stats records. An implementation
// receives every record by value and reports whether it could be stored.
type KCPSessionStatsLogger interface {
	// LogKCPSessionStatsRecord stores a single record.
	LogKCPSessionStatsRecord(record KCPSessionStatsRecord) error
}
// JSONLKCPSessionStatsLogger is a KCPSessionStatsLogger that appends each
// record to a file as one JSON object per line.
type JSONLKCPSessionStatsLogger struct {
	// file is the destination the records are appended to.
	file *os.File
	// mu serialises writes so lines from concurrent callers never interleave.
	mu sync.Mutex
	// closeOnce makes Close run the underlying close a single time;
	// closeErr remembers that close's result for later Close calls.
	closeOnce sync.Once
	closeErr  error
}
// NewJSONLKCPSessionStatsLogger opens path for appending and returns a logger
// that writes to it. Missing parent directories are created with mode 0o755;
// the file itself is created with mode 0o644 when absent and is never
// truncated.
//
// It returns a wrapped error if the directory cannot be created or the file
// cannot be opened.
func NewJSONLKCPSessionStatsLogger(path string) (*JSONLKCPSessionStatsLogger, error) {
	parent := filepath.Dir(path)
	mkErr := os.MkdirAll(parent, 0o755)
	if mkErr != nil {
		return nil, fmt.Errorf("transport: create kcp session stats log dir %s: %w", parent, mkErr)
	}

	flags := os.O_CREATE | os.O_WRONLY | os.O_APPEND
	handle, openErr := os.OpenFile(path, flags, 0o644)
	if openErr != nil {
		return nil, fmt.Errorf("transport: open kcp session stats log %s: %w", path, openErr)
	}

	logger := &JSONLKCPSessionStatsLogger{file: handle}
	return logger, nil
}
// LogKCPSessionStatsRecord encodes record as JSON and appends it to the log
// file followed by a newline. The record is encoded before the mutex is taken,
// so only the file write itself is serialised between callers.
//
// It returns the encoding error or the write error, whichever occurs.
func (l *JSONLKCPSessionStatsLogger) LogKCPSessionStatsRecord(record KCPSessionStatsRecord) error {
	payload, encodeErr := json.Marshal(record)
	if encodeErr != nil {
		return encodeErr
	}
	payload = append(payload, '\n')

	l.mu.Lock()
	defer l.mu.Unlock()

	_, writeErr := l.file.Write(payload)
	return writeErr
}
// Close closes the log file. The file is closed only on the first call; every
// call, including later ones, returns the result of that first close.
func (l *JSONLKCPSessionStatsLogger) Close() error {
	closeFile := func() {
		l.closeErr = l.file.Close()
	}
	l.closeOnce.Do(closeFile)
	return l.closeErr
}
// ParseKCPSessionStatsInterval converts raw, a time.ParseDuration string such
// as "250ms", into a sampling interval.
//
// An empty string selects DefaultKCPSessionStatsInterval. A string that does
// not parse, or that parses to zero or a negative duration, yields a zero
// duration and an error.
func ParseKCPSessionStatsInterval(raw string) (time.Duration, error) {
	if len(raw) == 0 {
		return DefaultKCPSessionStatsInterval, nil
	}

	parsed, parseErr := time.ParseDuration(raw)
	switch {
	case parseErr != nil:
		return 0, fmt.Errorf("transport: parse kcp session stats interval %q: %w", raw, parseErr)
	case parsed <= 0:
		return 0, fmt.Errorf("transport: kcp session stats interval must be greater than zero")
	}
	return parsed, nil
}
// kcpSessionStatsSource is the read-only view of a KCP session that the
// session sampler needs: its endpoints, its conversation ID, and its current
// retransmission-timer values.
type kcpSessionStatsSource interface {
	LocalAddr() net.Addr
	RemoteAddr() net.Addr

	GetConv() uint32
	GetRTO() uint32
	GetSRTT() int32
	GetSRTTVar() int32
}
// kcpSessionStatsSampler logs samples for a single KCP session, both on a
// fixed interval from its own goroutine and on demand via SampleEvent.
type kcpSessionStatsSampler struct {
	// source is the session being observed; logger receives its samples.
	source kcpSessionStatsSource
	logger KCPSessionStatsLogger

	// nodeRole and nodeID are copied verbatim into every record.
	nodeRole string
	nodeID   string

	// interval is the period of the sampling ticker.
	interval time.Duration

	// processSampler is the process-wide sampler this session holds a
	// reference to; it may be nil.
	processSampler *kcpProcessStatsSampler

	// sampleMu keeps concurrent samples from overlapping.
	sampleMu sync.Mutex

	// stopOnce guards shutdown. stopCh is closed to ask run to exit and
	// stoppedCh is closed by run once it has returned.
	stopOnce  sync.Once
	stopCh    chan struct{}
	stoppedCh chan struct{}
}
// newKCPSessionStatsSampler builds a sampler for source and starts its
// periodic sampling goroutine.
//
// It returns nil when source or logger is nil. A zero or negative interval is
// replaced by DefaultKCPSessionStatsInterval. The sampler also acquires a
// reference to the process-wide sampler for the same logger, role, ID, and
// interval; Close releases that reference.
func newKCPSessionStatsSampler(source kcpSessionStatsSource, logger KCPSessionStatsLogger, nodeRole, nodeID string, interval time.Duration) *kcpSessionStatsSampler {
	if source == nil || logger == nil {
		return nil
	}

	period := interval
	if period <= 0 {
		period = DefaultKCPSessionStatsInterval
	}

	s := new(kcpSessionStatsSampler)
	s.source, s.logger = source, logger
	s.nodeRole, s.nodeID = nodeRole, nodeID
	s.interval = period
	s.processSampler = acquireKCPProcessStatsSampler(logger, nodeRole, nodeID, period)
	s.stopCh = make(chan struct{})
	s.stoppedCh = make(chan struct{})

	go s.run()
	return s
}
// run is the sampler goroutine: it logs a "periodic" session sample on every
// tick until stopCh is closed. On the way out it closes stoppedCh so that
// Close can tell the goroutine has finished, then stops the ticker.
func (s *kcpSessionStatsSampler) run() {
	ticker := time.NewTicker(s.interval)
	defer ticker.Stop()
	defer close(s.stoppedCh)

	ticks := ticker.C
	for {
		select {
		case <-s.stopCh:
			return
		case <-ticks:
			s.logSessionSample(kcpStatsSampleReasonPeriodic)
		}
	}
}
// SampleEvent logs a session sample tagged with reason and, when a process
// sampler is attached, asks it for a matching process sample.
//
// For the "close" reason the process sample is requested synchronously, so it
// has been taken by the time SampleEvent returns. Every other reason uses the
// non-blocking request. A nil receiver is a no-op.
func (s *kcpSessionStatsSampler) SampleEvent(reason string) {
	if s == nil {
		return
	}
	s.logSessionSample(reason)

	process := s.processSampler
	if process == nil {
		return
	}
	switch reason {
	case kcpStatsSampleReasonClose:
		process.requestSampleAndWait(reason)
	default:
		process.requestSample(reason)
	}
}
// Close shuts the sampler down. The shutdown sequence runs only on the first
// call: it records a final "close" sample, stops the sampling goroutine and
// waits for it to exit, and releases this session's reference to the process
// sampler. A nil receiver is a no-op.
func (s *kcpSessionStatsSampler) Close() {
	if s == nil {
		return
	}
	shutdown := func() {
		// Take the last sample while the process sampler is still referenced.
		s.SampleEvent(kcpStatsSampleReasonClose)

		close(s.stopCh)
		<-s.stoppedCh

		releaseKCPProcessStatsSampler(s.processSampler)
	}
	s.stopOnce.Do(shutdown)
}
// logSessionSample reads the session's current conversation ID, timer values,
// and addresses, and writes them to the logger as one "session_sample" record
// tagged with reason. sampleMu is held for the whole call so that concurrent
// samples are emitted one at a time.
func (s *kcpSessionStatsSampler) logSessionSample(reason string) {
	s.sampleMu.Lock()
	defer s.sampleMu.Unlock()

	src := s.source
	conv, rto := src.GetConv(), src.GetRTO()
	srtt, srttVar := src.GetSRTT(), src.GetSRTTVar()

	sample := KCPSessionStatsRecord{
		RecordType:    kcpSessionStatsRecordTypeSessionSample,
		NodeRole:      s.nodeRole,
		NodeID:        s.nodeID,
		SampleReason:  reason,
		Conv:          &conv,
		RTOMillis:     &rto,
		SRTTMillis:    &srtt,
		SRTTVarMillis: &srttVar,
	}
	sample.LocalAddr = addrString(src.LocalAddr())
	sample.RemoteAddr = addrString(src.RemoteAddr())
	// UnixNano does not depend on the time's location, so no UTC conversion
	// is needed before taking it.
	sample.TSUnixNano = time.Now().UnixNano()

	// The logger's error is explicitly discarded: a failed write does not
	// reach the caller.
	_ = s.logger.LogKCPSessionStatsRecord(sample)
}
// kcpProcessSampleRequest asks the process sampler goroutine for an
// out-of-schedule sample.
type kcpProcessSampleRequest struct {
	// reason is recorded as the sample's "sample_reason".
	reason string
	// done, when non-nil, is closed by the sampler goroutine after the sample
	// has been logged.
	done chan struct{}
}
// kcpProcessStatsSampler logs process-wide KCP SNMP samples. A single instance
// is shared, through the package registry, by every session sampler created
// with the same logger, node role, node ID, and interval.
type kcpProcessStatsSampler struct {
	// key is this sampler's entry in the registry map.
	key string

	logger   KCPSessionStatsLogger
	nodeRole string
	nodeID   string
	interval time.Duration

	// requestCh carries on-demand sample requests to the goroutine. stopCh
	// is closed to ask run to exit; stoppedCh is closed by run as it returns.
	requestCh chan kcpProcessSampleRequest
	stopCh    chan struct{}
	stoppedCh chan struct{}

	// sampleMu guards previous, the snapshot the next sample is differenced
	// against.
	sampleMu sync.Mutex
	previous *kcp.Snmp

	// refCount counts the holders of this sampler; it is changed only while
	// the registry mutex is held.
	refCount int
}
// Registry of live process samplers, keyed by the string built in
// acquireKCPProcessStatsSampler. kcpProcessSamplersMu guards both the map and
// the refCount of every sampler stored in it.
var (
	kcpProcessSamplersMu sync.Mutex
	kcpProcessSamplers   = map[string]*kcpProcessStatsSampler{}
)
// acquireKCPProcessStatsSampler returns the shared process sampler for the
// given logger, node role, node ID, and interval, creating and starting it if
// none is registered yet. Each call adds one reference, to be dropped later
// with releaseKCPProcessStatsSampler.
//
// It returns nil when logger is nil. A zero or negative interval is replaced
// by DefaultKCPSessionStatsInterval before the registry key is built.
func acquireKCPProcessStatsSampler(logger KCPSessionStatsLogger, nodeRole, nodeID string, interval time.Duration) *kcpProcessStatsSampler {
	if logger == nil {
		return nil
	}

	period := interval
	if period <= 0 {
		period = DefaultKCPSessionStatsInterval
	}
	registryKey := fmt.Sprintf("%s|%s|%s|%d", kcpStatsLoggerIdentity(logger), nodeRole, nodeID, period)

	kcpProcessSamplersMu.Lock()
	defer kcpProcessSamplersMu.Unlock()

	if shared, found := kcpProcessSamplers[registryKey]; found {
		shared.refCount++
		return shared
	}

	fresh := &kcpProcessStatsSampler{
		key:       registryKey,
		logger:    logger,
		nodeRole:  nodeRole,
		nodeID:    nodeID,
		interval:  period,
		requestCh: make(chan kcpProcessSampleRequest, 1),
		stopCh:    make(chan struct{}),
		stoppedCh: make(chan struct{}),
		// Baseline snapshot that the first sample is differenced against.
		previous: kcp.DefaultSnmp.Copy(),
		refCount: 1,
	}
	kcpProcessSamplers[registryKey] = fresh
	go fresh.run()
	return fresh
}
// releaseKCPProcessStatsSampler drops one reference to sampler. When no
// references remain, the sampler is removed from the registry, its goroutine
// is told to stop, and the call blocks until that goroutine has exited. The
// wait happens after the registry mutex is released. A nil sampler is a no-op.
func releaseKCPProcessStatsSampler(sampler *kcpProcessStatsSampler) {
	if sampler == nil {
		return
	}

	kcpProcessSamplersMu.Lock()
	sampler.refCount--
	lastReference := sampler.refCount <= 0
	if lastReference {
		delete(kcpProcessSamplers, sampler.key)
		close(sampler.stopCh)
	}
	kcpProcessSamplersMu.Unlock()

	if lastReference {
		<-sampler.stoppedCh
	}
}
// run is the process sampler goroutine. It logs a "periodic" sample on every
// tick, services on-demand requests from requestCh (closing a request's done
// channel, if any, once its sample is logged), and returns when stopCh is
// closed. stoppedCh is closed on the way out, followed by stopping the ticker.
func (s *kcpProcessStatsSampler) run() {
	ticker := time.NewTicker(s.interval)
	defer ticker.Stop()
	defer close(s.stoppedCh)

	for {
		select {
		case <-s.stopCh:
			return
		case request := <-s.requestCh:
			s.logProcessSample(request.reason)
			if waiter := request.done; waiter != nil {
				close(waiter)
			}
		case <-ticker.C:
			s.logProcessSample(kcpStatsSampleReasonPeriodic)
		}
	}
}
// requestSample queues an on-demand process sample tagged with reason without
// ever blocking: if the request queue is already full, the request is dropped.
// A nil receiver is a no-op.
func (s *kcpProcessStatsSampler) requestSample(reason string) {
	if s == nil {
		return
	}
	request := kcpProcessSampleRequest{reason: reason}
	select {
	case s.requestCh <- request:
	default:
		// Queue full: drop this request instead of waiting for room.
	}
}
// requestSampleAndWait asks the sampler goroutine for a process sample tagged
// with reason and blocks until the goroutine has logged it.
//
// If the request queue is already full, the sample is taken inline on the
// calling goroutine instead of being queued. If the sampler goroutine has
// exited, or exits before servicing the queued request, the call returns
// instead of waiting forever for a completion signal that will never arrive.
// A nil receiver is a no-op.
func (s *kcpProcessStatsSampler) requestSampleAndWait(reason string) {
	if s == nil {
		return
	}
	done := make(chan struct{})
	select {
	case s.requestCh <- kcpProcessSampleRequest{reason: reason, done: done}:
		// run closes done after logging the sample. stoppedCh is closed when
		// run returns, which covers a request left behind in the buffer.
		select {
		case <-done:
		case <-s.stoppedCh:
		}
	default:
		s.logProcessSample(reason)
	}
}
// logProcessSample snapshots the process-wide KCP SNMP counters, logs a record
// describing how they changed since the previous snapshot, and keeps the new
// snapshot as the baseline for the next call. sampleMu is held throughout so
// that the baseline is read and replaced atomically.
func (s *kcpProcessStatsSampler) logProcessSample(reason string) {
	s.sampleMu.Lock()
	defer s.sampleMu.Unlock()

	snapshot := kcp.DefaultSnmp.Copy()
	sample := newKCPProcessSNMPSampleRecord(s.nodeRole, s.nodeID, reason, s.previous, snapshot)
	s.previous = snapshot

	// The logger's error is explicitly discarded: a failed write does not
	// reach the caller.
	_ = s.logger.LogKCPSessionStatsRecord(sample)
}
// newKCPProcessSNMPSampleRecord builds a "process_snmp_sample" record from two
// SNMP snapshots. Counter fields hold the diffUint64 difference between
// previous and current; the ring-buffer and CurrEstab fields are copied from
// current unchanged. The timestamp is taken when the record is built. Both
// snapshots must be non-nil.
func newKCPProcessSNMPSampleRecord(nodeRole, nodeID, reason string, previous, current *kcp.Snmp) KCPSessionStatsRecord {
	// delta wraps the counter difference in the pointer form the record uses.
	delta := func(before, after uint64) *uint64 {
		return uint64Ptr(diffUint64(before, after))
	}

	record := KCPSessionStatsRecord{
		RecordType:   kcpSessionStatsRecordTypeProcessSNMPSample,
		NodeRole:     nodeRole,
		NodeID:       nodeID,
		TSUnixNano:   time.Now().UnixNano(),
		SampleReason: reason,
	}

	// Counters: reported as the change between the two snapshots.
	record.BytesSent = delta(previous.BytesSent, current.BytesSent)
	record.BytesReceived = delta(previous.BytesReceived, current.BytesReceived)
	record.InPkts = delta(previous.InPkts, current.InPkts)
	record.OutPkts = delta(previous.OutPkts, current.OutPkts)
	record.InSegs = delta(previous.InSegs, current.InSegs)
	record.OutSegs = delta(previous.OutSegs, current.OutSegs)
	record.RetransSegs = delta(previous.RetransSegs, current.RetransSegs)
	record.FastRetransSegs = delta(previous.FastRetransSegs, current.FastRetransSegs)
	record.EarlyRetransSegs = delta(previous.EarlyRetransSegs, current.EarlyRetransSegs)
	record.LostSegs = delta(previous.LostSegs, current.LostSegs)
	record.RepeatSegs = delta(previous.RepeatSegs, current.RepeatSegs)
	record.InErrs = delta(previous.InErrs, current.InErrs)
	record.KCPInErrs = delta(previous.KCPInErrors, current.KCPInErrors)

	// Copied straight from the current snapshot, not differenced.
	record.RingBufferSndQueue = uint64Ptr(current.RingBufferSndQueue)
	record.RingBufferRcvQueue = uint64Ptr(current.RingBufferRcvQueue)
	record.RingBufferSndBuffer = uint64Ptr(current.RingBufferSndBuffer)
	record.CurrEstab = uint64Ptr(current.CurrEstab)

	return record
}
// kcpStatsLoggerIdentity returns a string that identifies logger for use in
// the process sampler registry key. For reference-like dynamic types (pointer,
// map, func, slice, chan, unsafe pointer) the string is the type name plus the
// value's address in hex, so two distinct instances get distinct identities.
// Any other logger is identified by its type name plus its %v formatting.
func kcpStatsLoggerIdentity(logger KCPSessionStatsLogger) string {
	rv := reflect.ValueOf(logger)
	kind := rv.Kind()

	identifiedByAddress := kind == reflect.Pointer ||
		kind == reflect.Map ||
		kind == reflect.Func ||
		kind == reflect.Slice ||
		kind == reflect.Chan ||
		kind == reflect.UnsafePointer

	if identifiedByAddress {
		return fmt.Sprintf("%T:%x", logger, rv.Pointer())
	}
	return fmt.Sprintf("%T:%v", logger, logger)
}
// diffUint64 returns how far a counter advanced from previous to current. If
// current is smaller than previous, it returns 0 rather than letting the
// unsigned subtraction wrap around.
func diffUint64(previous, current uint64) uint64 {
	if current >= previous {
		return current - previous
	}
	return 0
}
// addrString returns addr.String(), or the empty string when addr is a nil
// interface value.
func addrString(addr net.Addr) string {
	if addr != nil {
		return addr.String()
	}
	return ""
}
// uint32Ptr returns a pointer to a fresh copy of value.
func uint32Ptr(value uint32) *uint32 {
	boxed := new(uint32)
	*boxed = value
	return boxed
}
// uint64Ptr returns a pointer to a fresh copy of value.
func uint64Ptr(value uint64) *uint64 {
	boxed := new(uint64)
	*boxed = value
	return boxed
}
// int32Ptr returns a pointer to a fresh copy of value.
func int32Ptr(value int32) *int32 {
	boxed := new(int32)
	*boxed = value
	return boxed
}