package latencylog

import (
	"bufio"
	"encoding/json"
	"os"
	"path/filepath"
	"reflect"
	"sort"
	"testing"

	"omnisocketgo/cmd/internal/protocol"
)

// TestSummarizeEventsComputesLatencyMetrics feeds SummarizeEvents a complete
// event trace for a single 320-byte message and checks every derived latency
// and bitrate field on the resulting summary.
func TestSummarizeEventsComputesLatencyMetrics(t *testing.T) {
	events := []Event{
		{TsUnixNano: 100, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
		{TsUnixNano: 120, Event: EventATXSched, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
		{TsUnixNano: 140, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
		{TsUnixNano: 180, Event: EventBRXSoftware, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
		{TsUnixNano: 220, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
		{TsUnixNano: 230, Event: EventBPersistBegin, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
		{TsUnixNano: 260, Event: EventBPersistEnd, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
	}

	summaries := SummarizeEvents(events)
	if len(summaries) != 1 {
		t.Fatalf("summary count = %d, want 1", len(summaries))
	}
	summary := summaries[0]

	// Latency expectations. NOTE(review): the values are consistent with
	// simple timestamp differences in the fixture above (e.g. 260-100 = 160
	// for end-to-end) — confirm against SummarizeEvents.
	if got := ptrValue(summary.AProcessingLatencyNS); got != 20 {
		t.Fatalf("AProcessingLatencyNS = %d, want 20", got)
	}
	if got := ptrValue(summary.AQueueLatencyNS); got != 20 {
		t.Fatalf("AQueueLatencyNS = %d, want 20", got)
	}
	if got := ptrValue(summary.ABTransportPropagationNS); got != 80 {
		t.Fatalf("ABTransportPropagationNS = %d, want 80", got)
	}
	if got := ptrValue(summary.BKernelReceivePathLatencyNS); got != 40 {
		t.Fatalf("BKernelReceivePathLatencyNS = %d, want 40", got)
	}
	if got := ptrValue(summary.BProcessingLatencyNS); got != 40 {
		t.Fatalf("BProcessingLatencyNS = %d, want 40", got)
	}
	if got := ptrValue(summary.EndToEndLatencyNS); got != 160 {
		t.Fatalf("EndToEndLatencyNS = %d, want 160", got)
	}

	// Bitrate expectations. NOTE(review): presumably BodySize*8 bits divided
	// by the matching latency in seconds (320*8 bits / 20ns = 128e9) — confirm.
	if got := ptrValueFloat(summary.AProcessingBitrateBPS); got != 128_000_000_000 {
		t.Fatalf("AProcessingBitrateBPS = %v, want 128000000000", got)
	}
	if got := ptrValueFloat(summary.ABTransportPropagationBitrateBPS); got != 32_000_000_000 {
		t.Fatalf("ABTransportPropagationBitrateBPS = %v, want 32000000000", got)
	}
	if got := ptrValueFloat(summary.EndToEndBitrateBPS); got != 16_000_000_000 {
		t.Fatalf("EndToEndBitrateBPS = %v, want 16000000000", got)
	}

	if got := summary.Timestamps[EventBRXSoftware]; got != 180 {
		t.Fatalf("timestamps[%q] = %d, want 180", EventBRXSoftware, got)
	}
	if len(summary.MissingTimestamps) != 0 {
		t.Fatalf("MissingTimestamps = %v, want empty", summary.MissingTimestamps)
	}
}

// TestSummarizeEventsComputesApproxRTTByPairingReverseMessages summarizes two
// peer-a -> peer-b messages and two peer-b -> peer-a messages and checks that
// only the peer-a -> peer-b summaries carry an ApproxRTTNS value.
func TestSummarizeEventsComputesApproxRTTByPairingReverseMessages(t *testing.T) {
	events := []Event{
		{TsUnixNano: 100, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b"},
		{TsUnixNano: 110, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b"},
		{TsUnixNano: 180, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b"},
		{TsUnixNano: 120, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b"},
		{TsUnixNano: 140, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b"},
		{TsUnixNano: 190, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b"},
		{TsUnixNano: 200, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 11, From: "peer-b", To: "peer-a"},
		{TsUnixNano: 210, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 11, From: "peer-b", To: "peer-a"},
		{TsUnixNano: 260, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 11, From: "peer-b", To: "peer-a"},
		{TsUnixNano: 220, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 12, From: "peer-b", To: "peer-a"},
		{TsUnixNano: 230, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 12, From: "peer-b", To: "peer-a"},
		{TsUnixNano: 310, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 12, From: "peer-b", To: "peer-a"},
	}

	summaries := SummarizeEvents(events)
	if len(summaries) != 4 {
		t.Fatalf("summary count = %d, want 4", len(summaries))
	}

	// Index by message ID so the assertions do not depend on summary order.
	gotByMessageID := make(map[uint64]Summary, len(summaries))
	for _, summary := range summaries {
		gotByMessageID[summary.MessageID] = summary
	}

	// NOTE(review): 150 and 170 are consistent with (reverse BAppRecv -
	// forward ATXSoftware) for pairs 1<->11 and 2<->12 (260-110, 310-140) —
	// confirm the pairing rule against SummarizeEvents.
	if got := ptrValue(gotByMessageID[1].ApproxRTTNS); got != 150 {
		t.Fatalf("message 1 ApproxRTTNS = %d, want 150", got)
	}
	if got := ptrValue(gotByMessageID[2].ApproxRTTNS); got != 170 {
		t.Fatalf("message 2 ApproxRTTNS = %d, want 170", got)
	}
	if gotByMessageID[11].ApproxRTTNS != nil {
		t.Fatalf("message 11 ApproxRTTNS = %d, want nil", ptrValue(gotByMessageID[11].ApproxRTTNS))
	}
	if gotByMessageID[12].ApproxRTTNS != nil {
		t.Fatalf("message 12 ApproxRTTNS = %d, want nil", ptrValue(gotByMessageID[12].ApproxRTTNS))
	}
}

// TestSummarizeEventsReportsMissingTimestamps summarizes a trace that holds
// only the first and last events of a message and checks that the absent
// events are listed while the end-to-end latency is still computed.
func TestSummarizeEventsReportsMissingTimestamps(t *testing.T) {
	events := []Event{
		{TsUnixNano: 100, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b"},
		{TsUnixNano: 240, Event: EventBPersistEnd, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b"},
	}

	summaries := SummarizeEvents(events)
	if len(summaries) != 1 {
		t.Fatalf("summary count = %d, want 1", len(summaries))
	}

	wantMissing := []string{EventATXSched, EventATXSoftware, EventBRXSoftware, EventBAppRecv}
	if !reflect.DeepEqual(summaries[0].MissingTimestamps, wantMissing) {
		t.Fatalf("MissingTimestamps = %v, want %v", summaries[0].MissingTimestamps, wantMissing)
	}
	if summaries[0].AProcessingLatencyNS != nil {
		t.Fatalf("AProcessingLatencyNS = %v, want nil", ptrValue(summaries[0].AProcessingLatencyNS))
	}
	if summaries[0].EndToEndLatencyNS == nil {
		t.Fatal("EndToEndLatencyNS = nil, want non-nil because endpoints are present")
	}
}

// TestLoadAndWriteSummaryFiles round-trips events through the JSONL logger,
// LoadEventsFromFile, SummarizeEvents and WriteSummariesJSONL, then decodes
// the first line of the summary file and checks a few of its fields.
func TestLoadAndWriteSummaryFiles(t *testing.T) {
	rawPath := filepath.Join(t.TempDir(), "raw.jsonl")
	rawLogger, err := NewJSONLLogger(rawPath)
	if err != nil {
		t.Fatalf("NewJSONLLogger() error = %v", err)
	}
	// NOTE(review): the logger is only closed during cleanup, so this test
	// assumes LogEvent makes events readable without a Close — confirm.
	t.Cleanup(func() {
		// Best-effort close: a failure here cannot affect the assertions below.
		_ = rawLogger.Close()
	})

	for _, event := range []Event{
		{TsUnixNano: 100, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 320},
		{TsUnixNano: 120, Event: EventATXSched, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 320},
		{TsUnixNano: 140, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 320},
		{TsUnixNano: 180, Event: EventBRXSoftware, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 320},
		{TsUnixNano: 220, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 320},
		{TsUnixNano: 260, Event: EventBPersistEnd, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 320},
	} {
		if err := rawLogger.LogEvent(event); err != nil {
			t.Fatalf("LogEvent() error = %v", err)
		}
	}

	events, err := LoadEventsFromFile(rawPath)
	if err != nil {
		t.Fatalf("LoadEventsFromFile() error = %v", err)
	}

	summaryPath := filepath.Join(t.TempDir(), "summary.jsonl")
	if err := WriteSummariesJSONL(summaryPath, SummarizeEvents(events)); err != nil {
		t.Fatalf("WriteSummariesJSONL() error = %v", err)
	}

	file, err := os.Open(summaryPath)
	if err != nil {
		t.Fatalf("os.Open() error = %v", err)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	if !scanner.Scan() {
		// Scan returns false both at EOF and on a read error; report the
		// read error explicitly so it is not mistaken for an empty file.
		if err := scanner.Err(); err != nil {
			t.Fatalf("scanner.Scan() error = %v", err)
		}
		t.Fatal("expected one summary line, got none")
	}

	var summary Summary
	if err := json.Unmarshal(scanner.Bytes(), &summary); err != nil {
		t.Fatalf("json.Unmarshal() error = %v", err)
	}
	if summary.MessageID != 3 {
		t.Fatalf("MessageID = %d, want 3", summary.MessageID)
	}
	if got := ptrValue(summary.BKernelReceivePathLatencyNS); got != 40 {
		t.Fatalf("BKernelReceivePathLatencyNS = %d, want 40", got)
	}
	if got := ptrValue(summary.EndToEndLatencyNS); got != 160 {
		t.Fatalf("EndToEndLatencyNS = %d, want 160", got)
	}
	if got := ptrValueFloat(summary.EndToEndBitrateBPS); got != 16_000_000_000 {
		t.Fatalf("EndToEndBitrateBPS = %v, want 16000000000", got)
	}
}

// TestLoadEventsFromFilesWithSharedMaxOffsetFiltersToSharedCutoff writes two
// event files per case and checks both the cutoff returned by
// LoadEventsFromFilesWithSharedMaxOffset and the set of business message IDs
// that remain in the loaded events.
func TestLoadEventsFromFilesWithSharedMaxOffsetFiltersToSharedCutoff(t *testing.T) {
	t.Parallel()

	testCases := []struct {
		name             string
		firstMessageIDs  []uint64
		secondMessageIDs []uint64
		offset           uint64
		wantCutoff       *uint64
		wantMessageIDs   []uint64
	}{
		{
			name:             "same max message id rolls back one",
			firstMessageIDs:  []uint64{1, 2, 3, 4, 5, 6, 7},
			secondMessageIDs: []uint64{1, 2, 3, 4, 5, 6, 7},
			offset:           1,
			wantCutoff:       uint64Ptr(6),
			wantMessageIDs:   []uint64{1, 2, 3, 4, 5, 6},
		},
		{
			name:             "smaller input max wins before rollback",
			firstMessageIDs:  []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9},
			secondMessageIDs: []uint64{1, 2, 3, 4, 5, 6, 7},
			offset:           1,
			wantCutoff:       uint64Ptr(6),
			wantMessageIDs:   []uint64{1, 2, 3, 4, 5, 6},
		},
		{
			name:             "not enough shared messages yields empty result",
			firstMessageIDs:  []uint64{1},
			secondMessageIDs: []uint64{1},
			offset:           1,
			wantCutoff:       uint64Ptr(0),
			wantMessageIDs:   nil,
		},
	}

	for _, tt := range testCases {
		tt := tt // capture per iteration for the parallel subtest (pre-Go 1.22 semantics)
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()

			tempDir := t.TempDir()
			firstPath := filepath.Join(tempDir, "first.jsonl")
			secondPath := filepath.Join(tempDir, "second.jsonl")
			writeEventsJSONL(t, firstPath, testEventsForMessageIDs(tt.firstMessageIDs, "peer-a", "peer-b"))
			writeEventsJSONL(t, secondPath, testEventsForMessageIDs(tt.secondMessageIDs, "peer-b", "peer-a"))

			events, cutoff, err := LoadEventsFromFilesWithSharedMaxOffset([]string{firstPath, secondPath}, tt.offset)
			if err != nil {
				t.Fatalf("LoadEventsFromFilesWithSharedMaxOffset() error = %v", err)
			}
			assertSharedCutoff(t, cutoff, tt.wantCutoff)
			if got := businessMessageIDs(events); !reflect.DeepEqual(got, tt.wantMessageIDs) {
				t.Fatalf("message IDs = %v, want %v", got, tt.wantMessageIDs)
			}
		})
	}
}

// TestLoadEventsFromFilesWithSharedMaxOffsetPreservesEarlierSummaries loads
// two event files with an offset of 1 and checks that message 4 is dropped
// from the summaries while message 2 (peer-a -> peer-b) keeps its expected
// end-to-end latency and approximate RTT.
func TestLoadEventsFromFilesWithSharedMaxOffsetPreservesEarlierSummaries(t *testing.T) {
	tempDir := t.TempDir()
	firstPath := filepath.Join(tempDir, "first.jsonl")
	secondPath := filepath.Join(tempDir, "second.jsonl")

	writeEventsJSONL(t, firstPath, []Event{
		{TsUnixNano: 100, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
		{TsUnixNano: 120, Event: EventATXSched, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
		{TsUnixNano: 140, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
		{TsUnixNano: 180, Event: EventBRXSoftware, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
		{TsUnixNano: 220, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
		{TsUnixNano: 260, Event: EventBPersistEnd, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
		{TsUnixNano: 300, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160},
		{TsUnixNano: 330, Event: EventATXSched, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160},
		{TsUnixNano: 360, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160},
		{TsUnixNano: 390, Event: EventBRXSoftware, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160},
		{TsUnixNano: 420, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160},
		{TsUnixNano: 470, Event: EventBPersistEnd, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160},
		{TsUnixNano: 500, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80},
		{TsUnixNano: 520, Event: EventATXSched, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80},
		{TsUnixNano: 540, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80},
		{TsUnixNano: 560, Event: EventBRXSoftware, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80},
		{TsUnixNano: 580, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80},
		{TsUnixNano: 600, Event: EventBPersistEnd, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80},
		{TsUnixNano: 700, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 4, From: "peer-a", To: "peer-b", BodySize: 40},
	})
	writeEventsJSONL(t, secondPath, []Event{
		{TsUnixNano: 90, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-b", To: "peer-a", BodySize: 20},
		{TsUnixNano: 95, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-b", To: "peer-a", BodySize: 20},
		{TsUnixNano: 150, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-b", To: "peer-a", BodySize: 20},
		{TsUnixNano: 290, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-b", To: "peer-a", BodySize: 20},
		{TsUnixNano: 295, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-b", To: "peer-a", BodySize: 20},
		{TsUnixNano: 350, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-b", To: "peer-a", BodySize: 20},
		{TsUnixNano: 490, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-b", To: "peer-a", BodySize: 20},
		{TsUnixNano: 495, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-b", To: "peer-a", BodySize: 20},
		{TsUnixNano: 550, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-b", To: "peer-a", BodySize: 20},
		{TsUnixNano: 690, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 4, From: "peer-b", To: "peer-a", BodySize: 20},
	})

	events, cutoff, err := LoadEventsFromFilesWithSharedMaxOffset([]string{firstPath, secondPath}, 1)
	if err != nil {
		t.Fatalf("LoadEventsFromFilesWithSharedMaxOffset() error = %v", err)
	}
	assertSharedCutoff(t, cutoff, uint64Ptr(3))

	summaries := SummarizeEvents(events)
	if got := len(summaries); got != 6 {
		t.Fatalf("summary count = %d, want 6", got)
	}
	for _, summary := range summaries {
		if summary.MessageID == 4 {
			t.Fatalf("message 4 should have been truncated from summaries: %+v", summary)
		}
	}

	// Message IDs repeat across directions, so match on From/To as well.
	var forwardMessageTwo Summary
	found := false
	for _, summary := range summaries {
		if summary.From == "peer-a" && summary.To == "peer-b" && summary.MessageID == 2 {
			forwardMessageTwo = summary
			found = true
			break
		}
	}
	if !found {
		t.Fatal("summary for message 2 peer-a -> peer-b not found")
	}
	if got := ptrValue(forwardMessageTwo.EndToEndLatencyNS); got != 170 {
		t.Fatalf("message 2 EndToEndLatencyNS = %d, want 170", got)
	}
	if got := ptrValue(forwardMessageTwo.ApproxRTTNS); got != 190 {
		t.Fatalf("message 2 ApproxRTTNS = %d, want 190", got)
	}
}

// assertSharedCutoff fails the test unless got and want are both nil or point
// to equal values. It reports the dereferenced values (or "nil") so failure
// output shows the cutoffs rather than pointer addresses.
func assertSharedCutoff(t *testing.T, got, want *uint64) {
	t.Helper()
	switch {
	case got == nil && want == nil:
		return
	case got == nil:
		t.Fatalf("cutoff = nil, want %d", *want)
	case want == nil:
		t.Fatalf("cutoff = %d, want nil", *got)
	case *got != *want:
		t.Fatalf("cutoff = %d, want %d", *got, *want)
	}
}

// ptrValue returns the int64 that value points to, or 0 when value is nil.
func ptrValue(value *int64) int64 {
	if value == nil {
		return 0
	}
	return *value
}

// ptrValueFloat returns the float64 that value points to, or 0 when value is
// nil.
func ptrValueFloat(value *float64) float64 {
	if value == nil {
		return 0
	}
	return *value
}

// uint64Ptr returns a pointer to a copy of value.
func uint64Ptr(value uint64) *uint64 {
	return &value
}

// businessMessageIDs returns the distinct message IDs of the events for which
// IsBusinessEvent reports true, sorted ascending. The result is nil when no
// event qualifies.
func businessMessageIDs(events []Event) []uint64 {
	seen := make(map[uint64]struct{})
	var ids []uint64
	for _, event := range events {
		if !IsBusinessEvent(event) {
			continue
		}
		if _, ok := seen[event.MessageID]; ok {
			continue
		}
		seen[event.MessageID] = struct{}{}
		ids = append(ids, event.MessageID)
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	return ids
}

// testEventsForMessageIDs builds two events per message ID — an
// EventAAppPrepBegin at messageID*100+10 and an EventATXSoftware at
// messageID*100+20 — all sent from `from` to `to` with a 32-byte body.
func testEventsForMessageIDs(messageIDs []uint64, from, to string) []Event {
	events := make([]Event, 0, len(messageIDs)*2)
	for _, messageID := range messageIDs {
		events = append(events,
			Event{TsUnixNano: int64(messageID*100 + 10), Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: messageID, From: from, To: to, BodySize: 32},
			Event{TsUnixNano: int64(messageID*100 + 20), Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: messageID, From: from, To: to, BodySize: 32},
		)
	}
	return events
}

// writeEventsJSONL creates (or truncates) the file at path and writes each
// event as one JSON document per line. Any open, encode, or close error fails
// the calling test.
func writeEventsJSONL(t *testing.T, path string, events []Event) {
	t.Helper()

	file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
	if err != nil {
		t.Fatalf("os.OpenFile(%s) error = %v", path, err)
	}
	defer func() {
		// A failed Close on a written file can mean lost data, so surface it
		// instead of discarding it.
		if err := file.Close(); err != nil {
			t.Errorf("file.Close(%s) error = %v", path, err)
		}
	}()

	encoder := json.NewEncoder(file)
	for _, event := range events {
		if err := encoder.Encode(event); err != nil {
			t.Fatalf("encoder.Encode(%s) error = %v", path, err)
		}
	}
}