From f0d297f27274a9dd82adc555d070786483b61438 Mon Sep 17 00:00:00 2001 From: nnbcccscdscdsc <2709767634@qq.com> Date: Tue, 24 Mar 2026 16:05:14 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E6=96=B0=E5=A2=9E=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E5=8C=96=E6=8B=89=E5=8F=96=E5=B9=B6=E6=B1=87=E6=80=BB?= =?UTF-8?q?=E5=88=86=E6=9E=90=E6=95=B0=E6=8D=AE=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 3 + cmd/internal/latencylog/summary.go | 81 ++++++++++ cmd/internal/latencylog/summary_test.go | 196 ++++++++++++++++++++++++ cmd/latencysummary/main.go | 10 +- scripts/refresh-latency-summary.sh | 122 +++++++++++++++ 5 files changed, 411 insertions(+), 1 deletion(-) create mode 100755 scripts/refresh-latency-summary.sh diff --git a/README.md b/README.md index b9dad6c..8c78557 100644 --- a/README.md +++ b/README.md @@ -79,3 +79,6 @@ text peer-a hi file peer-b /tmp/test.bin quit ``` +### 自动化拉取更新汇总数据 +cd /home/limingjie/LMJ_Work/OmniSocketGo +./scripts/refresh-latency-summary.sh \ No newline at end of file diff --git a/cmd/internal/latencylog/summary.go b/cmd/internal/latencylog/summary.go index 89a1bba..dd825d1 100644 --- a/cmd/internal/latencylog/summary.go +++ b/cmd/internal/latencylog/summary.go @@ -66,6 +66,48 @@ func LoadEventsFromFiles(paths []string) ([]Event, error) { return events, nil } +// LoadEventsFromFilesWithSharedMaxOffset 从多个 JSONL 原始日志文件中加载事件, +// 并按每个输入文件的最大 message_id 计算共享截断点。 +func LoadEventsFromFilesWithSharedMaxOffset(paths []string, sharedMaxOffset uint64) ([]Event, *uint64, error) { + eventsByFile := make([][]Event, 0, len(paths)) + var minMaxMessageID uint64 + hasSharedMax := false + + for _, path := range paths { + fileEvents, err := LoadEventsFromFile(path) + if err != nil { + return nil, nil, err + } + + eventsByFile = append(eventsByFile, fileEvents) + + fileMaxMessageID, ok := maxBusinessMessageID(fileEvents) + if !ok { + return nil, nil, nil + } + if !hasSharedMax || fileMaxMessageID < minMaxMessageID { + minMaxMessageID = fileMaxMessageID + hasSharedMax = true + } + } + + if !hasSharedMax { + return nil, nil, nil + } + + cutoff, ok := subtractUint64(minMaxMessageID, sharedMaxOffset) + if !ok { + return []Event{}, nil, nil + } + + var events []Event + for _, fileEvents := range eventsByFile { + events = append(events, filterEventsByMaxMessageID(fileEvents, cutoff)...) + } + + return events, &cutoff, nil +} + // LoadEventsFromFile 从单个 JSONL 原始日志文件中加载事件。 func LoadEventsFromFile(path string) ([]Event, error) { file, err := os.Open(path) @@ -357,6 +399,45 @@ func calculateBitrateBPS(bodySize int, latencyNS *int64) *float64 { return &value } +// 最大 message_id 计算函数 +func maxBusinessMessageID(events []Event) (uint64, bool) { + var maxMessageID uint64 + hasBusinessMessage := false + + for _, event := range events { + if !IsBusinessEvent(event) { + continue + } + if !hasBusinessMessage || event.MessageID > maxMessageID { + maxMessageID = event.MessageID + hasBusinessMessage = true + } + } + + return maxMessageID, hasBusinessMessage +} + +// 根据 message_id 截断事件列表的函数 +func filterEventsByMaxMessageID(events []Event, maxMessageID uint64) []Event { + filtered := make([]Event, 0, len(events)) + for _, event := range events { + if event.MessageID > maxMessageID { + continue + } + filtered = append(filtered, event) + } + + return filtered +} + +func subtractUint64(value, offset uint64) (uint64, bool) { + if offset > value { + return 0, false + } + + return value - offset, true +} + // 判断事件是否是业务相关的时延事件(其中一项) func IsBusinessEvent(event Event) bool { switch event.Event { diff --git a/cmd/internal/latencylog/summary_test.go b/cmd/internal/latencylog/summary_test.go index cb4bc8b..f1bb8da 100644 --- a/cmd/internal/latencylog/summary_test.go +++ b/cmd/internal/latencylog/summary_test.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "reflect" + "sort" "testing" "omnisocketgo/cmd/internal/protocol" @@ -188,6 +189,147 @@ func TestLoadAndWriteSummaryFiles(t *testing.T) { } } +func TestLoadEventsFromFilesWithSharedMaxOffsetFiltersToSharedCutoff(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + firstMessageIDs []uint64 + secondMessageIDs []uint64 + offset uint64 + wantCutoff *uint64 + wantMessageIDs []uint64 + }{ + { + name: "same max message id rolls back one", + firstMessageIDs: []uint64{1, 2, 3, 4, 5, 6, 7}, + secondMessageIDs: []uint64{1, 2, 3, 4, 5, 6, 7}, + offset: 1, + wantCutoff: uint64Ptr(6), + wantMessageIDs: []uint64{1, 2, 3, 4, 5, 6}, + }, + { + name: "smaller input max wins before rollback", + firstMessageIDs: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9}, + secondMessageIDs: []uint64{1, 2, 3, 4, 5, 6, 7}, + offset: 1, + wantCutoff: uint64Ptr(6), + wantMessageIDs: []uint64{1, 2, 3, 4, 5, 6}, + }, + { + name: "not enough shared messages yields empty result", + firstMessageIDs: []uint64{1}, + secondMessageIDs: []uint64{1}, + offset: 1, + wantCutoff: uint64Ptr(0), + wantMessageIDs: nil, + }, + } + + for _, tt := range testCases { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + firstPath := filepath.Join(tempDir, "first.jsonl") + secondPath := filepath.Join(tempDir, "second.jsonl") + writeEventsJSONL(t, firstPath, testEventsForMessageIDs(tt.firstMessageIDs, "peer-a", "peer-b")) + writeEventsJSONL(t, secondPath, testEventsForMessageIDs(tt.secondMessageIDs, "peer-b", "peer-a")) + + events, cutoff, err := LoadEventsFromFilesWithSharedMaxOffset([]string{firstPath, secondPath}, tt.offset) + if err != nil { + t.Fatalf("LoadEventsFromFilesWithSharedMaxOffset() error = %v", err) + } + if !reflect.DeepEqual(cutoff, tt.wantCutoff) { + t.Fatalf("cutoff = %v, want %v", cutoff, tt.wantCutoff) + } + + if got := businessMessageIDs(events); !reflect.DeepEqual(got, tt.wantMessageIDs) { + t.Fatalf("message IDs = %v, want %v", got, tt.wantMessageIDs) + } + }) + } +} + +func TestLoadEventsFromFilesWithSharedMaxOffsetPreservesEarlierSummaries(t *testing.T) { + tempDir := t.TempDir() + firstPath := filepath.Join(tempDir, "first.jsonl") + secondPath := filepath.Join(tempDir, "second.jsonl") + + writeEventsJSONL(t, firstPath, []Event{ + {TsUnixNano: 100, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320}, + {TsUnixNano: 120, Event: EventATXSched, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320}, + {TsUnixNano: 140, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320}, + {TsUnixNano: 180, Event: EventBRXSoftware, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320}, + {TsUnixNano: 220, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320}, + {TsUnixNano: 260, Event: EventBPersistEnd, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320}, + {TsUnixNano: 300, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160}, + {TsUnixNano: 330, Event: EventATXSched, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160}, + {TsUnixNano: 360, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160}, + {TsUnixNano: 390, Event: EventBRXSoftware, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160}, + {TsUnixNano: 420, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160}, + {TsUnixNano: 470, Event: EventBPersistEnd, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160}, + {TsUnixNano: 500, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80}, + {TsUnixNano: 520, Event: EventATXSched, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80}, + {TsUnixNano: 540, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80}, + {TsUnixNano: 560, Event: EventBRXSoftware, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80}, + {TsUnixNano: 580, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80}, + {TsUnixNano: 600, Event: EventBPersistEnd, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80}, + {TsUnixNano: 700, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 4, From: "peer-a", To: "peer-b", BodySize: 40}, + }) + writeEventsJSONL(t, secondPath, []Event{ + {TsUnixNano: 90, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-b", To: "peer-a", BodySize: 20}, + {TsUnixNano: 95, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-b", To: "peer-a", BodySize: 20}, + {TsUnixNano: 150, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-b", To: "peer-a", BodySize: 20}, + {TsUnixNano: 290, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-b", To: "peer-a", BodySize: 20}, + {TsUnixNano: 295, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-b", To: "peer-a", BodySize: 20}, + {TsUnixNano: 350, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-b", To: "peer-a", BodySize: 20}, + {TsUnixNano: 490, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-b", To: "peer-a", BodySize: 20}, + {TsUnixNano: 495, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-b", To: "peer-a", BodySize: 20}, + {TsUnixNano: 550, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-b", To: "peer-a", BodySize: 20}, + {TsUnixNano: 690, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 4, From: "peer-b", To: "peer-a", BodySize: 20}, + }) + + events, cutoff, err := LoadEventsFromFilesWithSharedMaxOffset([]string{firstPath, secondPath}, 1) + if err != nil { + t.Fatalf("LoadEventsFromFilesWithSharedMaxOffset() error = %v", err) + } + if !reflect.DeepEqual(cutoff, uint64Ptr(3)) { + t.Fatalf("cutoff = %v, want %v", cutoff, uint64Ptr(3)) + } + + summaries := SummarizeEvents(events) + if got := len(summaries); got != 6 { + t.Fatalf("summary count = %d, want 6", got) + } + + for _, summary := range summaries { + if summary.MessageID == 4 { + t.Fatalf("message 4 should have been truncated from summaries: %+v", summary) + } + } + + var forwardMessageTwo Summary + found := false + for _, summary := range summaries { + if summary.From == "peer-a" && summary.To == "peer-b" && summary.MessageID == 2 { + forwardMessageTwo = summary + found = true + break + } + } + if !found { + t.Fatal("summary for message 2 peer-a -> peer-b not found") + } + if got := ptrValue(forwardMessageTwo.EndToEndLatencyNS); got != 170 { + t.Fatalf("message 2 EndToEndLatencyNS = %d, want 170", got) + } + if got := ptrValue(forwardMessageTwo.ApproxRTTNS); got != 190 { + t.Fatalf("message 2 ApproxRTTNS = %d, want 190", got) + } +} + func ptrValue(value *int64) int64 { if value == nil { return 0 @@ -201,3 +343,57 @@ func ptrValueFloat(value *float64) float64 { } return *value } + +func uint64Ptr(value uint64) *uint64 { + return &value +} + +func businessMessageIDs(events []Event) []uint64 { + seen := make(map[uint64]struct{}) + var ids []uint64 + + for _, event := range events { + if !IsBusinessEvent(event) { + continue + } + if _, ok := seen[event.MessageID]; ok { + continue + } + seen[event.MessageID] = struct{}{} + ids = append(ids, event.MessageID) + } + + sort.Slice(ids, func(i, j int) bool { + return ids[i] < ids[j] + }) + return ids +} + +func testEventsForMessageIDs(messageIDs []uint64, from, to string) []Event { + events := make([]Event, 0, len(messageIDs)*2) + for _, messageID := range messageIDs { + events = append(events, + Event{TsUnixNano: int64(messageID*100 + 10), Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: messageID, From: from, To: to, BodySize: 32}, + Event{TsUnixNano: int64(messageID*100 + 20), Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: messageID, From: from, To: to, BodySize: 32}, + ) + } + + return events +} + +func writeEventsJSONL(t *testing.T, path string, events []Event) { + t.Helper() + + file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644) + if err != nil { + t.Fatalf("os.OpenFile(%s) error = %v", path, err) + } + defer file.Close() + + encoder := json.NewEncoder(file) + for _, event := range events { + if err := encoder.Encode(event); err != nil { + t.Fatalf("encoder.Encode(%s) error = %v", path, err) + } + } +} diff --git a/cmd/latencysummary/main.go b/cmd/latencysummary/main.go index 76c4df1..1e5eac4 100644 --- a/cmd/latencysummary/main.go +++ b/cmd/latencysummary/main.go @@ -23,6 +23,8 @@ func (f *stringListFlag) Set(value string) error { func main() { var inputPaths stringListFlag outputPath := flag.String("output", "latency-summary.jsonl", "output JSONL file for summarized latency metrics") + // shared-max-offset 是一个可选参数,用于在对齐输入文件的 per-file max message_id 后,排除掉最新的共享 message_id 以外的记录。它指定了要排除的共享 message_id 的数量。 + sharedMaxOffset := flag.Uint64("shared-max-offset", 1, "number of newest shared message IDs to exclude after aligning inputs by per-file max message_id") flag.Var(&inputPaths, "input", "raw latency JSONL file path; can be provided multiple times") flag.Parse() @@ -30,10 +32,16 @@ func main() { log.Fatal("at least one -input raw latency log file is required") } - events, err := latencylog.LoadEventsFromFiles(inputPaths) + events, sharedMaxMessageID, err := latencylog.LoadEventsFromFilesWithSharedMaxOffset(inputPaths, *sharedMaxOffset) if err != nil { log.Fatalf("load raw latency logs: %v", err) } + // sharedMaxMessageID 可能为 nil,表示没有可用的共享 message_id 截止值(例如因为输入文件中没有共享消息)。在这种情况下,我们将继续处理所有事件,但会记录一个警告。 + if sharedMaxMessageID != nil { + log.Printf("using shared message_id cutoff <= %d (shared-max-offset=%d)", *sharedMaxMessageID, *sharedMaxOffset) + } else { + log.Printf("no shared message_id cutoff available after applying shared-max-offset=%d", *sharedMaxOffset) + } summaries := latencylog.SummarizeEvents(events) if err := latencylog.WriteSummariesJSONL(*outputPath, summaries); err != nil { diff --git a/scripts/refresh-latency-summary.sh b/scripts/refresh-latency-summary.sh new file mode 100755 index 0000000..9ee2054 --- /dev/null +++ b/scripts/refresh-latency-summary.sh @@ -0,0 +1,122 @@ +#!/usr/bin/env bash + +set -u +set -o pipefail + +repo_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +remote_source="boll@175.178.116.187:/home/boll/LMJWork/OmniSocketGo/peer-b-latency.jsonl" +local_peer_a="$repo_dir/peer-a-latency.jsonl" +local_peer_b="$repo_dir/peer-b-latency.jsonl" +summary_output="$repo_dir/latency-summary.jsonl" +chart_output="$repo_dir/latency-summary.html" +latency_binary="$repo_dir/bin/latencysummary" +go_cache_dir="${GOCACHE:-/tmp/omnisocketgo-go-build}" +poll_interval_seconds=1 + +remote_tmp="" +summary_tmp="" +chart_tmp="" + +log() { + printf '[%s] %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$*" +} + +cleanup_temp_file() { + local path="$1" + if [[ -n "$path" && -e "$path" ]]; then + rm -f "$path" + fi +} + +cleanup() { + cleanup_temp_file "$remote_tmp" + cleanup_temp_file "$summary_tmp" + cleanup_temp_file "$chart_tmp" +} + +trap cleanup EXIT INT TERM + +cd "$repo_dir" || exit 1 + +mkdir -p "$repo_dir/bin" +mkdir -p "$go_cache_dir" +if ! GOCACHE="$go_cache_dir" go build -o "$latency_binary" ./cmd/latencysummary; then + log "build failed; exiting" + exit 1 +fi + +log "starting 1-second refresh loop" + +while true; do + remote_tmp="$(mktemp "$repo_dir/peer-b-latency.jsonl.tmp.XXXXXX")" || exit 1 + if scp -P 10022 "$remote_source" "$remote_tmp"; then + if mv -f "$remote_tmp" "$local_peer_b"; then + remote_tmp="" + else + status=$? + log "failed to replace $(basename "$local_peer_b") after scp (exit $status)" + cleanup_temp_file "$remote_tmp" + remote_tmp="" + sleep "$poll_interval_seconds" + continue + fi + else + status=$? + log "scp refresh failed (exit $status)" + cleanup_temp_file "$remote_tmp" + remote_tmp="" + sleep "$poll_interval_seconds" + continue + fi + + summary_tmp="$(mktemp "$repo_dir/latency-summary.tmp.XXXXXX.jsonl")" || exit 1 + chart_tmp="${summary_tmp%.jsonl}.html" + if "$latency_binary" \ + -input "$local_peer_a" \ + -input "$local_peer_b" \ + -shared-max-offset 1 \ + -output "$summary_tmp"; then + if [[ ! -f "$summary_tmp" || ! -f "$chart_tmp" ]]; then + log "summary succeeded but temporary outputs are incomplete" + cleanup_temp_file "$summary_tmp" + cleanup_temp_file "$chart_tmp" + summary_tmp="" + chart_tmp="" + sleep "$poll_interval_seconds" + continue + fi + + if ! mv -f "$summary_tmp" "$summary_output"; then + status=$? + log "failed to replace $(basename "$summary_output") (exit $status)" + cleanup_temp_file "$summary_tmp" + cleanup_temp_file "$chart_tmp" + summary_tmp="" + chart_tmp="" + sleep "$poll_interval_seconds" + continue + fi + summary_tmp="" + + if ! mv -f "$chart_tmp" "$chart_output"; then + status=$? + log "failed to replace $(basename "$chart_output") (exit $status)" + cleanup_temp_file "$chart_tmp" + chart_tmp="" + sleep "$poll_interval_seconds" + continue + fi + chart_tmp="" + else + status=$? + log "latency summary refresh failed (exit $status)" + cleanup_temp_file "$summary_tmp" + cleanup_temp_file "$chart_tmp" + summary_tmp="" + chart_tmp="" + sleep "$poll_interval_seconds" + continue + fi + + sleep "$poll_interval_seconds" +done