377 lines
12 KiB
Go
377 lines
12 KiB
Go
package latencylog
|
||
|
||
import (
|
||
"bufio"
|
||
"encoding/json"
|
||
"fmt"
|
||
"os"
|
||
"path/filepath"
|
||
"sort"
|
||
|
||
"omnisocketgo/cmd/internal/protocol"
|
||
)
|
||
|
||
// Summary 是针对单条消息的时延的规则列表。
|
||
var requiredTimestampNames = []string{
|
||
EventAAppPrepBegin, // A 端应用开始准备这条消息
|
||
EventATXSched, // A 端进入 Linux qdisc 之前
|
||
EventATXSoftware, // A 端即将交给网卡驱动
|
||
EventBRXSoftware, // B 端网卡驱动把数据交给 Linux 接收栈
|
||
EventBAppRecv, // B 端应用真正读到完整消息
|
||
EventBPersistEnd, // B 端写盘完成
|
||
}
|
||
|
||
// Summary 是针对单条消息的时延整理结果。
|
||
type Summary struct {
|
||
MessageType protocol.MessageType `json:"message_type"` //消息类型
|
||
MessageID uint64 `json:"message_id"` //消息ID
|
||
From string `json:"from"` //发送方
|
||
To string `json:"to"` //接收方
|
||
FileName string `json:"file_name,omitempty"` //文件名(仅文件消息)
|
||
BodySize int `json:"body_size"` //消息体大小(字节数)
|
||
Timestamps map[string]int64 `json:"timestamps"` //事件时间戳,key 是事件名称,value 是 UnixNano 时间戳
|
||
|
||
AProcessingLatencyNS *int64 `json:"a_processing_latency_ns,omitempty"` // A 处理时延:A_TX_SCHED - A_APP_PREP_BEGIN
|
||
AQueueLatencyNS *int64 `json:"a_queue_latency_ns,omitempty"` // A 排队时延:A_TX_SOFTWARE - A_TX_SCHED
|
||
ABTransportPropagationNS *int64 `json:"a_b_transport_propagation_ns,omitempty"` // A-B 传输+传播时延近似:B_APP_RECV - A_TX_SOFTWARE
|
||
BKernelReceivePathLatencyNS *int64 `json:"b_kernel_receive_path_latency_ns,omitempty"` // B 内核接收路径近似:B_APP_RECV - B_RX_SOFTWARE
|
||
BProcessingLatencyNS *int64 `json:"b_processing_latency_ns,omitempty"` // B 处理时延:B_PERSIST_END - B_APP_RECV
|
||
EndToEndLatencyNS *int64 `json:"end_to_end_latency_ns,omitempty"` // 端到端时延:B_PERSIST_END - A_APP_PREP_BEGIN
|
||
AProcessingBitrateBPS *float64 `json:"a_processing_bitrate_bps,omitempty"` // A 处理阶段近似比特率:(BodySize * 8) / A 处理时延(秒)
|
||
ABTransportPropagationBitrateBPS *float64 `json:"a_b_transport_propagation_bitrate_bps,omitempty"` // A-B 传输+传播阶段近似比特率:(BodySize * 8) / A-B 传输+传播时延(秒)
|
||
EndToEndBitrateBPS *float64 `json:"end_to_end_bitrate_bps,omitempty"` // 端到端近似比特率:(BodySize * 8) / 端到端时延(秒)
|
||
ApproxRTTNS *int64 `json:"approx_rtt_ns,omitempty"` // 近似 RTT:首条反向应答的 B_APP_RECV - 当前请求的 A_TX_SOFTWARE
|
||
MissingTimestamps []string `json:"missing_timestamps,omitempty"` // 缺失的时间戳列表,包含 requiredTimestampNames 中但在原始事件中没有的事件名称
|
||
}
|
||
|
||
// LoadEventsFromFiles 从JSONL 原始日志文件中加载事件。
|
||
type messageKey struct {
|
||
MessageType protocol.MessageType //消息类型
|
||
MessageID uint64 //消息ID
|
||
From string //发送方
|
||
To string //接收方
|
||
}
|
||
|
||
// LoadEventsFromFiles 从多个 JSONL 原始日志文件中加载事件。
|
||
func LoadEventsFromFiles(paths []string) ([]Event, error) {
|
||
var events []Event
|
||
for _, path := range paths {
|
||
fileEvents, err := LoadEventsFromFile(path)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
events = append(events, fileEvents...)
|
||
}
|
||
|
||
return events, nil
|
||
}
|
||
|
||
// LoadEventsFromFile 从单个 JSONL 原始日志文件中加载事件。
|
||
func LoadEventsFromFile(path string) ([]Event, error) {
|
||
file, err := os.Open(path)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("latencylog: open raw log %s: %w", path, err)
|
||
}
|
||
defer file.Close()
|
||
|
||
var events []Event
|
||
scanner := bufio.NewScanner(file)
|
||
for scanner.Scan() {
|
||
if len(scanner.Bytes()) == 0 {
|
||
continue
|
||
}
|
||
|
||
var event Event
|
||
if err := json.Unmarshal(scanner.Bytes(), &event); err != nil { //解析 JSONL 行失败,返回错误
|
||
return nil, fmt.Errorf("latencylog: decode event from %s: %w", path, err)
|
||
}
|
||
events = append(events, event)
|
||
}
|
||
if err := scanner.Err(); err != nil {
|
||
return nil, fmt.Errorf("latencylog: scan raw log %s: %w", path, err)
|
||
}
|
||
|
||
return events, nil
|
||
}
|
||
|
||
// SummarizeEvents 将原始事件整理成按消息分组的时延结果。
|
||
func SummarizeEvents(events []Event) []Summary {
|
||
grouped := make(map[messageKey]*Summary)
|
||
|
||
for _, event := range events {
|
||
if !IsBusinessEvent(event) {
|
||
continue
|
||
}
|
||
|
||
key := messageKey{
|
||
MessageType: event.MessageType,
|
||
MessageID: event.MessageID,
|
||
From: event.From,
|
||
To: event.To,
|
||
}
|
||
|
||
summary, ok := grouped[key]
|
||
if !ok {
|
||
summary = &Summary{
|
||
MessageType: event.MessageType,
|
||
MessageID: event.MessageID,
|
||
From: event.From,
|
||
To: event.To,
|
||
FileName: event.FileName,
|
||
BodySize: event.BodySize,
|
||
Timestamps: make(map[string]int64),
|
||
}
|
||
grouped[key] = summary
|
||
}
|
||
|
||
if summary.FileName == "" {
|
||
summary.FileName = event.FileName
|
||
}
|
||
if event.BodySize > 0 {
|
||
summary.BodySize = event.BodySize
|
||
}
|
||
|
||
if existing, exists := summary.Timestamps[event.Event]; !exists || event.TsUnixNano < existing {
|
||
summary.Timestamps[event.Event] = event.TsUnixNano
|
||
}
|
||
}
|
||
|
||
summaryPointers := make([]*Summary, 0, len(grouped))
|
||
for _, summary := range grouped {
|
||
completeSummary(summary) //补全时延指标和缺失时间戳信息
|
||
summaryPointers = append(summaryPointers, summary)
|
||
}
|
||
assignApproxRTTs(summaryPointers)
|
||
|
||
summaries := make([]Summary, 0, len(summaryPointers))
|
||
for _, summary := range summaryPointers {
|
||
summaries = append(summaries, *summary)
|
||
}
|
||
//对整理结果进行排序,先按发送方、再按接收方、再按消息 ID、最后按消息类型排序,保证输出的稳定性和可读性。
|
||
sort.Slice(summaries, func(i, j int) bool {
|
||
if summaries[i].From != summaries[j].From {
|
||
return summaries[i].From < summaries[j].From
|
||
}
|
||
if summaries[i].To != summaries[j].To {
|
||
return summaries[i].To < summaries[j].To
|
||
}
|
||
if summaries[i].MessageID != summaries[j].MessageID {
|
||
return summaries[i].MessageID < summaries[j].MessageID
|
||
}
|
||
return summaries[i].MessageType < summaries[j].MessageType
|
||
})
|
||
|
||
return summaries
|
||
}
|
||
|
||
// WriteSummariesJSONL 将整理结果写成 JSONL 汇总文件。
|
||
func WriteSummariesJSONL(path string, summaries []Summary) error {
|
||
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
|
||
return fmt.Errorf("latencylog: create summary dir for %s: %w", path, err)
|
||
}
|
||
|
||
file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
|
||
if err != nil {
|
||
return fmt.Errorf("latencylog: open summary file %s: %w", path, err)
|
||
}
|
||
defer file.Close()
|
||
|
||
writer := bufio.NewWriter(file)
|
||
for _, summary := range summaries { //将每条整理结果编码成 JSONL 行并写入文件
|
||
line, err := json.Marshal(summary)
|
||
if err != nil {
|
||
return fmt.Errorf("latencylog: encode summary for message %d: %w", summary.MessageID, err)
|
||
}
|
||
if _, err := writer.Write(append(line, '\n')); err != nil {
|
||
return fmt.Errorf("latencylog: write summary file %s: %w", path, err)
|
||
}
|
||
}
|
||
|
||
if err := writer.Flush(); err != nil { //将缓冲区内容写入文件
|
||
return fmt.Errorf("latencylog: flush summary file %s: %w", path, err)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// completeSummary 根据事件时间戳计算时延指标,并找出缺失的时间戳。
|
||
func completeSummary(summary *Summary) {
|
||
summary.MissingTimestamps = missingTimestampNames(summary.Timestamps)
|
||
|
||
if value := subtractIfPresent(summary.Timestamps, EventATXSched, EventAAppPrepBegin); value != nil {
|
||
summary.AProcessingLatencyNS = value
|
||
}
|
||
if value := subtractIfPresent(summary.Timestamps, EventATXSoftware, EventATXSched); value != nil {
|
||
summary.AQueueLatencyNS = value
|
||
}
|
||
if value := subtractIfPresent(summary.Timestamps, EventBAppRecv, EventATXSoftware); value != nil {
|
||
summary.ABTransportPropagationNS = value
|
||
}
|
||
if value := subtractIfPresent(summary.Timestamps, EventBAppRecv, EventBRXSoftware); value != nil {
|
||
summary.BKernelReceivePathLatencyNS = value
|
||
}
|
||
if value := subtractIfPresent(summary.Timestamps, EventBPersistEnd, EventBAppRecv); value != nil {
|
||
summary.BProcessingLatencyNS = value
|
||
}
|
||
if value := subtractIfPresent(summary.Timestamps, EventBPersistEnd, EventAAppPrepBegin); value != nil {
|
||
summary.EndToEndLatencyNS = value
|
||
}
|
||
|
||
summary.AProcessingBitrateBPS = calculateBitrateBPS(summary.BodySize, summary.AProcessingLatencyNS)
|
||
summary.ABTransportPropagationBitrateBPS = calculateBitrateBPS(summary.BodySize, summary.ABTransportPropagationNS)
|
||
summary.EndToEndBitrateBPS = calculateBitrateBPS(summary.BodySize, summary.EndToEndLatencyNS)
|
||
}
|
||
|
||
// routeKey identifies one direction of traffic between two peers. It is the
// map key assignApproxRTTs uses to bucket summaries by (From, To).
type routeKey struct {
	From string
	To   string
}
|
||
|
||
func assignApproxRTTs(summaries []*Summary) {
|
||
grouped := make(map[routeKey][]*Summary)
|
||
for _, summary := range summaries {
|
||
grouped[routeKey{From: summary.From, To: summary.To}] = append(grouped[routeKey{From: summary.From, To: summary.To}], summary)
|
||
}
|
||
|
||
for key, requests := range grouped {
|
||
replies := grouped[routeKey{From: key.To, To: key.From}]
|
||
if len(replies) == 0 {
|
||
continue
|
||
}
|
||
|
||
assignApproxRTTsForRoute(
|
||
sortSummariesByTimestamp(requests, EventBAppRecv),
|
||
sortSummariesByTimestamp(replies, EventATXSoftware),
|
||
)
|
||
}
|
||
}
|
||
|
||
func assignApproxRTTsForRoute(requests, replies []*Summary) {
|
||
replyIndex := 0
|
||
for _, request := range requests {
|
||
requestReceivedAtResponder, ok := request.Timestamps[EventBAppRecv]
|
||
if !ok {
|
||
continue
|
||
}
|
||
|
||
for replyIndex < len(replies) {
|
||
reply := replies[replyIndex]
|
||
replySentAtResponder, ok := reply.Timestamps[EventATXSoftware]
|
||
if !ok {
|
||
replyIndex++
|
||
continue
|
||
}
|
||
if replySentAtResponder < requestReceivedAtResponder {
|
||
replyIndex++
|
||
continue
|
||
}
|
||
|
||
if value := subtractSummaryTimestamps(reply, EventBAppRecv, request, EventATXSoftware); value != nil {
|
||
request.ApproxRTTNS = value
|
||
}
|
||
replyIndex++
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
func sortSummariesByTimestamp(summaries []*Summary, eventName string) []*Summary {
|
||
sorted := append([]*Summary(nil), summaries...)
|
||
sort.SliceStable(sorted, func(i, j int) bool {
|
||
leftTS, leftOK := sorted[i].Timestamps[eventName]
|
||
rightTS, rightOK := sorted[j].Timestamps[eventName]
|
||
switch {
|
||
case leftOK && rightOK:
|
||
if leftTS != rightTS {
|
||
return leftTS < rightTS
|
||
}
|
||
case leftOK:
|
||
return true
|
||
case rightOK:
|
||
return false
|
||
}
|
||
|
||
if sorted[i].MessageID != sorted[j].MessageID {
|
||
return sorted[i].MessageID < sorted[j].MessageID
|
||
}
|
||
if sorted[i].From != sorted[j].From {
|
||
return sorted[i].From < sorted[j].From
|
||
}
|
||
if sorted[i].To != sorted[j].To {
|
||
return sorted[i].To < sorted[j].To
|
||
}
|
||
|
||
return sorted[i].MessageType < sorted[j].MessageType
|
||
})
|
||
return sorted
|
||
}
|
||
|
||
// 返回 requiredTimestampNames 中哪些在给定的 timestamps 中缺失。
|
||
func missingTimestampNames(timestamps map[string]int64) []string {
|
||
var missing []string
|
||
for _, name := range requiredTimestampNames {
|
||
if _, ok := timestamps[name]; !ok {
|
||
missing = append(missing, name)
|
||
}
|
||
}
|
||
|
||
return missing
|
||
}
|
||
|
||
// subtractIfPresent returns timestamps[endName] - timestamps[beginName] when
// both names are present in timestamps, and nil otherwise. The difference may
// be negative; it is returned as is.
func subtractIfPresent(timestamps map[string]int64, endName, beginName string) *int64 {
	end, ok := timestamps[endName]
	if !ok {
		return nil
	}
	begin, ok := timestamps[beginName]
	if !ok {
		return nil
	}

	value := end - begin
	return &value
}
|
||
|
||
func subtractSummaryTimestamps(endSummary *Summary, endName string, beginSummary *Summary, beginName string) *int64 {
|
||
end, ok := endSummary.Timestamps[endName]
|
||
if !ok {
|
||
return nil
|
||
}
|
||
begin, ok := beginSummary.Timestamps[beginName]
|
||
if !ok {
|
||
return nil
|
||
}
|
||
|
||
value := end - begin
|
||
return &value
|
||
}
|
||
|
||
// calculateBitrateBPS returns the bitrate, in bits per second, of moving
// bodySize bytes in *latencyNS nanoseconds:
//
//	bodySize * 8 * 1e9 / latencyNS
//
// It returns nil when bodySize is not positive, or when latencyNS is nil or
// not positive, because no meaningful rate exists in those cases.
func calculateBitrateBPS(bodySize int, latencyNS *int64) *float64 {
	if bodySize <= 0 || latencyNS == nil || *latencyNS <= 0 {
		return nil
	}

	const (
		bitsPerByte = 8
		nsPerSecond = 1_000_000_000
	)
	value := float64(bodySize) * bitsPerByte * nsPerSecond / float64(*latencyNS)
	return &value
}
|
||
|
||
// 判断事件是否是业务相关的时延事件(其中一项)
|
||
func IsBusinessEvent(event Event) bool {
|
||
switch event.Event {
|
||
case EventAAppPrepBegin,
|
||
EventATXSched,
|
||
EventATXSoftware,
|
||
EventATXHardware,
|
||
EventBRXHardware,
|
||
EventBRXSoftware,
|
||
EventBAppRecv,
|
||
EventBPersistBegin,
|
||
EventBPersistEnd:
|
||
return true
|
||
default:
|
||
return false
|
||
}
|
||
}
|