Files
OmniSocketGo/cmd/internal/peer/udp_client_test.go

338 lines
8.4 KiB
Go

package peer
import (
"net"
"os"
"path/filepath"
"sync"
"testing"
"time"
"omnisocketgo/cmd/internal/latencylog"
"omnisocketgo/cmd/internal/protocol"
"omnisocketgo/cmd/internal/server"
"omnisocketgo/cmd/internal/transport"
)
// recordingLatencyLogger is a test double that stores every latency event
// passed to LogEvent so a test can inspect them afterwards.
type recordingLatencyLogger struct {
	mu     sync.Mutex         // guards events
	events []latencylog.Event // events in the order LogEvent received them
}

// LogEvent appends event to the in-memory list. It always returns nil.
func (r *recordingLatencyLogger) LogEvent(event latencylog.Event) error {
	r.mu.Lock()
	r.events = append(r.events, event)
	r.mu.Unlock()
	return nil
}

// Events returns a copy of the events recorded so far, so the caller can read
// the result without holding the logger's mutex.
func (r *recordingLatencyLogger) Events() []latencylog.Event {
	r.mu.Lock()
	defer r.mu.Unlock()

	var snapshot []latencylog.Event
	snapshot = append(snapshot, r.events...)
	return snapshot
}
// recordingUDPTXTimestampDebugLogger is a test double that stores every TX
// timestamp debug record passed to LogTXTimestampDebugRecord.
type recordingUDPTXTimestampDebugLogger struct {
	mu      sync.Mutex                         // guards records
	records []transport.TXTimestampDebugRecord // records in arrival order
}

// LogTXTimestampDebugRecord appends record to the in-memory list. It always
// returns nil.
func (r *recordingUDPTXTimestampDebugLogger) LogTXTimestampDebugRecord(record transport.TXTimestampDebugRecord) error {
	r.mu.Lock()
	r.records = append(r.records, record)
	r.mu.Unlock()
	return nil
}

// Records returns a copy of the debug records captured so far, so the caller
// can read the result without holding the logger's mutex.
func (r *recordingUDPTXTimestampDebugLogger) Records() []transport.TXTimestampDebugRecord {
	r.mu.Lock()
	defer r.mu.Unlock()

	var snapshot []transport.TXTimestampDebugRecord
	snapshot = append(snapshot, r.records...)
	return snapshot
}
// TestUDPDialAndSendText dials two peers against a fresh test hub, sends a
// text message from peer-a to peer-b, and checks the type and body that
// peer-b receives.
func TestUDPDialAndSendText(t *testing.T) {
	const wantBody = "hello from udp"

	hub := startUDPTestHub(t).String()

	sender, err := DialUDP(hub, "peer-a")
	if err != nil {
		t.Fatalf("DialUDP(peer-a) error = %v", err)
	}
	defer sender.Close()

	receiver, err := DialUDP(hub, "peer-b")
	if err != nil {
		t.Fatalf("DialUDP(peer-b) error = %v", err)
	}
	defer receiver.Close()

	// NOTE(review): presumably gives the hub time to register both peers
	// before the first send; confirm against the hub implementation.
	time.Sleep(50 * time.Millisecond)

	if err := sender.SendText("peer-b", wantBody); err != nil {
		t.Fatalf("SendText() error = %v", err)
	}

	received := receiveUDPClientMessage(t, receiver)
	if received.Type != protocol.MessageTypeText {
		t.Fatalf("message type = %s, want text", received.Type)
	}
	if gotBody := string(received.Body); gotBody != wantBody {
		t.Fatalf("message body = %q, want %q", gotBody, wantBody)
	}
}
// TestUDPClientID checks that a dialed client reports the peer ID it was
// created with.
func TestUDPClientID(t *testing.T) {
	const wantID = "my-peer-id"

	hub := startUDPTestHub(t)
	client, err := DialUDP(hub.String(), wantID)
	if err != nil {
		t.Fatalf("DialUDP() error = %v", err)
	}
	defer client.Close()

	got := client.ID()
	if got != wantID {
		t.Fatalf("ID() = %q, want %q", got, wantID)
	}
}
// TestUDPClientPersistMessage persists one text message and one file message
// into a temporary inbox directory. For the text message it checks that a
// non-empty path is returned; for the file message it reads the returned path
// back and checks that the bytes on disk equal the message body.
func TestUDPClientPersistMessage(t *testing.T) {
	hubAddr := startUDPTestHub(t)
	client, err := DialUDP(hubAddr.String(), "peer-persist")
	if err != nil {
		t.Fatalf("DialUDP() error = %v", err)
	}
	defer client.Close()

	inboxDir := t.TempDir()

	textMsg := protocol.Message{
		Type: protocol.MessageTypeText,
		ID:   1,
		From: "sender",
		To:   "peer-persist",
		Body: []byte("persisted text"),
	}
	path, err := client.PersistMessage(textMsg, inboxDir)
	if err != nil {
		t.Fatalf("PersistMessage(text) error = %v", err)
	}
	// NOTE(review): filepath.IsAbs("") is false, so this condition is
	// equivalent to path == "" and only rejects an empty path. If the intent
	// was to require an absolute path, the check needs to be tightened once
	// PersistMessage's contract is confirmed.
	if !filepath.IsAbs(path) && path == "" {
		t.Fatalf("PersistMessage(text) returned empty path")
	}

	fileMsg := protocol.Message{
		Type:     protocol.MessageTypeFile,
		ID:       2,
		From:     "sender",
		To:       "peer-persist",
		FileName: "test.bin",
		Body:     []byte{0x01, 0x02, 0x03},
	}
	filePath, err := client.PersistMessage(fileMsg, inboxDir)
	if err != nil {
		t.Fatalf("PersistMessage(file) error = %v", err)
	}

	content, err := os.ReadFile(filePath)
	if err != nil {
		t.Fatalf("ReadFile(%s) error = %v", filePath, err)
	}
	// Compare the whole payload: checking only the length and the first byte
	// would let corruption of the remaining bytes go unnoticed.
	if string(content) != string(fileMsg.Body) {
		t.Fatalf("file content mismatch: got %v, want %v", content, fileMsg.Body)
	}
}
// TestUDPClientSendFile sends a small binary file from peer-a to peer-b
// through the test hub and checks the received message's type, file name,
// and full body.
func TestUDPClientSendFile(t *testing.T) {
	hubAddr := startUDPTestHub(t)

	clientA, err := DialUDP(hubAddr.String(), "peer-a")
	if err != nil {
		t.Fatalf("DialUDP(peer-a) error = %v", err)
	}
	defer clientA.Close()

	clientB, err := DialUDP(hubAddr.String(), "peer-b")
	if err != nil {
		t.Fatalf("DialUDP(peer-b) error = %v", err)
	}
	defer clientB.Close()

	// NOTE(review): presumably gives the hub time to register both peers
	// before the first send; confirm against the hub implementation.
	time.Sleep(50 * time.Millisecond)

	fileBody := []byte{0xDE, 0xAD, 0xBE, 0xEF}
	if err := clientA.SendFile("peer-b", "test.bin", fileBody); err != nil {
		t.Fatalf("SendFile() error = %v", err)
	}

	msg := receiveUDPClientMessage(t, clientB)
	if msg.Type != protocol.MessageTypeFile {
		t.Fatalf("message type = %s, want file", msg.Type)
	}
	if msg.FileName != "test.bin" {
		t.Fatalf("file name = %q, want %q", msg.FileName, "test.bin")
	}
	// Compare the bytes, not just the length: a payload of the right size
	// but wrong content must fail the test.
	if string(msg.Body) != string(fileBody) {
		t.Fatalf("body = %v, want %v", msg.Body, fileBody)
	}
}
// TestUDPClientBidirectionalLogsAndServerDiagnostics exchanges one text
// message in each direction between two peers whose latency and TX-timestamp
// debug loggers are recording doubles. It then checks that each receiver
// logged the expected receive events and that the clients and the hub each
// recorded at least one diagnostic entry.
func TestUDPClientBidirectionalLogsAndServerDiagnostics(t *testing.T) {
	const (
		textFromA = "hello from peer-a"
		textFromB = "hello from peer-b"
	)

	hubLatency := &recordingLatencyLogger{}
	hubDebug := &recordingUDPTXTimestampDebugLogger{}
	hub := startUDPTestHubWithOptions(
		t,
		server.WithUDPLogger(hubLatency),
		server.WithUDPTXTimestampDebugLogger(hubDebug),
	).String()

	latencyA := &recordingLatencyLogger{}
	debugA := &recordingUDPTXTimestampDebugLogger{}
	peerA, err := DialUDP(hub, "peer-a", WithLogger(latencyA), WithTXTimestampDebugLogger(debugA))
	if err != nil {
		t.Fatalf("DialUDP(peer-a) error = %v", err)
	}
	defer peerA.Close()

	latencyB := &recordingLatencyLogger{}
	debugB := &recordingUDPTXTimestampDebugLogger{}
	peerB, err := DialUDP(hub, "peer-b", WithLogger(latencyB), WithTXTimestampDebugLogger(debugB))
	if err != nil {
		t.Fatalf("DialUDP(peer-b) error = %v", err)
	}
	defer peerB.Close()

	// NOTE(review): presumably gives the hub time to register both peers
	// before the first send; confirm against the hub implementation.
	time.Sleep(50 * time.Millisecond)

	// peer-a -> peer-b
	if err := peerA.SendText("peer-b", textFromA); err != nil {
		t.Fatalf("clientA.SendText() error = %v", err)
	}
	msgFromA := receiveUDPClientMessage(t, peerB)
	if got := string(msgFromA.Body); got != textFromA {
		t.Fatalf("message body = %q, want %q", got, textFromA)
	}

	// peer-b -> peer-a
	if err := peerB.SendText("peer-a", textFromB); err != nil {
		t.Fatalf("clientB.SendText() error = %v", err)
	}
	msgFromB := receiveUDPClientMessage(t, peerA)
	if got := string(msgFromB.Body); got != textFromB {
		t.Fatalf("message body = %q, want %q", got, textFromB)
	}

	// Each receiver must have logged both receive-side events for the message
	// it got. A fresh snapshot is taken for every assertion.
	wantEvents := []struct {
		logger   *recordingLatencyLogger
		event    string
		from, to string
		id       uint64
	}{
		{latencyB, latencylog.EventBRXSoftware, "peer-a", "peer-b", msgFromA.ID},
		{latencyB, latencylog.EventBAppRecv, "peer-a", "peer-b", msgFromA.ID},
		{latencyA, latencylog.EventBRXSoftware, "peer-b", "peer-a", msgFromB.ID},
		{latencyA, latencylog.EventBAppRecv, "peer-b", "peer-a", msgFromB.ID},
	}
	for _, want := range wantEvents {
		assertPeerEvent(t, want.logger.Events(), want.event, want.from, want.to, want.id)
	}

	// Cases are evaluated top to bottom and the first empty recorder fails
	// the test.
	switch {
	case len(debugA.Records()) == 0:
		t.Fatal("clientA debug records = 0, want at least 1")
	case len(debugB.Records()) == 0:
		t.Fatal("clientB debug records = 0, want at least 1")
	case len(hubLatency.Events()) == 0:
		t.Fatal("server latency events = 0, want at least 1")
	case len(hubDebug.Records()) == 0:
		t.Fatal("server debug records = 0, want at least 1")
	}
}
// startUDPTestHub starts a UDP test hub with no extra options and returns the
// address it is listening on. It delegates to startUDPTestHubWithOptions.
func startUDPTestHub(t *testing.T) *net.UDPAddr {
	// Mark this wrapper as a helper so failures raised during hub setup are
	// attributed to the calling test rather than to this line.
	t.Helper()
	return startUDPTestHubWithOptions(t)
}
// startUDPTestHubWithOptions binds a UDP socket on 127.0.0.1 with port 0,
// wraps it in a server UDP hub configured with opts, serves it on a
// background goroutine, and returns the socket's local address. The hub is
// closed through t.Cleanup; any setup error fails the test immediately.
func startUDPTestHubWithOptions(t *testing.T, opts ...server.UDPOption) *net.UDPAddr {
	t.Helper()

	loopback, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("ResolveUDPAddr() error = %v", err)
	}

	sock, err := net.ListenUDP("udp", loopback)
	if err != nil {
		t.Fatalf("ListenUDP() error = %v", err)
	}

	hub, err := server.NewUDPHub(sock, opts...)
	if err != nil {
		// The hub never took the socket, so release it here.
		_ = sock.Close()
		t.Fatalf("NewUDPHub() error = %v", err)
	}

	// Serve's return value is deliberately discarded.
	go func() {
		_ = hub.Serve()
	}()
	t.Cleanup(func() {
		_ = hub.Close()
	})

	bound := sock.LocalAddr().(*net.UDPAddr)
	return bound
}
// receiveUDPClientMessage waits up to two seconds for client.Receive to
// return and hands back the received message. The test fails if Receive
// returns an error or if the deadline passes first.
func receiveUDPClientMessage(t *testing.T, client *UDPClient) protocol.Message {
	t.Helper()

	type outcome struct {
		msg protocol.Message
		err error
	}

	// Buffered so the receiving goroutine can always deliver its result and
	// exit, even when nobody is listening any more after a timeout.
	done := make(chan outcome, 1)
	go func() {
		var o outcome
		o.msg, o.err = client.Receive()
		done <- o
	}()

	deadline := time.NewTimer(2 * time.Second)
	defer deadline.Stop()

	var got outcome
	select {
	case got = <-done:
	case <-deadline.C:
		// t.Fatal stops this goroutine, so nothing below runs on timeout.
		t.Fatal("Receive() timed out after 2s")
	}

	if got.err != nil {
		t.Fatalf("Receive() error = %v", got.err)
	}
	return got.msg
}
// assertPeerEvent fails the test unless events contains an entry whose event
// name, sender, recipient, and message ID all match the wanted values. The
// first matching entry must also carry a positive TsUnixNano; later matches
// are not examined.
func assertPeerEvent(t *testing.T, events []latencylog.Event, wantEvent, wantFrom, wantTo string, wantMessageID uint64) {
	t.Helper()

	for i := range events {
		candidate := events[i]
		matches := candidate.Event == wantEvent &&
			candidate.From == wantFrom &&
			candidate.To == wantTo &&
			candidate.MessageID == wantMessageID
		if !matches {
			continue
		}
		if candidate.TsUnixNano <= 0 {
			t.Fatalf("event %s timestamp must be positive: %+v", wantEvent, candidate)
		}
		return
	}

	t.Fatalf("missing event %s from %s to %s for message %d in %+v", wantEvent, wantFrom, wantTo, wantMessageID, events)
}
// Blank-identifier declaration that references transport.TXTimestampDebugLogger
// at package level so the transport import stays in use (the original note ties
// this type to the WithTXTimestampDebugLogger option).
// NOTE(review): recordingUDPTXTimestampDebugLogger already references the
// transport package, so this declaration may be redundant.
var _ transport.TXTimestampDebugLogger