package latencylog

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sort"

	"omnisocketgo/cmd/internal/protocol"
)

// requiredTimestampNames lists the events every message is expected to
// carry. Any of them absent from a Summary's Timestamps is reported in
// Summary.MissingTimestamps, in this order.
var requiredTimestampNames = []string{
	EventAAppPrepBegin, // A side: application starts preparing the message
	EventATXSched,      // A side: just before entering the Linux qdisc
	EventATXSoftware,   // A side: about to hand the data to the NIC driver
	EventBRXSoftware,   // B side: NIC driver hands the data to the Linux receive stack
	EventBAppRecv,      // B side: application has read the complete message
	EventBPersistEnd,   // B side: write to disk finished
}

// Summary is the latency digest for a single message, assembled from all
// raw events sharing its (MessageType, MessageID, From, To) key.
//
// Every *NS field is a difference of two UnixNano timestamps taken from
// Timestamps and is nil (omitted from JSON) when either input is missing.
// Every *BitrateBPS field is (BodySize * 8) divided by the matching latency
// in seconds, and is nil when BodySize <= 0 or that latency is nil or <= 0.
type Summary struct {
	MessageType protocol.MessageType `json:"message_type"`        // message type
	MessageID   uint64               `json:"message_id"`          // message ID
	From        string               `json:"from"`                // sender
	To          string               `json:"to"`                  // receiver
	FileName    string               `json:"file_name,omitempty"` // file name (file messages only)
	BodySize    int                  `json:"body_size"`           // message body size in bytes

	// Timestamps maps each event name to the earliest UnixNano timestamp
	// seen for it.
	Timestamps map[string]int64 `json:"timestamps"`

	// AProcessingLatencyNS is the A-side processing latency:
	// EventATXSched - EventAAppPrepBegin.
	AProcessingLatencyNS *int64 `json:"a_processing_latency_ns,omitempty"`

	// AQueueLatencyNS is the A-side queueing latency:
	// EventATXSoftware - EventATXSched.
	AQueueLatencyNS *int64 `json:"a_queue_latency_ns,omitempty"`

	// ABTransportPropagationNS approximates A-to-B transmission plus
	// propagation: EventBAppRecv - EventATXSoftware. It mixes an A-side and
	// a B-side timestamp. NOTE(review): presumably only meaningful when the
	// two clocks are synchronized — confirm.
	ABTransportPropagationNS *int64 `json:"a_b_transport_propagation_ns,omitempty"`

	// BKernelReceivePathLatencyNS approximates the B-side kernel receive
	// path: EventBAppRecv - EventBRXSoftware.
	BKernelReceivePathLatencyNS *int64 `json:"b_kernel_receive_path_latency_ns,omitempty"`

	// BProcessingLatencyNS is the B-side processing latency:
	// EventBPersistEnd - EventBAppRecv.
	BProcessingLatencyNS *int64 `json:"b_processing_latency_ns,omitempty"`

	// EndToEndLatencyNS is EventBPersistEnd - EventAAppPrepBegin.
	EndToEndLatencyNS *int64 `json:"end_to_end_latency_ns,omitempty"`

	// AProcessingBitrateBPS is (BodySize * 8) / AProcessingLatencyNS in seconds.
	AProcessingBitrateBPS *float64 `json:"a_processing_bitrate_bps,omitempty"`

	// ABTransportPropagationBitrateBPS is
	// (BodySize * 8) / ABTransportPropagationNS in seconds.
	ABTransportPropagationBitrateBPS *float64 `json:"a_b_transport_propagation_bitrate_bps,omitempty"`

	// EndToEndBitrateBPS is (BodySize * 8) / EndToEndLatencyNS in seconds.
	EndToEndBitrateBPS *float64 `json:"end_to_end_bitrate_bps,omitempty"`

	// ApproxRTTNS approximates the round trip: EventBAppRecv of the matched
	// message in the opposite direction minus this message's
	// EventATXSoftware. See assignApproxRTTsForRoute for the matching rule.
	ApproxRTTNS *int64 `json:"approx_rtt_ns,omitempty"`

	// MissingTimestamps lists, in requiredTimestampNames order, the
	// required events absent from Timestamps.
	MissingTimestamps []string `json:"missing_timestamps,omitempty"`
}

// messageKey identifies one message: events that agree on all four fields
// are merged into the same Summary.
type messageKey struct {
	MessageType protocol.MessageType
	MessageID   uint64
	From        string
	To          string
}

// LoadEventsFromFiles loads the events of several JSONL raw log files and
// concatenates them in the order the paths are given. It stops at, and
// returns the error of, the first file that fails to load.
func LoadEventsFromFiles(paths []string) ([]Event, error) {
	var events []Event
	for _, path := range paths {
		fileEvents, err := LoadEventsFromFile(path)
		if err != nil {
			return nil, err
		}
		events = append(events, fileEvents...)
	}
	return events, nil
}

// LoadEventsFromFile decodes one Event per non-empty line of the JSONL raw
// log at path. A line that fails to decode aborts the whole load.
func LoadEventsFromFile(path string) ([]Event, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("latencylog: open raw log %s: %w", path, err)
	}
	// Read-only handle: a Close error cannot lose data, so it is ignored.
	defer file.Close()

	var events []Event
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}
		var event Event
		if err := json.Unmarshal(line, &event); err != nil {
			return nil, fmt.Errorf("latencylog: decode event from %s: %w", path, err)
		}
		events = append(events, event)
	}
	// Read errors, and lines longer than the Scanner's default token limit
	// (bufio.MaxScanTokenSize), are reported here.
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("latencylog: scan raw log %s: %w", path, err)
	}
	return events, nil
}

// SummarizeEvents groups the business events (see IsBusinessEvent) by
// message, keeps the earliest timestamp per event name, derives the
// latency, bitrate and approximate RTT metrics, and returns the summaries
// sorted by From, To, MessageID, then MessageType. Other events are ignored.
func SummarizeEvents(events []Event) []Summary {
	grouped := make(map[messageKey]*Summary)
	for _, event := range events {
		if !IsBusinessEvent(event) {
			continue
		}
		key := messageKey{
			MessageType: event.MessageType,
			MessageID:   event.MessageID,
			From:        event.From,
			To:          event.To,
		}
		summary, ok := grouped[key]
		if !ok {
			summary = &Summary{
				MessageType: event.MessageType,
				MessageID:   event.MessageID,
				From:        event.From,
				To:          event.To,
				FileName:    event.FileName,
				BodySize:    event.BodySize,
				Timestamps:  make(map[string]int64),
			}
			grouped[key] = summary
		}
		// The first non-empty file name is kept; a positive body size from
		// a later event overrides the current value.
		if summary.FileName == "" {
			summary.FileName = event.FileName
		}
		if event.BodySize > 0 {
			summary.BodySize = event.BodySize
		}
		// Repeated events for the same message keep the earliest timestamp.
		if existing, exists := summary.Timestamps[event.Event]; !exists || event.TsUnixNano < existing {
			summary.Timestamps[event.Event] = event.TsUnixNano
		}
	}

	summaryPointers := make([]*Summary, 0, len(grouped))
	for _, summary := range grouped {
		completeSummary(summary)
		summaryPointers = append(summaryPointers, summary)
	}
	// RTT matching pairs messages across summaries, so it runs only after
	// every summary has its timestamps.
	assignApproxRTTs(summaryPointers)

	summaries := make([]Summary, 0, len(summaryPointers))
	for _, summary := range summaryPointers {
		summaries = append(summaries, *summary)
	}
	// The four fields together form messageKey, which is unique per summary,
	// so this order is total and the output is deterministic.
	sort.Slice(summaries, func(i, j int) bool {
		a, b := &summaries[i], &summaries[j]
		if a.From != b.From {
			return a.From < b.From
		}
		if a.To != b.To {
			return a.To < b.To
		}
		if a.MessageID != b.MessageID {
			return a.MessageID < b.MessageID
		}
		return a.MessageType < b.MessageType
	})
	return summaries
}

// WriteSummariesJSONL writes summaries to path as JSONL (one JSON object per
// line), creating parent directories as needed and truncating any existing
// file. The error from closing the file is reported too, because a failed
// close on a written file can mean the data never reached storage.
func WriteSummariesJSONL(path string, summaries []Summary) (err error) {
	if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
		return fmt.Errorf("latencylog: create summary dir for %s: %w", path, err)
	}
	file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
	if err != nil {
		return fmt.Errorf("latencylog: open summary file %s: %w", path, err)
	}
	defer func() {
		// Keep the first error; only surface the close error when nothing
		// earlier failed.
		if closeErr := file.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("latencylog: close summary file %s: %w", path, closeErr)
		}
	}()

	writer := bufio.NewWriter(file)
	for i := range summaries {
		summary := &summaries[i]
		line, err := json.Marshal(summary)
		if err != nil {
			return fmt.Errorf("latencylog: encode summary for message %d: %w", summary.MessageID, err)
		}
		if _, err := writer.Write(append(line, '\n')); err != nil {
			return fmt.Errorf("latencylog: write summary file %s: %w", path, err)
		}
	}
	if err := writer.Flush(); err != nil {
		return fmt.Errorf("latencylog: flush summary file %s: %w", path, err)
	}
	return nil
}

// completeSummary fills in MissingTimestamps and every latency and bitrate
// field of summary from summary.Timestamps and summary.BodySize.
// ApproxRTTNS is not set here because it needs other summaries; see
// assignApproxRTTs.
func completeSummary(summary *Summary) {
	ts := summary.Timestamps
	summary.MissingTimestamps = missingTimestampNames(ts)

	summary.AProcessingLatencyNS = subtractIfPresent(ts, EventATXSched, EventAAppPrepBegin)
	summary.AQueueLatencyNS = subtractIfPresent(ts, EventATXSoftware, EventATXSched)
	summary.ABTransportPropagationNS = subtractIfPresent(ts, EventBAppRecv, EventATXSoftware)
	summary.BKernelReceivePathLatencyNS = subtractIfPresent(ts, EventBAppRecv, EventBRXSoftware)
	summary.BProcessingLatencyNS = subtractIfPresent(ts, EventBPersistEnd, EventBAppRecv)
	summary.EndToEndLatencyNS = subtractIfPresent(ts, EventBPersistEnd, EventAAppPrepBegin)

	summary.AProcessingBitrateBPS = calculateBitrateBPS(summary.BodySize, summary.AProcessingLatencyNS)
	summary.ABTransportPropagationBitrateBPS = calculateBitrateBPS(summary.BodySize, summary.ABTransportPropagationNS)
	summary.EndToEndBitrateBPS = calculateBitrateBPS(summary.BodySize, summary.EndToEndLatencyNS)
}

// routeKey identifies one direction of traffic between two endpoints.
type routeKey struct {
	From string
	To   string
}

// assignApproxRTTs sets ApproxRTTNS by pairing the messages of each route
// From->To with those of the reverse route To->From (see
// assignApproxRTTsForRoute). Every route whose reverse route is non-empty
// is processed, so messages in both directions can receive an RTT. Each
// pass writes only ApproxRTTNS of its own requests and reads only
// Timestamps, so the result does not depend on map iteration order.
func assignApproxRTTs(summaries []*Summary) {
	grouped := make(map[routeKey][]*Summary)
	for _, summary := range summaries {
		key := routeKey{From: summary.From, To: summary.To}
		grouped[key] = append(grouped[key], summary)
	}
	for key, requests := range grouped {
		replies := grouped[routeKey{From: key.To, To: key.From}]
		if len(replies) == 0 {
			continue
		}
		assignApproxRTTsForRoute(
			sortSummariesByTimestamp(requests, EventBAppRecv),
			sortSummariesByTimestamp(replies, EventATXSoftware),
		)
	}
}

// assignApproxRTTsForRoute greedily pairs requests with replies. requests
// must be sorted by EventBAppRecv and replies by EventATXSoftware (as
// produced by sortSummariesByTimestamp). For each request that has
// EventBAppRecv, the first not-yet-consumed reply whose EventATXSoftware is
// not earlier than that value is consumed. The request's ApproxRTTNS is set
// to reply EventBAppRecv - request EventATXSoftware. Replies earlier than
// the current request, or lacking EventATXSoftware, are skipped for good.
// A consumed reply stays consumed even when that subtraction is impossible
// (the request then keeps a nil RTT).
// NOTE(review): this presumably relies on the compared timestamps being
// recorded on the same host each time (responder for the match test,
// requester for the RTT) — confirm.
func assignApproxRTTsForRoute(requests, replies []*Summary) {
	replyIndex := 0
	for _, request := range requests {
		requestReceivedAtResponder, ok := request.Timestamps[EventBAppRecv]
		if !ok {
			continue
		}
		for replyIndex < len(replies) {
			reply := replies[replyIndex]
			replySentAtResponder, ok := reply.Timestamps[EventATXSoftware]
			if !ok {
				replyIndex++
				continue
			}
			if replySentAtResponder < requestReceivedAtResponder {
				replyIndex++
				continue
			}
			if value := subtractSummaryTimestamps(reply, EventBAppRecv, request, EventATXSoftware); value != nil {
				request.ApproxRTTNS = value
			}
			replyIndex++
			break
		}
	}
}

// sortSummariesByTimestamp returns a stably sorted copy of summaries,
// ordered by their eventName timestamp ascending. Summaries lacking that
// timestamp go last. Ties are broken by MessageID, From, To, then
// MessageType. The input slice is not modified.
func sortSummariesByTimestamp(summaries []*Summary, eventName string) []*Summary {
	sorted := append([]*Summary(nil), summaries...)
	sort.SliceStable(sorted, func(i, j int) bool {
		leftTS, leftOK := sorted[i].Timestamps[eventName]
		rightTS, rightOK := sorted[j].Timestamps[eventName]
		switch {
		case leftOK && rightOK:
			if leftTS != rightTS {
				return leftTS < rightTS
			}
		case leftOK:
			return true
		case rightOK:
			return false
		}
		if sorted[i].MessageID != sorted[j].MessageID {
			return sorted[i].MessageID < sorted[j].MessageID
		}
		if sorted[i].From != sorted[j].From {
			return sorted[i].From < sorted[j].From
		}
		if sorted[i].To != sorted[j].To {
			return sorted[i].To < sorted[j].To
		}
		return sorted[i].MessageType < sorted[j].MessageType
	})
	return sorted
}

// missingTimestampNames returns, in requiredTimestampNames order, the
// required event names absent from timestamps, or nil if none are missing.
func missingTimestampNames(timestamps map[string]int64) []string {
	var missing []string
	for _, name := range requiredTimestampNames {
		if _, ok := timestamps[name]; !ok {
			missing = append(missing, name)
		}
	}
	return missing
}

// subtractIfPresent returns timestamps[endName] - timestamps[beginName], or
// nil if either key is absent.
func subtractIfPresent(timestamps map[string]int64, endName, beginName string) *int64 {
	end, ok := timestamps[endName]
	if !ok {
		return nil
	}
	begin, ok := timestamps[beginName]
	if !ok {
		return nil
	}
	value := end - begin
	return &value
}

// subtractSummaryTimestamps is like subtractIfPresent but takes the two
// timestamps from different summaries: endSummary's endName minus
// beginSummary's beginName, or nil if either is absent.
func subtractSummaryTimestamps(endSummary *Summary, endName string, beginSummary *Summary, beginName string) *int64 {
	end, ok := endSummary.Timestamps[endName]
	if !ok {
		return nil
	}
	begin, ok := beginSummary.Timestamps[beginName]
	if !ok {
		return nil
	}
	value := end - begin
	return &value
}

// calculateBitrateBPS returns the approximate bitrate, in bits per second,
// of bodySize bytes transferred in *latencyNS nanoseconds. It returns nil
// when bodySize <= 0, latencyNS is nil, or *latencyNS <= 0.
func calculateBitrateBPS(bodySize int, latencyNS *int64) *float64 {
	if bodySize <= 0 || latencyNS == nil || *latencyNS <= 0 {
		return nil
	}
	value := float64(bodySize) * 8 * 1_000_000_000 / float64(*latencyNS)
	return &value
}

// IsBusinessEvent reports whether event is one of the latency-tracking
// events that SummarizeEvents aggregates.
func IsBusinessEvent(event Event) bool {
	switch event.Event {
	case EventAAppPrepBegin,
		EventATXSched,
		EventATXSoftware,
		EventATXHardware,
		EventBRXHardware,
		EventBRXSoftware,
		EventBAppRecv,
		EventBPersistBegin,
		EventBPersistEnd:
		return true
	default:
		return false
	}
}