package peer

import (
	"fmt"
	"net"
	"os"
	"path/filepath"
	"sync/atomic"

	"omnisocketgo/cmd/internal/latencylog"
	"omnisocketgo/cmd/internal/protocol"
	"omnisocketgo/cmd/internal/transport"
)

// UDPClient is a peer that talks to the server over UDP.
type UDPClient struct {
	// nextID is updated with atomic.AddUint64. It is deliberately the first
	// field: sync/atomic only guarantees 64-bit alignment for the first word
	// of an allocated struct, and a misaligned 64-bit atomic panics on 32-bit
	// platforms (386, ARM, 32-bit MIPS).
	nextID uint64

	id     string
	conn   *transport.UDPConn
	logger latencylog.Logger
}

// DialUDP connects to the server over UDP and sends a register message to
// announce peerID.
//
// Defaults, which opts may override: a no-op latency logger and Linux
// timestamping enabled. A nil logger left behind by an option is replaced by
// the no-op logger. When a bind IP is configured, the local socket is bound to
// that IP (the port is left unset).
//
// It returns an error if the server address cannot be resolved, the bind IP
// cannot be parsed, the socket cannot be dialed, the transport connection
// cannot be created, or the register message cannot be sent. The socket is
// closed on the last two failures.
func DialUDP(serverAddr, peerID string, opts ...Option) (*UDPClient, error) {
	options := clientOptions{
		logger:               latencylog.NoopLogger{},
		udpLinuxTimestamping: true,
	}
	for _, opt := range opts {
		opt(&options)
	}
	if options.logger == nil {
		options.logger = latencylog.NoopLogger{}
	}

	udpServerAddr, err := net.ResolveUDPAddr("udp", serverAddr)
	if err != nil {
		return nil, fmt.Errorf("peer: resolve udp server addr %s: %w", serverAddr, err)
	}

	var localAddr *net.UDPAddr
	if options.bindIP != "" {
		ip := net.ParseIP(options.bindIP)
		if ip == nil {
			return nil, fmt.Errorf("peer: invalid bind ip %q", options.bindIP)
		}
		localAddr = &net.UDPAddr{IP: ip}
	}

	rawConn, err := net.DialUDP("udp", localAddr, udpServerAddr)
	if err != nil {
		return nil, fmt.Errorf("peer: dial udp server %s: %w", serverAddr, err)
	}

	conn, err := transport.NewUDPConn(
		rawConn,
		nil, // the peer side uses a connected socket, so no peerAddr is needed
		transport.WithUDPLogger(options.logger, latencylog.NodeRolePeer, peerID),
		transport.WithUDPLinuxTimestamping(options.udpLinuxTimestamping),
		transport.WithUDPTXTimestampDebugLogger(options.txTimestampDebugLogger),
	)
	if err != nil {
		// Best-effort cleanup; the creation error is the one worth reporting.
		_ = rawConn.Close()
		return nil, fmt.Errorf("peer: create udp transport conn: %w", err)
	}

	client := &UDPClient{
		id:     peerID,
		conn:   conn,
		logger: options.logger,
	}

	// Send the register message so the server learns this peer's identity.
	if err := conn.Send(protocol.Message{
		Type: protocol.MessageTypeRegister,
		From: peerID,
		To:   protocol.ServerPeerID,
	}); err != nil {
		// Best-effort cleanup; the send error is the one worth reporting.
		_ = conn.Close()
		return nil, fmt.Errorf("peer: udp register with server: %w", err)
	}

	return client, nil
}

// ID returns the peer identifier of this client.
func (c *UDPClient) ID() string {
	return c.id
}

// SendText sends a text message with the given body to the peer named to.
func (c *UDPClient) SendText(to, body string) error {
	msg := protocol.Message{
		Type: protocol.MessageTypeText,
		ID:   c.nextMessageID(),
		From: c.id,
		To:   to,
	}
	// The prep-begin event is logged before the body is attached.
	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, msg)

	msg.Body = []byte(body)
	return c.conn.Send(msg)
}

// SendFile sends a file message named fileName with the given content to the
// peer named to. The content is copied, so the message does not alias the
// caller's slice.
func (c *UDPClient) SendFile(to, fileName string, body []byte) error {
	msg := protocol.Message{
		Type:     protocol.MessageTypeFile,
		ID:       c.nextMessageID(),
		From:     c.id,
		To:       to,
		FileName: fileName,
	}
	// The prep-begin event is logged before the body is copied and attached.
	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, msg)

	bodyCopy := make([]byte, len(body))
	copy(bodyCopy, body)
	msg.Body = bodyCopy
	return c.conn.Send(msg)
}

// SendFilePath reads the file at path and sends its content to the peer named
// to, using the base name of path as the file name.
//
// A message ID is allocated and the prep-begin event is logged before the file
// is read, so a read failure still consumes an ID.
func (c *UDPClient) SendFilePath(to, path string) error {
	msg := protocol.Message{
		Type:     protocol.MessageTypeFile,
		ID:       c.nextMessageID(),
		From:     c.id,
		To:       to,
		FileName: filepath.Base(path),
	}
	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, msg)

	body, err := os.ReadFile(path)
	if err != nil {
		return fmt.Errorf("peer: read file %s: %w", path, err)
	}

	msg.Body = body
	return c.conn.Send(msg)
}

// Receive reads a single message from the server and logs its arrival.
// Unlike ReceiveLoop, it does not filter by message type.
func (c *UDPClient) Receive() (protocol.Message, error) {
	msg, _, err := c.conn.Receive()
	if err != nil {
		return protocol.Message{}, fmt.Errorf("peer: udp receive from server: %w", err)
	}

	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBAppRecv, msg)
	return msg, nil
}

// ReceiveLoop keeps receiving messages from the server and passes text, file
// and error messages to handler after logging their arrival. For any other
// message type the callback returns an error.
//
// NOTE(review): how the transport reacts to a callback error (presumably by
// ending the loop and returning it) is defined by transport.UDPConn.ReceiveLoop
// — confirm there.
func (c *UDPClient) ReceiveLoop(handler func(protocol.Message) error) error {
	return c.conn.ReceiveLoop(func(msg protocol.Message, _ *net.UDPAddr) error {
		switch msg.Type {
		case protocol.MessageTypeText, protocol.MessageTypeFile, protocol.MessageTypeError:
			latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBAppRecv, msg)
			return handler(msg)
		default:
			return fmt.Errorf("peer: unexpected message type from server: %s", msg.Type)
		}
	})
}

// PersistMessage writes a received business message to disk under inboxDir
// and returns the path reported by persistMessageToDisk.
//
// It rejects messages for which latencylog.IsBusinessMessage is false and an
// empty inboxDir. The directory is created (mode 0o755) if needed. The
// persist-begin event is logged before the directory is created, and the
// persist-end event only after the write succeeds.
func (c *UDPClient) PersistMessage(msg protocol.Message, inboxDir string) (string, error) {
	if !latencylog.IsBusinessMessage(msg) {
		return "", fmt.Errorf("peer: cannot persist message type %s", msg.Type)
	}
	if inboxDir == "" {
		return "", fmt.Errorf("peer: inbox directory is required")
	}

	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBPersistBegin, msg)

	if err := os.MkdirAll(inboxDir, 0o755); err != nil {
		return "", fmt.Errorf("peer: create inbox dir %s: %w", inboxDir, err)
	}

	path, err := persistMessageToDisk(msg, inboxDir)
	if err != nil {
		return "", err
	}

	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBPersistEnd, msg)
	return path, nil
}

// Close closes the UDP connection to the server.
func (c *UDPClient) Close() error {
	return c.conn.Close()
}

// nextMessageID atomically increments the per-client counter and returns the
// new value, so the first ID handed out is 1.
func (c *UDPClient) nextMessageID() uint64 {
	return atomic.AddUint64(&c.nextID, 1)
}