Merge branch 'main' of https://106.52.207.92:9103/limingjie/OmniSocketGo
This commit is contained in:
3
.gitignore
vendored
3
.gitignore
vendored
@@ -2,3 +2,6 @@ bin/*
|
|||||||
inbox/*
|
inbox/*
|
||||||
*.jsonl
|
*.jsonl
|
||||||
*.html
|
*.html
|
||||||
|
peer-b-latency.*
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -79,3 +79,6 @@ text peer-a hi
|
|||||||
file peer-b /tmp/test.bin
|
file peer-b /tmp/test.bin
|
||||||
quit
|
quit
|
||||||
```
|
```
|
||||||
|
### 自动化拉取更新汇总数据
|
||||||
|
cd /home/limingjie/LMJ_Work/OmniSocketGo
|
||||||
|
./scripts/refresh-latency-summary.sh
|
||||||
@@ -66,6 +66,48 @@ func LoadEventsFromFiles(paths []string) ([]Event, error) {
|
|||||||
return events, nil
|
return events, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LoadEventsFromFilesWithSharedMaxOffset 从多个 JSONL 原始日志文件中加载事件,
|
||||||
|
// 并按每个输入文件的最大 message_id 计算共享截断点。
|
||||||
|
func LoadEventsFromFilesWithSharedMaxOffset(paths []string, sharedMaxOffset uint64) ([]Event, *uint64, error) {
|
||||||
|
eventsByFile := make([][]Event, 0, len(paths))
|
||||||
|
var minMaxMessageID uint64
|
||||||
|
hasSharedMax := false
|
||||||
|
|
||||||
|
for _, path := range paths {
|
||||||
|
fileEvents, err := LoadEventsFromFile(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
eventsByFile = append(eventsByFile, fileEvents)
|
||||||
|
|
||||||
|
fileMaxMessageID, ok := maxBusinessMessageID(fileEvents)
|
||||||
|
if !ok {
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
|
if !hasSharedMax || fileMaxMessageID < minMaxMessageID {
|
||||||
|
minMaxMessageID = fileMaxMessageID
|
||||||
|
hasSharedMax = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !hasSharedMax {
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cutoff, ok := subtractUint64(minMaxMessageID, sharedMaxOffset)
|
||||||
|
if !ok {
|
||||||
|
return []Event{}, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var events []Event
|
||||||
|
for _, fileEvents := range eventsByFile {
|
||||||
|
events = append(events, filterEventsByMaxMessageID(fileEvents, cutoff)...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return events, &cutoff, nil
|
||||||
|
}
|
||||||
|
|
||||||
// LoadEventsFromFile 从单个 JSONL 原始日志文件中加载事件。
|
// LoadEventsFromFile 从单个 JSONL 原始日志文件中加载事件。
|
||||||
func LoadEventsFromFile(path string) ([]Event, error) {
|
func LoadEventsFromFile(path string) ([]Event, error) {
|
||||||
file, err := os.Open(path)
|
file, err := os.Open(path)
|
||||||
@@ -357,6 +399,45 @@ func calculateBitrateBPS(bodySize int, latencyNS *int64) *float64 {
|
|||||||
return &value
|
return &value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 最大 message_id 计算函数
|
||||||
|
func maxBusinessMessageID(events []Event) (uint64, bool) {
|
||||||
|
var maxMessageID uint64
|
||||||
|
hasBusinessMessage := false
|
||||||
|
|
||||||
|
for _, event := range events {
|
||||||
|
if !IsBusinessEvent(event) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !hasBusinessMessage || event.MessageID > maxMessageID {
|
||||||
|
maxMessageID = event.MessageID
|
||||||
|
hasBusinessMessage = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return maxMessageID, hasBusinessMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
// 根据 message_id 截断事件列表的函数
|
||||||
|
func filterEventsByMaxMessageID(events []Event, maxMessageID uint64) []Event {
|
||||||
|
filtered := make([]Event, 0, len(events))
|
||||||
|
for _, event := range events {
|
||||||
|
if event.MessageID > maxMessageID {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
filtered = append(filtered, event)
|
||||||
|
}
|
||||||
|
|
||||||
|
return filtered
|
||||||
|
}
|
||||||
|
|
||||||
|
func subtractUint64(value, offset uint64) (uint64, bool) {
|
||||||
|
if offset > value {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
return value - offset, true
|
||||||
|
}
|
||||||
|
|
||||||
// 判断事件是否是业务相关的时延事件(其中一项)
|
// 判断事件是否是业务相关的时延事件(其中一项)
|
||||||
func IsBusinessEvent(event Event) bool {
|
func IsBusinessEvent(event Event) bool {
|
||||||
switch event.Event {
|
switch event.Event {
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"omnisocketgo/cmd/internal/protocol"
|
"omnisocketgo/cmd/internal/protocol"
|
||||||
@@ -188,6 +189,147 @@ func TestLoadAndWriteSummaryFiles(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLoadEventsFromFilesWithSharedMaxOffsetFiltersToSharedCutoff(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
firstMessageIDs []uint64
|
||||||
|
secondMessageIDs []uint64
|
||||||
|
offset uint64
|
||||||
|
wantCutoff *uint64
|
||||||
|
wantMessageIDs []uint64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "same max message id rolls back one",
|
||||||
|
firstMessageIDs: []uint64{1, 2, 3, 4, 5, 6, 7},
|
||||||
|
secondMessageIDs: []uint64{1, 2, 3, 4, 5, 6, 7},
|
||||||
|
offset: 1,
|
||||||
|
wantCutoff: uint64Ptr(6),
|
||||||
|
wantMessageIDs: []uint64{1, 2, 3, 4, 5, 6},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "smaller input max wins before rollback",
|
||||||
|
firstMessageIDs: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9},
|
||||||
|
secondMessageIDs: []uint64{1, 2, 3, 4, 5, 6, 7},
|
||||||
|
offset: 1,
|
||||||
|
wantCutoff: uint64Ptr(6),
|
||||||
|
wantMessageIDs: []uint64{1, 2, 3, 4, 5, 6},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "not enough shared messages yields empty result",
|
||||||
|
firstMessageIDs: []uint64{1},
|
||||||
|
secondMessageIDs: []uint64{1},
|
||||||
|
offset: 1,
|
||||||
|
wantCutoff: uint64Ptr(0),
|
||||||
|
wantMessageIDs: nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range testCases {
|
||||||
|
tt := tt
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
tempDir := t.TempDir()
|
||||||
|
firstPath := filepath.Join(tempDir, "first.jsonl")
|
||||||
|
secondPath := filepath.Join(tempDir, "second.jsonl")
|
||||||
|
writeEventsJSONL(t, firstPath, testEventsForMessageIDs(tt.firstMessageIDs, "peer-a", "peer-b"))
|
||||||
|
writeEventsJSONL(t, secondPath, testEventsForMessageIDs(tt.secondMessageIDs, "peer-b", "peer-a"))
|
||||||
|
|
||||||
|
events, cutoff, err := LoadEventsFromFilesWithSharedMaxOffset([]string{firstPath, secondPath}, tt.offset)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("LoadEventsFromFilesWithSharedMaxOffset() error = %v", err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(cutoff, tt.wantCutoff) {
|
||||||
|
t.Fatalf("cutoff = %v, want %v", cutoff, tt.wantCutoff)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got := businessMessageIDs(events); !reflect.DeepEqual(got, tt.wantMessageIDs) {
|
||||||
|
t.Fatalf("message IDs = %v, want %v", got, tt.wantMessageIDs)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoadEventsFromFilesWithSharedMaxOffsetPreservesEarlierSummaries(t *testing.T) {
|
||||||
|
tempDir := t.TempDir()
|
||||||
|
firstPath := filepath.Join(tempDir, "first.jsonl")
|
||||||
|
secondPath := filepath.Join(tempDir, "second.jsonl")
|
||||||
|
|
||||||
|
writeEventsJSONL(t, firstPath, []Event{
|
||||||
|
{TsUnixNano: 100, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
|
||||||
|
{TsUnixNano: 120, Event: EventATXSched, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
|
||||||
|
{TsUnixNano: 140, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
|
||||||
|
{TsUnixNano: 180, Event: EventBRXSoftware, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
|
||||||
|
{TsUnixNano: 220, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
|
||||||
|
{TsUnixNano: 260, Event: EventBPersistEnd, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-a", To: "peer-b", BodySize: 320},
|
||||||
|
{TsUnixNano: 300, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160},
|
||||||
|
{TsUnixNano: 330, Event: EventATXSched, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160},
|
||||||
|
{TsUnixNano: 360, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160},
|
||||||
|
{TsUnixNano: 390, Event: EventBRXSoftware, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160},
|
||||||
|
{TsUnixNano: 420, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160},
|
||||||
|
{TsUnixNano: 470, Event: EventBPersistEnd, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-a", To: "peer-b", BodySize: 160},
|
||||||
|
{TsUnixNano: 500, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80},
|
||||||
|
{TsUnixNano: 520, Event: EventATXSched, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80},
|
||||||
|
{TsUnixNano: 540, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80},
|
||||||
|
{TsUnixNano: 560, Event: EventBRXSoftware, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80},
|
||||||
|
{TsUnixNano: 580, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80},
|
||||||
|
{TsUnixNano: 600, Event: EventBPersistEnd, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-a", To: "peer-b", BodySize: 80},
|
||||||
|
{TsUnixNano: 700, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 4, From: "peer-a", To: "peer-b", BodySize: 40},
|
||||||
|
})
|
||||||
|
writeEventsJSONL(t, secondPath, []Event{
|
||||||
|
{TsUnixNano: 90, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-b", To: "peer-a", BodySize: 20},
|
||||||
|
{TsUnixNano: 95, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-b", To: "peer-a", BodySize: 20},
|
||||||
|
{TsUnixNano: 150, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 1, From: "peer-b", To: "peer-a", BodySize: 20},
|
||||||
|
{TsUnixNano: 290, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-b", To: "peer-a", BodySize: 20},
|
||||||
|
{TsUnixNano: 295, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-b", To: "peer-a", BodySize: 20},
|
||||||
|
{TsUnixNano: 350, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 2, From: "peer-b", To: "peer-a", BodySize: 20},
|
||||||
|
{TsUnixNano: 490, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-b", To: "peer-a", BodySize: 20},
|
||||||
|
{TsUnixNano: 495, Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-b", To: "peer-a", BodySize: 20},
|
||||||
|
{TsUnixNano: 550, Event: EventBAppRecv, MessageType: protocol.MessageTypeText, MessageID: 3, From: "peer-b", To: "peer-a", BodySize: 20},
|
||||||
|
{TsUnixNano: 690, Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: 4, From: "peer-b", To: "peer-a", BodySize: 20},
|
||||||
|
})
|
||||||
|
|
||||||
|
events, cutoff, err := LoadEventsFromFilesWithSharedMaxOffset([]string{firstPath, secondPath}, 1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("LoadEventsFromFilesWithSharedMaxOffset() error = %v", err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(cutoff, uint64Ptr(3)) {
|
||||||
|
t.Fatalf("cutoff = %v, want %v", cutoff, uint64Ptr(3))
|
||||||
|
}
|
||||||
|
|
||||||
|
summaries := SummarizeEvents(events)
|
||||||
|
if got := len(summaries); got != 6 {
|
||||||
|
t.Fatalf("summary count = %d, want 6", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, summary := range summaries {
|
||||||
|
if summary.MessageID == 4 {
|
||||||
|
t.Fatalf("message 4 should have been truncated from summaries: %+v", summary)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var forwardMessageTwo Summary
|
||||||
|
found := false
|
||||||
|
for _, summary := range summaries {
|
||||||
|
if summary.From == "peer-a" && summary.To == "peer-b" && summary.MessageID == 2 {
|
||||||
|
forwardMessageTwo = summary
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
t.Fatal("summary for message 2 peer-a -> peer-b not found")
|
||||||
|
}
|
||||||
|
if got := ptrValue(forwardMessageTwo.EndToEndLatencyNS); got != 170 {
|
||||||
|
t.Fatalf("message 2 EndToEndLatencyNS = %d, want 170", got)
|
||||||
|
}
|
||||||
|
if got := ptrValue(forwardMessageTwo.ApproxRTTNS); got != 190 {
|
||||||
|
t.Fatalf("message 2 ApproxRTTNS = %d, want 190", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func ptrValue(value *int64) int64 {
|
func ptrValue(value *int64) int64 {
|
||||||
if value == nil {
|
if value == nil {
|
||||||
return 0
|
return 0
|
||||||
@@ -201,3 +343,57 @@ func ptrValueFloat(value *float64) float64 {
|
|||||||
}
|
}
|
||||||
return *value
|
return *value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func uint64Ptr(value uint64) *uint64 {
|
||||||
|
return &value
|
||||||
|
}
|
||||||
|
|
||||||
|
func businessMessageIDs(events []Event) []uint64 {
|
||||||
|
seen := make(map[uint64]struct{})
|
||||||
|
var ids []uint64
|
||||||
|
|
||||||
|
for _, event := range events {
|
||||||
|
if !IsBusinessEvent(event) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, ok := seen[event.MessageID]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
seen[event.MessageID] = struct{}{}
|
||||||
|
ids = append(ids, event.MessageID)
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Slice(ids, func(i, j int) bool {
|
||||||
|
return ids[i] < ids[j]
|
||||||
|
})
|
||||||
|
return ids
|
||||||
|
}
|
||||||
|
|
||||||
|
func testEventsForMessageIDs(messageIDs []uint64, from, to string) []Event {
|
||||||
|
events := make([]Event, 0, len(messageIDs)*2)
|
||||||
|
for _, messageID := range messageIDs {
|
||||||
|
events = append(events,
|
||||||
|
Event{TsUnixNano: int64(messageID*100 + 10), Event: EventAAppPrepBegin, MessageType: protocol.MessageTypeText, MessageID: messageID, From: from, To: to, BodySize: 32},
|
||||||
|
Event{TsUnixNano: int64(messageID*100 + 20), Event: EventATXSoftware, MessageType: protocol.MessageTypeText, MessageID: messageID, From: from, To: to, BodySize: 32},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
return events
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeEventsJSONL(t *testing.T, path string, events []Event) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("os.OpenFile(%s) error = %v", path, err)
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
encoder := json.NewEncoder(file)
|
||||||
|
for _, event := range events {
|
||||||
|
if err := encoder.Encode(event); err != nil {
|
||||||
|
t.Fatalf("encoder.Encode(%s) error = %v", path, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -23,6 +23,8 @@ func (f *stringListFlag) Set(value string) error {
|
|||||||
func main() {
|
func main() {
|
||||||
var inputPaths stringListFlag
|
var inputPaths stringListFlag
|
||||||
outputPath := flag.String("output", "latency-summary.jsonl", "output JSONL file for summarized latency metrics")
|
outputPath := flag.String("output", "latency-summary.jsonl", "output JSONL file for summarized latency metrics")
|
||||||
|
// shared-max-offset 是一个可选参数,用于在对齐输入文件的 per-file max message_id 后,排除掉最新的共享 message_id 以外的记录。它指定了要排除的共享 message_id 的数量。
|
||||||
|
sharedMaxOffset := flag.Uint64("shared-max-offset", 1, "number of newest shared message IDs to exclude after aligning inputs by per-file max message_id")
|
||||||
flag.Var(&inputPaths, "input", "raw latency JSONL file path; can be provided multiple times")
|
flag.Var(&inputPaths, "input", "raw latency JSONL file path; can be provided multiple times")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
@@ -30,10 +32,16 @@ func main() {
|
|||||||
log.Fatal("at least one -input raw latency log file is required")
|
log.Fatal("at least one -input raw latency log file is required")
|
||||||
}
|
}
|
||||||
|
|
||||||
events, err := latencylog.LoadEventsFromFiles(inputPaths)
|
events, sharedMaxMessageID, err := latencylog.LoadEventsFromFilesWithSharedMaxOffset(inputPaths, *sharedMaxOffset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("load raw latency logs: %v", err)
|
log.Fatalf("load raw latency logs: %v", err)
|
||||||
}
|
}
|
||||||
|
// sharedMaxMessageID 可能为 nil,表示没有可用的共享 message_id 截止值(例如因为输入文件中没有共享消息)。在这种情况下,我们将继续处理所有事件,但会记录一个警告。
|
||||||
|
if sharedMaxMessageID != nil {
|
||||||
|
log.Printf("using shared message_id cutoff <= %d (shared-max-offset=%d)", *sharedMaxMessageID, *sharedMaxOffset)
|
||||||
|
} else {
|
||||||
|
log.Printf("no shared message_id cutoff available after applying shared-max-offset=%d", *sharedMaxOffset)
|
||||||
|
}
|
||||||
|
|
||||||
summaries := latencylog.SummarizeEvents(events)
|
summaries := latencylog.SummarizeEvents(events)
|
||||||
if err := latencylog.WriteSummariesJSONL(*outputPath, summaries); err != nil {
|
if err := latencylog.WriteSummariesJSONL(*outputPath, summaries); err != nil {
|
||||||
|
|||||||
136
scripts/refresh-latency-summary.sh
Executable file
136
scripts/refresh-latency-summary.sh
Executable file
@@ -0,0 +1,136 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
set -u
|
||||||
|
set -o pipefail
|
||||||
|
|
||||||
|
repo_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
|
||||||
|
remote_source="boll@175.178.116.187:/home/boll/LMJWork/OmniSocketGo/peer-b-latency.jsonl"
|
||||||
|
local_peer_a="$repo_dir/peer-a-latency.jsonl"
|
||||||
|
local_peer_b="$repo_dir/peer-b-latency.jsonl"
|
||||||
|
summary_output="$repo_dir/latency-summary.jsonl"
|
||||||
|
chart_output="$repo_dir/latency-summary.html"
|
||||||
|
latency_binary="$repo_dir/bin/latencysummary"
|
||||||
|
go_cache_dir="${GOCACHE:-/tmp/omnisocketgo-go-build}"
|
||||||
|
poll_interval_seconds=1
|
||||||
|
|
||||||
|
remote_tmp=""
|
||||||
|
summary_tmp=""
|
||||||
|
chart_tmp=""
|
||||||
|
|
||||||
|
log() {
|
||||||
|
printf '[%s] %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$*"
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup_temp_file() {
|
||||||
|
local path="$1"
|
||||||
|
if [[ -n "$path" && -e "$path" ]]; then
|
||||||
|
rm -f "$path"
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup() {
|
||||||
|
cleanup_temp_file "$remote_tmp"
|
||||||
|
cleanup_temp_file "$summary_tmp"
|
||||||
|
cleanup_temp_file "$chart_tmp"
|
||||||
|
}
|
||||||
|
|
||||||
|
handle_interrupt() {
|
||||||
|
log "received interrupt signal, stopping refresh loop"
|
||||||
|
cleanup
|
||||||
|
exit 130
|
||||||
|
}
|
||||||
|
|
||||||
|
handle_terminate() {
|
||||||
|
log "received terminate signal, stopping refresh loop"
|
||||||
|
cleanup
|
||||||
|
exit 143
|
||||||
|
}
|
||||||
|
|
||||||
|
trap cleanup EXIT
|
||||||
|
trap handle_interrupt INT
|
||||||
|
trap handle_terminate TERM
|
||||||
|
|
||||||
|
cd "$repo_dir" || exit 1
|
||||||
|
|
||||||
|
mkdir -p "$repo_dir/bin"
|
||||||
|
mkdir -p "$go_cache_dir"
|
||||||
|
if ! GOCACHE="$go_cache_dir" go build -o "$latency_binary" ./cmd/latencysummary; then
|
||||||
|
log "build failed; exiting"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
log "starting 1-second refresh loop"
|
||||||
|
|
||||||
|
while true; do
|
||||||
|
remote_tmp="$(mktemp "$repo_dir/peer-b-latency.jsonl.tmp.XXXXXX")" || exit 1
|
||||||
|
if scp -P 10022 "$remote_source" "$remote_tmp"; then
|
||||||
|
if mv -f "$remote_tmp" "$local_peer_b"; then
|
||||||
|
remote_tmp=""
|
||||||
|
else
|
||||||
|
status=$?
|
||||||
|
log "failed to replace $(basename "$local_peer_b") after scp (exit $status)"
|
||||||
|
cleanup_temp_file "$remote_tmp"
|
||||||
|
remote_tmp=""
|
||||||
|
sleep "$poll_interval_seconds"
|
||||||
|
continue
|
||||||
|
fi
|
||||||
|
else
|
||||||
|
status=$?
|
||||||
|
log "scp refresh failed (exit $status)"
|
||||||
|
cleanup_temp_file "$remote_tmp"
|
||||||
|
remote_tmp=""
|
||||||
|
sleep "$poll_interval_seconds"
|
||||||
|
continue
|
||||||
|
fi
|
||||||
|
|
||||||
|
summary_tmp="$(mktemp "$repo_dir/latency-summary.tmp.XXXXXX.jsonl")" || exit 1
|
||||||
|
chart_tmp="${summary_tmp%.jsonl}.html"
|
||||||
|
if "$latency_binary" \
|
||||||
|
-input "$local_peer_a" \
|
||||||
|
-input "$local_peer_b" \
|
||||||
|
-shared-max-offset 1 \
|
||||||
|
-output "$summary_tmp"; then
|
||||||
|
if [[ ! -f "$summary_tmp" || ! -f "$chart_tmp" ]]; then
|
||||||
|
log "summary succeeded but temporary outputs are incomplete"
|
||||||
|
cleanup_temp_file "$summary_tmp"
|
||||||
|
cleanup_temp_file "$chart_tmp"
|
||||||
|
summary_tmp=""
|
||||||
|
chart_tmp=""
|
||||||
|
sleep "$poll_interval_seconds"
|
||||||
|
continue
|
||||||
|
fi
|
||||||
|
|
||||||
|
if ! mv -f "$summary_tmp" "$summary_output"; then
|
||||||
|
status=$?
|
||||||
|
log "failed to replace $(basename "$summary_output") (exit $status)"
|
||||||
|
cleanup_temp_file "$summary_tmp"
|
||||||
|
cleanup_temp_file "$chart_tmp"
|
||||||
|
summary_tmp=""
|
||||||
|
chart_tmp=""
|
||||||
|
sleep "$poll_interval_seconds"
|
||||||
|
continue
|
||||||
|
fi
|
||||||
|
summary_tmp=""
|
||||||
|
|
||||||
|
if ! mv -f "$chart_tmp" "$chart_output"; then
|
||||||
|
status=$?
|
||||||
|
log "failed to replace $(basename "$chart_output") (exit $status)"
|
||||||
|
cleanup_temp_file "$chart_tmp"
|
||||||
|
chart_tmp=""
|
||||||
|
sleep "$poll_interval_seconds"
|
||||||
|
continue
|
||||||
|
fi
|
||||||
|
chart_tmp=""
|
||||||
|
else
|
||||||
|
status=$?
|
||||||
|
log "latency summary refresh failed (exit $status)"
|
||||||
|
cleanup_temp_file "$summary_tmp"
|
||||||
|
cleanup_temp_file "$chart_tmp"
|
||||||
|
summary_tmp=""
|
||||||
|
chart_tmp=""
|
||||||
|
sleep "$poll_interval_seconds"
|
||||||
|
continue
|
||||||
|
fi
|
||||||
|
|
||||||
|
sleep "$poll_interval_seconds"
|
||||||
|
done
|
||||||
Reference in New Issue
Block a user