diff --git a/README.md b/README.md index 25186be..b9dad6c 100644 --- a/README.md +++ b/README.md @@ -38,19 +38,7 @@ CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=7 go build -o bin/server-linux-armv7 . CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=7 go build -o bin/peer-linux-armv7 ./cmd/peer ``` -## Deploy -把对应架构的二进制拷到目标机器,并赋予可执行权限。 - -```bash -scp bin/server-linux-amd64 user@server-host:~/omnisocket/server -scp bin/peer-linux-amd64 user@peer-host:~/omnisocket/peer -``` - -```bash -ssh user@server-host 'chmod +x ~/omnisocket/server' -ssh user@peer-host 'chmod +x ~/omnisocket/peer' -``` ## Run On Different Machines diff --git a/cmd/internal/latencylog/summary.go b/cmd/internal/latencylog/summary.go index 13337b2..7d6429a 100644 --- a/cmd/internal/latencylog/summary.go +++ b/cmd/internal/latencylog/summary.go @@ -31,13 +31,14 @@ type Summary struct { BodySize int `json:"body_size"` //消息体大小(字节数) Timestamps map[string]int64 `json:"timestamps"` //事件时间戳,key 是事件名称,value 是 UnixNano 时间戳 - AProcessingLatencyNS *int64 `json:"a_processing_latency_ns,omitempty"` // A 处理时延:A_TX_SCHED - A_APP_PREP_BEGIN - AQueueLatencyNS *int64 `json:"a_queue_latency_ns,omitempty"` // A 排队时延:A_TX_SOFTWARE - A_TX_SCHED - ABTransportPropagationBQueueLatencyNS *int64 `json:"a_b_transport_propagation_b_queue_latency_ns,omitempty"` // A-B 传输时延 + B 排队时延:B_APP_RECV - A_TX_SOFTWARE - BKernelReceivePathLatencyNS *int64 `json:"b_kernel_receive_path_latency_ns,omitempty"` // B 内核接收路径近似:B_APP_RECV - B_RX_SOFTWARE - BProcessingLatencyNS *int64 `json:"b_processing_latency_ns,omitempty"` // B 处理时延:B_PERSIST_END - B_APP_RECV - EndToEndLatencyNS *int64 `json:"end_to_end_latency_ns,omitempty"` // 端到端时延:B_PERSIST_END - A_APP_PREP_BEGIN - MissingTimestamps []string `json:"missing_timestamps,omitempty"` // 缺失的时间戳列表,包含 requiredTimestampNames 中但在原始事件中没有的事件名称 + AProcessingLatencyNS *int64 `json:"a_processing_latency_ns,omitempty"` // A 处理时延:A_TX_SCHED - A_APP_PREP_BEGIN + AQueueLatencyNS *int64 `json:"a_queue_latency_ns,omitempty"` // A 排队时延:A_TX_SOFTWARE - A_TX_SCHED + ABTransportPropagationNS *int64 `json:"a_b_transport_propagation_ns,omitempty"` // A-B 传输+传播时延近似:B_APP_RECV - A_TX_SOFTWARE + BKernelReceivePathLatencyNS *int64 `json:"b_kernel_receive_path_latency_ns,omitempty"` // B 内核接收路径近似:B_APP_RECV - B_RX_SOFTWARE + BProcessingLatencyNS *int64 `json:"b_processing_latency_ns,omitempty"` // B 处理时延:B_PERSIST_END - B_APP_RECV + EndToEndLatencyNS *int64 `json:"end_to_end_latency_ns,omitempty"` // 端到端时延:B_PERSIST_END - A_APP_PREP_BEGIN + ApproxRTTNS *int64 `json:"approx_rtt_ns,omitempty"` // 近似 RTT:首条反向应答的 B_APP_RECV - 当前请求的 A_TX_SOFTWARE + MissingTimestamps []string `json:"missing_timestamps,omitempty"` // 缺失的时间戳列表,包含 requiredTimestampNames 中但在原始事件中没有的事件名称 } // LoadEventsFromFiles 从JSONL 原始日志文件中加载事件。 @@ -132,9 +133,15 @@ func SummarizeEvents(events []Event) []Summary { } } - summaries := make([]Summary, 0, len(grouped)) + summaryPointers := make([]*Summary, 0, len(grouped)) for _, summary := range grouped { completeSummary(summary) //补全时延指标和缺失时间戳信息 + summaryPointers = append(summaryPointers, summary) + } + assignApproxRTTs(summaryPointers) + + summaries := make([]Summary, 0, len(summaryPointers)) + for _, summary := range summaryPointers { summaries = append(summaries, *summary) } //对整理结果进行排序,先按发送方、再按接收方、再按消息 ID、最后按消息类型排序,保证输出的稳定性和可读性。 @@ -195,7 +202,7 @@ func completeSummary(summary *Summary) { summary.AQueueLatencyNS = value } if value := subtractIfPresent(summary.Timestamps, EventBAppRecv, EventATXSoftware); value != nil { - summary.ABTransportPropagationBQueueLatencyNS = value + summary.ABTransportPropagationNS = value } if value := subtractIfPresent(summary.Timestamps, EventBAppRecv, EventBRXSoftware); value != nil { summary.BKernelReceivePathLatencyNS = value @@ -208,6 +215,90 @@ func completeSummary(summary *Summary) { } } +type routeKey struct { + From string + To string +} + +func assignApproxRTTs(summaries []*Summary) { + grouped := make(map[routeKey][]*Summary) + for _, summary := range summaries { + grouped[routeKey{From: summary.From, To: summary.To}] = append(grouped[routeKey{From: summary.From, To: summary.To}], summary) + } + + for key, requests := range grouped { + replies := grouped[routeKey{From: key.To, To: key.From}] + if len(replies) == 0 { + continue + } + + assignApproxRTTsForRoute( + sortSummariesByTimestamp(requests, EventBAppRecv), + sortSummariesByTimestamp(replies, EventATXSoftware), + ) + } +} + +func assignApproxRTTsForRoute(requests, replies []*Summary) { + replyIndex := 0 + for _, request := range requests { + requestReceivedAtResponder, ok := request.Timestamps[EventBAppRecv] + if !ok { + continue + } + + for replyIndex < len(replies) { + reply := replies[replyIndex] + replySentAtResponder, ok := reply.Timestamps[EventATXSoftware] + if !ok { + replyIndex++ + continue + } + if replySentAtResponder < requestReceivedAtResponder { + replyIndex++ + continue + } + + if value := subtractSummaryTimestamps(reply, EventBAppRecv, request, EventATXSoftware); value != nil { + request.ApproxRTTNS = value + } + replyIndex++ + break + } + } +} + +func sortSummariesByTimestamp(summaries []*Summary, eventName string) []*Summary { + sorted := append([]*Summary(nil), summaries...) + sort.SliceStable(sorted, func(i, j int) bool { + leftTS, leftOK := sorted[i].Timestamps[eventName] + rightTS, rightOK := sorted[j].Timestamps[eventName] + switch { + case leftOK && rightOK: + if leftTS != rightTS { + return leftTS < rightTS + } + case leftOK: + return true + case rightOK: + return false + } + + if sorted[i].MessageID != sorted[j].MessageID { + return sorted[i].MessageID < sorted[j].MessageID + } + if sorted[i].From != sorted[j].From { + return sorted[i].From < sorted[j].From + } + if sorted[i].To != sorted[j].To { + return sorted[i].To < sorted[j].To + } + + return sorted[i].MessageType < sorted[j].MessageType + }) + return sorted +} + // 返回 requiredTimestampNames 中哪些在给定的 timestamps 中缺失。 func missingTimestampNames(timestamps map[string]int64) []string { var missing []string @@ -235,6 +326,20 @@ func subtractIfPresent(timestamps map[string]int64, endName, beginName string) * return &value } +func subtractSummaryTimestamps(endSummary *Summary, endName string, beginSummary *Summary, beginName string) *int64 { + end, ok := endSummary.Timestamps[endName] + if !ok { + return nil + } + begin, ok := beginSummary.Timestamps[beginName] + if !ok { + return nil + } + + value := end - begin + return &value +} + // 判断事件是否是业务相关的时延事件(其中一项) func IsBusinessEvent(event Event) bool { switch event.Event { diff --git a/cmd/internal/latencylog/summary_chart.go b/cmd/internal/latencylog/summary_chart.go index 1953fe9..30b7559 100644 --- a/cmd/internal/latencylog/summary_chart.go +++ b/cmd/internal/latencylog/summary_chart.go @@ -215,6 +215,7 @@ const summaryChartHTMLTemplate = `
{{.EndToEnd}}
{{.Subtitle}}
+
{{.ApproxRTT}}
{{range .Segments}}
@@ -262,6 +263,7 @@ type summaryChartRow struct { Title string Subtitle string EndToEnd string + ApproxRTT string MissingTimestamps string Segments []summaryChartSegment } @@ -314,7 +316,7 @@ func buildSummaryChartPage(summaries []Summary) summaryChartPage { Legend: []summaryChartLegendItem{ {Label: "A processing", Color: "var(--a-proc)"}, {Label: "A queue", Color: "var(--a-queue)"}, - {Label: "Transport + B queue", Color: "var(--transport)"}, + {Label: "A-B transport + propagation", Color: "var(--transport)"}, {Label: "B processing", Color: "var(--b-proc)"}, {Label: "Unknown / missing", Color: "var(--unknown)"}, }, @@ -357,8 +359,12 @@ func buildSummaryChartRow(summary Summary) summaryChartRow { Title: buildSummaryChartTitle(summary), Subtitle: buildSummaryChartSubtitle(summary), EndToEnd: "End-to-end: n/a", + ApproxRTT: "Approx RTT: n/a", MissingTimestamps: strings.Join(summary.MissingTimestamps, ", "), } + if summary.ApproxRTTNS != nil && *summary.ApproxRTTNS > 0 { + row.ApproxRTT = fmt.Sprintf("Approx RTT: %s", formatLatencyNS(*summary.ApproxRTTNS)) + } if summary.EndToEndLatencyNS == nil || *summary.EndToEndLatencyNS <= 0 { return row @@ -370,7 +376,7 @@ func buildSummaryChartRow(summary Summary) summaryChartRow { metrics := []summaryChartSegmentMetric{ {label: "A processing", value: summary.AProcessingLatencyNS, color: "var(--a-proc)"}, {label: "A queue", value: summary.AQueueLatencyNS, color: "var(--a-queue)"}, - {label: "Transport + B queue", value: summary.ABTransportPropagationBQueueLatencyNS, color: "var(--transport)"}, + {label: "A-B transport + propagation", value: summary.ABTransportPropagationNS, color: "var(--transport)"}, {label: "B processing", value: summary.BProcessingLatencyNS, color: "var(--b-proc)"}, } diff --git a/cmd/internal/latencylog/summary_chart_test.go b/cmd/internal/latencylog/summary_chart_test.go index a7a7d0d..be466ac 100644 --- a/cmd/internal/latencylog/summary_chart_test.go +++ b/cmd/internal/latencylog/summary_chart_test.go @@ -18,16 +18,17 @@ func TestWriteSummariesHTMLChart(t *testing.T) { summaries := []Summary{ { - MessageType: protocol.MessageTypeText, - MessageID: 7, - From: "peer-a", - To: "peer-b", - BodySize: 5, - AProcessingLatencyNS: &aProcessing, - AQueueLatencyNS: &aQueue, - ABTransportPropagationBQueueLatencyNS: &transport, - BProcessingLatencyNS: &bProcessing, - EndToEndLatencyNS: &endToEnd, + MessageType: protocol.MessageTypeText, + MessageID: 7, + From: "peer-a", + To: "peer-b", + BodySize: 5, + AProcessingLatencyNS: &aProcessing, + AQueueLatencyNS: &aQueue, + ABTransportPropagationNS: &transport, + BProcessingLatencyNS: &bProcessing, + EndToEndLatencyNS: &endToEnd, + ApproxRTTNS: &endToEnd, }, { MessageType: protocol.MessageTypeFile, @@ -56,7 +57,9 @@ func TestWriteSummariesHTMLChart(t *testing.T) { "text #7", "peer-a -> peer-b | 5 bytes", "End-to-end: 100.000 ms", + "Approx RTT: 100.000 ms", "A processing 20.000 ms", + "A-B transport + propagation 40.000 ms", "file #8 (payload.bin)", "Missing timestamps: B_RX_SOFTWARE", } { diff --git a/cmd/internal/latencylog/summary_test.go b/cmd/internal/latencylog/summary_test.go index a7a0eb6..e8ac2f8 100644 --- a/cmd/internal/latencylog/summary_test.go +++ b/cmd/internal/latencylog/summary_test.go @@ -34,8 +34,8 @@ func TestSummarizeEventsComputesLatencyMetrics(t *testing.T) { if got := ptrValue(summary.AQueueLatencyNS); got != 20 { t.Fatalf("AQueueLatencyNS = %d, want 20", got) } - if got := ptrValue(summary.ABTransportPropagationBQueueLatencyNS); got != 80 { - t.Fatalf("ABTransportPropagationBQueueLatencyNS = %d, want 80", got) + if got := ptrValue(summary.ABTransportPropagationNS); got != 80 { + t.Fatalf("ABTransportPropagationNS = %d, want 80", got) } if got := ptrValue(summary.BKernelReceivePathLatencyNS); got != 40 { t.Fatalf("BKernelReceivePathLatencyNS = %d, want 40", got) @@ -54,6 +54,46 @@ func TestSummarizeEventsComputesLatencyMetrics(t *testing.T) { } } +func TestSummarizeEventsComputesApproxRTTByPairingReverseMessages(t *testing.T) { + events := []Event{ + {TsUnixNano: 100, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b"}, + {TsUnixNano: 110, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b"}, + {TsUnixNano: 180, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b"}, + {TsUnixNano: 120, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b"}, + {TsUnixNano: 140, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b"}, + {TsUnixNano: 190, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b"}, + {TsUnixNano: 200, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 11, From: "peer-b", To: "peer-a"}, + {TsUnixNano: 210, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 11, From: "peer-b", To: "peer-a"}, + {TsUnixNano: 260, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 11, From: "peer-b", To: "peer-a"}, + {TsUnixNano: 220, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 12, From: "peer-b", To: "peer-a"}, + {TsUnixNano: 230, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 12, From: "peer-b", To: "peer-a"}, + {TsUnixNano: 310, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 12, From: "peer-b", To: "peer-a"}, + } + + summaries := SummarizeEvents(events) + if len(summaries) != 4 { + t.Fatalf("summary count = %d, want 4", len(summaries)) + } + + gotByMessageID := make(map[uint64]Summary, len(summaries)) + for _, summary := range summaries { + gotByMessageID[summary.MessageID] = summary + } + + if got := ptrValue(gotByMessageID[1].ApproxRTTNS); got != 150 { + t.Fatalf("message 1 ApproxRTTNS = %d, want 150", got) + } + if got := ptrValue(gotByMessageID[2].ApproxRTTNS); got != 170 { + t.Fatalf("message 2 ApproxRTTNS = %d, want 170", got) + } + if gotByMessageID[11].ApproxRTTNS != nil { + t.Fatalf("message 11 ApproxRTTNS = %d, want nil", ptrValue(gotByMessageID[11].ApproxRTTNS)) + } + if gotByMessageID[12].ApproxRTTNS != nil { + t.Fatalf("message 12 ApproxRTTNS = %d, want nil", ptrValue(gotByMessageID[12].ApproxRTTNS)) + } +} + func TestSummarizeEventsReportsMissingTimestamps(t *testing.T) { events := []Event{ {TsUnixNano: 100, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b"}, diff --git a/cmd/internal/peer/client.go b/cmd/internal/peer/client.go index 48cb5fa..8ab0dd8 100644 --- a/cmd/internal/peer/client.go +++ b/cmd/internal/peer/client.go @@ -195,6 +195,7 @@ func (c *Client) nextMessageID() uint64 { return atomic.AddUint64(&c.nextID, 1) } +// 根据提供的选项创建 TCP 连接,并进行必要的配置,例如绑定本地 IP 或网卡。成功建立连接后,返回一个封装了该连接的 TCPConn 实例。 func dialServerWithOptions(serverAddr string, options clientOptions) (net.Conn, error) { dialer, err := buildDialer(options) if err != nil {