package main import ( "encoding/json" "errors" "flag" "fmt" "io" "math" "os" "sort" "strings" "time" ) const ( defaultPeerID = "pinger" defaultServer = "127.0.0.1:9001" defaultCount = 100 defaultInterval = 100 * time.Millisecond defaultSize = 64 defaultTimeout = 3 * time.Second minExpiryPoll = 10 * time.Millisecond maxExpiryPoll = 100 * time.Millisecond ) type config struct { id string server string to string echo bool count int interval time.Duration size int timeout time.Duration bindIP string latencyLog string } type pingPayload struct { Seq uint64 `json:"seq"` TSUnixNano int64 `json:"ts_ns"` Pad string `json:"pad"` } type pendingPing struct { deadline time.Time } type replyDisposition int const ( replyMatched replyDisposition = iota replyDuplicate replyUnexpected ) type replyResult struct { disposition replyDisposition rtt time.Duration } type pingTracker struct { timeout time.Duration sent int duplicates int pending map[uint64]pendingPing seen map[uint64]struct{} samples []time.Duration } type rttSummary struct { Sent int Received int Duplicates int LossPct float64 Min time.Duration Avg time.Duration Max time.Duration P50 time.Duration P95 time.Duration P99 time.Duration StdDev time.Duration HasSamples bool } func main() { if err := runMain(os.Args[1:], os.Stdout, os.Stderr, time.Now); err != nil { if errors.Is(err, flag.ErrHelp) { return } _, _ = fmt.Fprintln(os.Stderr, err) os.Exit(1) } } func runMain(args []string, stdout, stderr io.Writer, now func() time.Time) error { cfg, err := parseConfig(args, stderr) if err != nil { return err } return runPlatform(cfg, stdout, stderr, now) } func parseConfig(args []string, stderr io.Writer) (config, error) { cfg := config{} flags := flag.NewFlagSet("udpping", flag.ContinueOnError) flags.SetOutput(stderr) flags.StringVar(&cfg.id, "id", defaultPeerID, "local peer identity") flags.StringVar(&cfg.server, "server", defaultServer, "UDP server address") flags.StringVar(&cfg.to, "to", "", "target peer identity in ping mode") flags.BoolVar(&cfg.echo, "echo", false, "echo back every received text message") flags.IntVar(&cfg.count, "count", defaultCount, "number of pings to send; 0 means run until interrupted") flags.DurationVar(&cfg.interval, "interval", defaultInterval, "delay between ping sends") flags.IntVar(&cfg.size, "size", defaultSize, "application payload size in bytes") flags.DurationVar(&cfg.timeout, "timeout", defaultTimeout, "per-ping timeout") flags.StringVar(&cfg.bindIP, "bind-ip", "", "optional local source IP used when dialing the server") flags.StringVar(&cfg.latencyLog, "latency-log", "", "optional JSONL file path for latency timestamp logs") if err := flags.Parse(args); err != nil { return config{}, err } if flags.NArg() > 0 { return config{}, fmt.Errorf("unexpected positional arguments: %s", strings.Join(flags.Args(), " ")) } cfg.id = strings.TrimSpace(cfg.id) cfg.server = strings.TrimSpace(cfg.server) cfg.to = strings.TrimSpace(cfg.to) cfg.bindIP = strings.TrimSpace(cfg.bindIP) cfg.latencyLog = strings.TrimSpace(cfg.latencyLog) if err := cfg.validate(); err != nil { return config{}, err } return cfg, nil } func (c config) validate() error { if c.id == "" { return fmt.Errorf("flag -id is required") } if c.server == "" { return fmt.Errorf("flag -server is required") } if !c.echo && c.to == "" { return fmt.Errorf("flag -to is required unless -echo is set") } if c.count < 0 { return fmt.Errorf("flag -count must be greater than or equal to zero") } if c.interval <= 0 { return fmt.Errorf("flag -interval must be greater than zero") } if c.size <= 0 { return fmt.Errorf("flag -size must be greater than zero") } if c.timeout <= 0 { return fmt.Errorf("flag -timeout must be greater than zero") } return nil } func buildPingPayload(seq uint64, tsUnixNano int64, size int) ([]byte, error) { payload := pingPayload{ Seq: seq, TSUnixNano: tsUnixNano, Pad: "", } body, err := json.Marshal(payload) if err != nil { return nil, fmt.Errorf("encode ping payload: %w", err) } if len(body) > size { return nil, fmt.Errorf("requested payload size %d is too small; minimum is %d", size, len(body)) } payload.Pad = strings.Repeat("A", size-len(body)) body, err = json.Marshal(payload) if err != nil { return nil, fmt.Errorf("encode padded ping payload: %w", err) } if len(body) != size { return nil, fmt.Errorf("encode padded ping payload: got %d bytes, want %d", len(body), size) } return body, nil } func parsePingPayload(body []byte) (pingPayload, error) { var payload pingPayload if err := json.Unmarshal(body, &payload); err != nil { return pingPayload{}, fmt.Errorf("decode ping payload: %w", err) } if payload.Seq == 0 { return pingPayload{}, fmt.Errorf("decode ping payload: seq must be greater than zero") } if payload.TSUnixNano <= 0 { return pingPayload{}, fmt.Errorf("decode ping payload: ts_ns must be greater than zero") } return payload, nil } func newPingTracker(timeout time.Duration) *pingTracker { return &pingTracker{ timeout: timeout, pending: make(map[uint64]pendingPing), seen: make(map[uint64]struct{}), } } func (t *pingTracker) markSent(seq uint64, sentAt time.Time) { t.sent++ t.pending[seq] = pendingPing{deadline: sentAt.Add(t.timeout)} t.seen[seq] = struct{}{} } func (t *pingTracker) observeReply(payload pingPayload, receivedAt time.Time) replyResult { if _, ok := t.seen[payload.Seq]; !ok { return replyResult{disposition: replyUnexpected} } if _, ok := t.pending[payload.Seq]; !ok { t.duplicates++ return replyResult{disposition: replyDuplicate} } delete(t.pending, payload.Seq) rtt := receivedAt.Sub(time.Unix(0, payload.TSUnixNano)) if rtt < 0 { rtt = 0 } t.samples = append(t.samples, rtt) return replyResult{ disposition: replyMatched, rtt: rtt, } } func (t *pingTracker) expire(now time.Time) []uint64 { expired := make([]uint64, 0) for seq, pending := range t.pending { if !pending.deadline.After(now) { expired = append(expired, seq) delete(t.pending, seq) } } sort.Slice(expired, func(i, j int) bool { return expired[i] < expired[j] }) return expired } func (t *pingTracker) pendingCount() int { return len(t.pending) } func (t *pingTracker) summary() rttSummary { return calculateRTTSummary(t.samples, t.sent, t.duplicates) } func calculateRTTSummary(samples []time.Duration, sent, duplicates int) rttSummary { summary := rttSummary{ Sent: sent, Received: len(samples), Duplicates: duplicates, } if sent > 0 { summary.LossPct = float64(sent-len(samples)) * 100 / float64(sent) } if len(samples) == 0 { return summary } sorted := append([]time.Duration(nil), samples...) sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] }) var sum float64 for _, sample := range sorted { sum += float64(sample) } avg := sum / float64(len(sorted)) var variance float64 for _, sample := range sorted { delta := float64(sample) - avg variance += delta * delta } variance /= float64(len(sorted)) summary.Min = sorted[0] summary.Avg = time.Duration(math.Round(avg)) summary.Max = sorted[len(sorted)-1] summary.P50 = percentileDuration(sorted, 0.50) summary.P95 = percentileDuration(sorted, 0.95) summary.P99 = percentileDuration(sorted, 0.99) summary.StdDev = time.Duration(math.Round(math.Sqrt(variance))) summary.HasSamples = true return summary } func percentileDuration(sorted []time.Duration, percentile float64) time.Duration { if len(sorted) == 0 { return 0 } if percentile <= 0 { return sorted[0] } if percentile >= 1 { return sorted[len(sorted)-1] } index := int(math.Ceil(percentile*float64(len(sorted)))) - 1 if index < 0 { index = 0 } if index >= len(sorted) { index = len(sorted) - 1 } return sorted[index] } func formatRTT(duration time.Duration) string { return fmt.Sprintf("%.2fms", float64(duration)/float64(time.Millisecond)) } func writePingHeader(w io.Writer, cfg config) error { _, err := fmt.Fprintf(w, "UDP PING %s via %s (payload=%d bytes, UDP)\n", cfg.to, cfg.server, cfg.size) return err } func writeMatchedReply(w io.Writer, seq uint64, rtt time.Duration) error { _, err := fmt.Fprintf(w, "seq=%d rtt=%s\n", seq, formatRTT(rtt)) return err } func writeTimeout(w io.Writer, seq uint64) error { _, err := fmt.Fprintf(w, "seq=%d timeout\n", seq) return err } func writeSummary(w io.Writer, target string, summary rttSummary) error { if _, err := fmt.Fprintf(w, "--- %s udp ping statistics ---\n", target); err != nil { return err } if _, err := fmt.Fprintf( w, "%d packets transmitted, %d received, %d duplicates, %.2f%% packet loss\n", summary.Sent, summary.Received, summary.Duplicates, summary.LossPct, ); err != nil { return err } if !summary.HasSamples { _, err := fmt.Fprintln(w, "rtt min/avg/max/p50/p95/p99 = n/a/n/a/n/a/n/a/n/a/n/a, stddev=n/a") return err } _, err := fmt.Fprintf( w, "rtt min/avg/max/p50/p95/p99 = %s/%s/%s/%s/%s/%s, stddev=%s\n", formatRTT(summary.Min), formatRTT(summary.Avg), formatRTT(summary.Max), formatRTT(summary.P50), formatRTT(summary.P95), formatRTT(summary.P99), formatRTT(summary.StdDev), ) return err } func expiryPollInterval(timeout time.Duration) time.Duration { interval := timeout / 4 if interval < minExpiryPoll { return minExpiryPoll } if interval > maxExpiryPoll { return maxExpiryPoll } return interval }