Files
2026-03-24 16:08:13 +08:00

458 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package latencylog
import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sort"

	"omnisocketgo/cmd/internal/protocol"
)
// requiredTimestampNames lists the events every message summary is expected to
// carry. Any of these names absent from a Summary's Timestamps is reported, in
// this order, in Summary.MissingTimestamps. Other business events (for example
// EventATXHardware or EventBPersistBegin) are not reported as missing.
var requiredTimestampNames = []string{
	EventAAppPrepBegin, // A side: application starts preparing the message
	EventATXSched,      // A side: just before entering the Linux qdisc
	EventATXSoftware,   // A side: about to hand the data to the NIC driver
	EventBRXSoftware,   // B side: NIC driver hands the data to the Linux receive stack
	EventBAppRecv,      // B side: application has read the complete message
	EventBPersistEnd,   // B side: write to disk completed
}
// Summary is the latency result for a single message, as built by
// SummarizeEvents. Every *NS field is a nanosecond difference between two
// entries of Timestamps and is nil when either entry is missing. Every
// *BitrateBPS field is nil when BodySize <= 0 or the corresponding latency is
// nil or <= 0.
type Summary struct {
	// MessageType is the message type.
	MessageType protocol.MessageType `json:"message_type"`
	// MessageID is the message ID.
	MessageID uint64 `json:"message_id"`
	// From is the sender.
	From string `json:"from"`
	// To is the receiver.
	To string `json:"to"`
	// FileName is the file name (file messages only).
	FileName string `json:"file_name,omitempty"`
	// BodySize is the message body size in bytes.
	BodySize int `json:"body_size"`
	// Timestamps maps an event name to its UnixNano timestamp; when an event
	// was logged more than once, the earliest timestamp is kept.
	Timestamps map[string]int64 `json:"timestamps"`
	// AProcessingLatencyNS is the A-side processing latency:
	// EventATXSched - EventAAppPrepBegin.
	AProcessingLatencyNS *int64 `json:"a_processing_latency_ns,omitempty"`
	// AQueueLatencyNS is the A-side queueing latency:
	// EventATXSoftware - EventATXSched.
	AQueueLatencyNS *int64 `json:"a_queue_latency_ns,omitempty"`
	// ABTransportPropagationNS approximates A-to-B transmission plus
	// propagation latency: EventBAppRecv - EventATXSoftware.
	// NOTE(review): the two timestamps are presumably recorded on different
	// hosts, so this assumes synchronized clocks — confirm.
	ABTransportPropagationNS *int64 `json:"a_b_transport_propagation_ns,omitempty"`
	// BKernelReceivePathLatencyNS approximates the B-side kernel receive
	// path: EventBAppRecv - EventBRXSoftware.
	BKernelReceivePathLatencyNS *int64 `json:"b_kernel_receive_path_latency_ns,omitempty"`
	// BProcessingLatencyNS is the B-side processing latency:
	// EventBPersistEnd - EventBAppRecv.
	BProcessingLatencyNS *int64 `json:"b_processing_latency_ns,omitempty"`
	// EndToEndLatencyNS is the end-to-end latency:
	// EventBPersistEnd - EventAAppPrepBegin.
	EndToEndLatencyNS *int64 `json:"end_to_end_latency_ns,omitempty"`
	// AProcessingBitrateBPS is the approximate bitrate of the A processing
	// stage: (BodySize * 8) / A processing latency in seconds.
	AProcessingBitrateBPS *float64 `json:"a_processing_bitrate_bps,omitempty"`
	// ABTransportPropagationBitrateBPS is the approximate bitrate of the A-B
	// transport+propagation stage: (BodySize * 8) / that latency in seconds.
	ABTransportPropagationBitrateBPS *float64 `json:"a_b_transport_propagation_bitrate_bps,omitempty"`
	// EndToEndBitrateBPS is the approximate end-to-end bitrate:
	// (BodySize * 8) / end-to-end latency in seconds.
	EndToEndBitrateBPS *float64 `json:"end_to_end_bitrate_bps,omitempty"`
	// ApproxRTTNS approximates round-trip time. It is the EventBAppRecv of the
	// reply paired with this request on the reverse route, minus this request's
	// EventATXSoftware. The paired reply is the earliest not-yet-paired reply
	// whose EventATXSoftware is no earlier than this request's EventBAppRecv.
	// Nil when no reply could be paired.
	ApproxRTTNS *int64 `json:"approx_rtt_ns,omitempty"`
	// MissingTimestamps lists, in requiredTimestampNames order, the required
	// event names that are absent from Timestamps.
	MissingTimestamps []string `json:"missing_timestamps,omitempty"`
}
// messageKey identifies one message while SummarizeEvents groups events: all
// business events sharing the same type, ID, sender, and receiver are merged
// into a single Summary.
type messageKey struct {
	MessageType protocol.MessageType // message type
	MessageID   uint64               // message ID
	From        string               // sender
	To          string               // receiver
}
// LoadEventsFromFiles reads every JSONL raw log in paths, in order, and returns
// all of their events concatenated. It stops at the first file that fails to
// load and returns that error with no events.
func LoadEventsFromFiles(paths []string) ([]Event, error) {
	var all []Event
	for i := 0; i < len(paths); i++ {
		loaded, loadErr := LoadEventsFromFile(paths[i])
		if loadErr != nil {
			return nil, loadErr
		}
		all = append(all, loaded...)
	}
	return all, nil
}
// LoadEventsFromFilesWithSharedMaxOffset loads events from several JSONL raw
// logs and truncates them all at a shared message_id cutoff.
//
// The cutoff is computed as follows:
//  1. For each file, take the largest message_id among its business events
//     (see IsBusinessEvent).
//  2. Take the smallest of those per-file maxima.
//  3. Subtract sharedMaxOffset from it.
//
// Events (business or not) whose message_id exceeds the cutoff are dropped.
//
// Results:
//   - (nil, nil, err) if any file cannot be opened, scanned, or decoded;
//   - (nil, nil, nil) if paths is empty or some file has no business events,
//     because no shared cutoff exists;
//   - ([]Event{}, nil, nil) if sharedMaxOffset exceeds the shared maximum;
//   - otherwise the filtered events and a pointer to the cutoff used.
//
// Every file is loaded before the cutoff is evaluated. A read error in any
// file is therefore always reported, regardless of the order of paths.
func LoadEventsFromFilesWithSharedMaxOffset(paths []string, sharedMaxOffset uint64) ([]Event, *uint64, error) {
	eventsByFile := make([][]Event, 0, len(paths))
	for _, path := range paths {
		fileEvents, err := LoadEventsFromFile(path)
		if err != nil {
			return nil, nil, err
		}
		eventsByFile = append(eventsByFile, fileEvents)
	}
	if len(eventsByFile) == 0 {
		return nil, nil, nil
	}

	var sharedMax uint64
	for i, fileEvents := range eventsByFile {
		fileMax, ok := maxBusinessMessageID(fileEvents)
		if !ok {
			// A file without business events has no maximum, so no shared
			// cutoff can be defined.
			return nil, nil, nil
		}
		if i == 0 || fileMax < sharedMax {
			sharedMax = fileMax
		}
	}

	cutoff, ok := subtractUint64(sharedMax, sharedMaxOffset)
	if !ok {
		// The offset reaches below zero: nothing survives the cutoff.
		return []Event{}, nil, nil
	}
	var events []Event
	for _, fileEvents := range eventsByFile {
		events = append(events, filterEventsByMaxMessageID(fileEvents, cutoff)...)
	}
	return events, &cutoff, nil
}
// LoadEventsFromFile reads one JSONL raw log and decodes each non-blank line
// into an Event.
//
// Lines that are empty or contain only whitespace are skipped. Lines up to
// maxRawLogLineBytes long are accepted. With bufio.Scanner's default 64 KiB
// token limit, a longer event line would otherwise abort the scan with
// bufio.ErrTooLong. Open, decode, and scan errors are returned wrapped with
// the file path.
func LoadEventsFromFile(path string) ([]Event, error) {
	const maxRawLogLineBytes = 16 << 20 // 16 MiB

	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("latencylog: open raw log %s: %w", path, err)
	}
	// Read-only handle: a Close error cannot lose data, so it is ignored.
	defer file.Close()

	var events []Event
	scanner := bufio.NewScanner(file)
	scanner.Buffer(make([]byte, 0, 64*1024), maxRawLogLineBytes)
	for scanner.Scan() {
		line := bytes.TrimSpace(scanner.Bytes())
		if len(line) == 0 {
			continue
		}
		var event Event
		if err := json.Unmarshal(line, &event); err != nil {
			return nil, fmt.Errorf("latencylog: decode event from %s: %w", path, err)
		}
		events = append(events, event)
	}
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("latencylog: scan raw log %s: %w", path, err)
	}
	return events, nil
}
// SummarizeEvents groups business events by message (type, ID, sender,
// receiver) and builds one Summary per message.
//
// Non-business events are ignored. Merging rules per message:
//   - For each event name, the earliest TsUnixNano seen is kept.
//   - FileName is the first non-empty file name.
//   - BodySize is the last positive body size, or the first event's value if
//     none is positive.
//
// After grouping:
//   - latency, bitrate, and missing-timestamp fields are derived;
//   - approximate RTTs are paired across opposite routes;
//   - the result is sorted by From, To, MessageID, then MessageType.
func SummarizeEvents(events []Event) []Summary {
	byMessage := make(map[messageKey]*Summary)
	for _, ev := range events {
		if !IsBusinessEvent(ev) {
			continue
		}
		k := messageKey{
			MessageType: ev.MessageType,
			MessageID:   ev.MessageID,
			From:        ev.From,
			To:          ev.To,
		}
		s := byMessage[k]
		if s == nil {
			s = &Summary{
				MessageType: ev.MessageType,
				MessageID:   ev.MessageID,
				From:        ev.From,
				To:          ev.To,
				FileName:    ev.FileName,
				BodySize:    ev.BodySize,
				Timestamps:  make(map[string]int64),
			}
			byMessage[k] = s
		}
		// Keep the first non-empty file name and the last positive body size.
		if s.FileName == "" {
			s.FileName = ev.FileName
		}
		if ev.BodySize > 0 {
			s.BodySize = ev.BodySize
		}
		// An event name may be logged more than once; keep its earliest time.
		if prev, seen := s.Timestamps[ev.Event]; !seen || ev.TsUnixNano < prev {
			s.Timestamps[ev.Event] = ev.TsUnixNano
		}
	}

	pointers := make([]*Summary, 0, len(byMessage))
	for _, s := range byMessage {
		completeSummary(s)
		pointers = append(pointers, s)
	}
	// assignApproxRTTs mutates the shared *Summary values, so it must run
	// before they are copied out below.
	assignApproxRTTs(pointers)

	out := make([]Summary, len(pointers))
	for i, s := range pointers {
		out[i] = *s
	}
	// Deterministic, readable order: From, To, MessageID, then MessageType.
	sort.Slice(out, func(i, j int) bool {
		a, b := &out[i], &out[j]
		switch {
		case a.From != b.From:
			return a.From < b.From
		case a.To != b.To:
			return a.To < b.To
		case a.MessageID != b.MessageID:
			return a.MessageID < b.MessageID
		default:
			return a.MessageType < b.MessageType
		}
	})
	return out
}
// WriteSummariesJSONL writes summaries to path as JSON Lines, one Summary per
// line. Parent directories are created as needed (0o755), and any existing
// file is truncated (0o644).
//
// The file is always closed before returning. If no earlier error occurred, a
// Close failure is returned. Close can report write errors that earlier
// writes did not surface, so ignoring it could hide an incomplete summary file.
func WriteSummariesJSONL(path string, summaries []Summary) (err error) {
	if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
		return fmt.Errorf("latencylog: create summary dir for %s: %w", path, err)
	}
	file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
	if err != nil {
		return fmt.Errorf("latencylog: open summary file %s: %w", path, err)
	}
	defer func() {
		// Report a close failure only when nothing else went wrong first.
		if closeErr := file.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("latencylog: close summary file %s: %w", path, closeErr)
		}
	}()

	writer := bufio.NewWriter(file)
	for _, summary := range summaries {
		line, err := json.Marshal(summary)
		if err != nil {
			return fmt.Errorf("latencylog: encode summary for message %d: %w", summary.MessageID, err)
		}
		if _, err := writer.Write(append(line, '\n')); err != nil {
			return fmt.Errorf("latencylog: write summary file %s: %w", path, err)
		}
	}
	// Push buffered lines to the file before it is closed.
	if err := writer.Flush(); err != nil {
		return fmt.Errorf("latencylog: flush summary file %s: %w", path, err)
	}
	return nil
}
// completeSummary derives summary's computed fields from its Timestamps:
//   - MissingTimestamps;
//   - every stage latency whose two endpoint events are both present (other
//     latency fields are left untouched);
//   - the A processing, A-B transport+propagation, and end-to-end bitrates.
func completeSummary(summary *Summary) {
	summary.MissingTimestamps = missingTimestampNames(summary.Timestamps)

	// Each stage latency is timestamp(end) - timestamp(begin).
	stages := []struct {
		target     **int64
		end, begin string
	}{
		{&summary.AProcessingLatencyNS, EventATXSched, EventAAppPrepBegin},
		{&summary.AQueueLatencyNS, EventATXSoftware, EventATXSched},
		{&summary.ABTransportPropagationNS, EventBAppRecv, EventATXSoftware},
		{&summary.BKernelReceivePathLatencyNS, EventBAppRecv, EventBRXSoftware},
		{&summary.BProcessingLatencyNS, EventBPersistEnd, EventBAppRecv},
		{&summary.EndToEndLatencyNS, EventBPersistEnd, EventAAppPrepBegin},
	}
	for _, stage := range stages {
		if latency := subtractIfPresent(summary.Timestamps, stage.end, stage.begin); latency != nil {
			*stage.target = latency
		}
	}

	summary.AProcessingBitrateBPS = calculateBitrateBPS(summary.BodySize, summary.AProcessingLatencyNS)
	summary.ABTransportPropagationBitrateBPS = calculateBitrateBPS(summary.BodySize, summary.ABTransportPropagationNS)
	summary.EndToEndBitrateBPS = calculateBitrateBPS(summary.BodySize, summary.EndToEndLatencyNS)
}
// routeKey identifies a directed route (sender -> receiver) when summaries are
// grouped for RTT pairing.
type routeKey struct {
	From, To string
}
// assignApproxRTTs sets ApproxRTTNS on summaries by pairing each route's
// messages with those travelling the opposite way. Summaries are grouped by
// (From, To). For every route whose reverse route also has summaries:
//   - the route's messages (requests) are ordered by EventBAppRecv;
//   - the reverse messages (replies) are ordered by EventATXSoftware;
//   - both lists are passed to assignApproxRTTsForRoute.
//
// Routes without reverse traffic are skipped.
func assignApproxRTTs(summaries []*Summary) {
	routes := make(map[routeKey][]*Summary)
	for _, s := range summaries {
		k := routeKey{From: s.From, To: s.To}
		routes[k] = append(routes[k], s)
	}
	for route, outbound := range routes {
		inbound := routes[routeKey{From: route.To, To: route.From}]
		if len(inbound) == 0 {
			continue
		}
		orderedRequests := sortSummariesByTimestamp(outbound, EventBAppRecv)
		orderedReplies := sortSummariesByTimestamp(inbound, EventATXSoftware)
		assignApproxRTTsForRoute(orderedRequests, orderedReplies)
	}
}
// assignApproxRTTsForRoute greedily pairs requests with replies, one-to-one,
// in order. Callers pass requests sorted by EventBAppRecv and replies sorted
// by EventATXSoftware.
//
// For each request that has EventBAppRecv, the paired reply is the next unused
// reply whose EventATXSoftware is present and no earlier than that EventBAppRecv.
// Replies skipped along the way are never reconsidered. When both timestamps
// exist, the request's ApproxRTTNS becomes:
//
//	reply.EventBAppRecv - request.EventATXSoftware
//
// A paired reply is consumed even if that subtraction is not possible.
func assignApproxRTTsForRoute(requests, replies []*Summary) {
	next := 0
	for _, req := range requests {
		recvAt, hasRecv := req.Timestamps[EventBAppRecv]
		if !hasRecv {
			continue
		}
		// Skip replies that lack a send time or were sent before this request
		// arrived; they cannot answer it or any later request.
		for next < len(replies) {
			sentAt, hasSent := replies[next].Timestamps[EventATXSoftware]
			if hasSent && sentAt >= recvAt {
				break
			}
			next++
		}
		if next == len(replies) {
			continue
		}
		if rtt := subtractSummaryTimestamps(replies[next], EventBAppRecv, req, EventATXSoftware); rtt != nil {
			req.ApproxRTTNS = rtt
		}
		next++
	}
}
// sortSummariesByTimestamp returns a sorted copy of summaries; the input slice
// is not reordered. Summaries that have eventName come first, ordered by that
// timestamp; those without it follow. Ties (equal timestamps, or both missing)
// are broken by MessageID, From, To, then MessageType.
func sortSummariesByTimestamp(summaries []*Summary, eventName string) []*Summary {
	var ordered []*Summary
	ordered = append(ordered, summaries...)

	less := func(a, b *Summary) bool {
		ta, okA := a.Timestamps[eventName]
		tb, okB := b.Timestamps[eventName]
		if okA != okB {
			// A present timestamp sorts before a missing one.
			return okA
		}
		if okA && ta != tb {
			return ta < tb
		}
		if a.MessageID != b.MessageID {
			return a.MessageID < b.MessageID
		}
		if a.From != b.From {
			return a.From < b.From
		}
		if a.To != b.To {
			return a.To < b.To
		}
		return a.MessageType < b.MessageType
	}
	sort.SliceStable(ordered, func(i, j int) bool { return less(ordered[i], ordered[j]) })
	return ordered
}
// missingTimestampNames returns the names from requiredTimestampNames that
// have no entry in timestamps, in requiredTimestampNames order. It returns nil
// when nothing is missing.
func missingTimestampNames(timestamps map[string]int64) []string {
	var absent []string
	for i := range requiredTimestampNames {
		name := requiredTimestampNames[i]
		if _, found := timestamps[name]; found {
			continue
		}
		absent = append(absent, name)
	}
	return absent
}
// subtractIfPresent returns timestamps[endName] - timestamps[beginName] when
// both names are present, and nil otherwise. A nil map is treated as empty.
func subtractIfPresent(timestamps map[string]int64, endName, beginName string) *int64 {
	begin, hasBegin := timestamps[beginName]
	end, hasEnd := timestamps[endName]
	if !hasEnd || !hasBegin {
		return nil
	}
	diff := end - begin
	return &diff
}
// subtractSummaryTimestamps returns endSummary's endName timestamp minus
// beginSummary's beginName timestamp, or nil if either timestamp is missing.
// The two summaries may be different messages, which lets callers measure
// across a request/reply pair. beginSummary is only consulted when endSummary
// has endName.
func subtractSummaryTimestamps(endSummary *Summary, endName string, beginSummary *Summary, beginName string) *int64 {
	end, haveEnd := endSummary.Timestamps[endName]
	if !haveEnd {
		return nil
	}
	if begin, haveBegin := beginSummary.Timestamps[beginName]; haveBegin {
		delta := end - begin
		return &delta
	}
	return nil
}
// calculateBitrateBPS returns the approximate bitrate, in bits per second, of
// moving bodySize bytes in *latencyNS nanoseconds:
//
//	bodySize * 8 * 1e9 / *latencyNS
//
// It returns nil when bodySize <= 0, latencyNS is nil, or *latencyNS <= 0,
// so missing or non-positive latencies never produce a bitrate.
func calculateBitrateBPS(bodySize int, latencyNS *int64) *float64 {
	if bodySize <= 0 || latencyNS == nil || *latencyNS <= 0 {
		return nil
	}
	const bitsPerByte, nsPerSecond = 8, 1_000_000_000
	bps := float64(bodySize) * bitsPerByte * nsPerSecond / float64(*latencyNS)
	return &bps
}
// maxBusinessMessageID returns the largest MessageID among the events accepted
// by IsBusinessEvent. The boolean is false, and the ID 0, when there is no such
// event.
func maxBusinessMessageID(events []Event) (uint64, bool) {
	var (
		best  uint64
		found bool
	)
	for i := range events {
		if !IsBusinessEvent(events[i]) {
			continue
		}
		if id := events[i].MessageID; !found || id > best {
			best, found = id, true
		}
	}
	return best, found
}
// filterEventsByMaxMessageID returns a new slice holding, in their original
// order, the events whose MessageID is at most maxMessageID. Every event is
// checked, business or not. The input slice is not modified.
func filterEventsByMaxMessageID(events []Event, maxMessageID uint64) []Event {
	kept := make([]Event, 0, len(events))
	for _, ev := range events {
		if ev.MessageID <= maxMessageID {
			kept = append(kept, ev)
		}
	}
	return kept
}
// subtractUint64 computes value - offset without wrapping around. It returns
// (value-offset, true) when offset <= value, and (0, false) when the
// subtraction would underflow.
func subtractUint64(value, offset uint64) (uint64, bool) {
	if offset <= value {
		return value - offset, true
	}
	return 0, false
}
// IsBusinessEvent reports whether event is one of the per-message latency
// events recorded on the A (sending) or B (receiving) side. Events for which
// it returns false are ignored by SummarizeEvents and by the shared
// message_id cutoff logic.
func IsBusinessEvent(event Event) bool {
	switch event.Event {
	case EventAAppPrepBegin, EventATXSched, EventATXSoftware, EventATXHardware:
		// Sender-side events.
		return true
	case EventBRXHardware, EventBRXSoftware, EventBAppRecv, EventBPersistBegin, EventBPersistEnd:
		// Receiver-side events.
		return true
	}
	return false
}