package peer import ( "fmt" "net" "os" "path/filepath" "sync/atomic" "syscall" "omnisocketgo/cmd/internal/latencylog" "omnisocketgo/cmd/internal/protocol" "omnisocketgo/cmd/internal/transport" ) var dialServer = dialServerWithOptions type clientOptions struct { logger latencylog.Logger bindIP string bindDevice string } // Option 用于配置 Client 的可选行为,例如时延日志。 type Option func(*clientOptions) // WithLogger 为 client 注入时延日志记录器。 func WithLogger(logger latencylog.Logger) Option { return func(options *clientOptions) { options.logger = logger } } // WithBindIP 指定拨号时使用的本地源 IP。 func WithBindIP(ip string) Option { return func(options *clientOptions) { options.bindIP = ip } } // WithBindDevice 指定拨号时绑定的 Linux 网卡名,例如 eth0 或 wwan0。 func WithBindDevice(device string) Option { return func(options *clientOptions) { options.bindDevice = device } } // Client 表示一个已经连接到 server 的 peer。 type Client struct { id string conn *transport.TCPConn logger latencylog.Logger nextID uint64 } // Dial 连接到 server,并立即发送 register 消息完成身份注册。 func Dial(serverAddr, peerID string, opts ...Option) (*Client, error) { options := clientOptions{ logger: latencylog.NoopLogger{}, } for _, opt := range opts { opt(&options) } if options.logger == nil { options.logger = latencylog.NoopLogger{} } rawConn, err := dialServer(serverAddr, options) if err != nil { return nil, fmt.Errorf("peer: dial server: %w", err) } conn, err := transport.NewTCPConn( rawConn, transport.WithLogger(options.logger, latencylog.NodeRolePeer, peerID), ) if err != nil { _ = rawConn.Close() return nil, fmt.Errorf("peer: create transport conn: %w", err) } client := &Client{ id: peerID, conn: conn, logger: options.logger, } if err := conn.Send(protocol.Message{ //向 server 发送一条 register 消息,完成身份注册。 Type: protocol.MessageTypeRegister, From: peerID, To: protocol.ServerPeerID, }); err != nil { _ = conn.Close() return nil, fmt.Errorf("peer: register with server: %w", err) } return client, nil } // ID 返回当前 client 的 peer 标识。 func (c *Client) ID() string { return c.id } // SendText 向目标 peer 发送一条文本消息。 func (c *Client) SendText(to, body string) error { msg := protocol.Message{ Type: protocol.MessageTypeText, ID: c.nextMessageID(), From: c.id, To: to, } // 记录 A 端应用开始准备消息的时间点。 latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, msg) msg.Body = []byte(body) return c.conn.Send(msg) } // SendFile 向目标 peer 发送一条文件消息。 func (c *Client) SendFile(to, fileName string, body []byte) error { msg := protocol.Message{ Type: protocol.MessageTypeFile, ID: c.nextMessageID(), From: c.id, To: to, FileName: fileName, } // 记录 A 端应用开始准备消息的时间点。 latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, msg) bodyCopy := make([]byte, len(body)) copy(bodyCopy, body) msg.Body = bodyCopy return c.conn.Send(msg) } // SendFilePath 从本地文件读取内容并发送给目标 peer。 func (c *Client) SendFilePath(to, path string) error { msg := protocol.Message{ Type: protocol.MessageTypeFile, ID: c.nextMessageID(), From: c.id, To: to, FileName: filepath.Base(path), } // 记录 A 端应用开始准备消息的时间点。 latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, msg) body, err := os.ReadFile(path) if err != nil { return fmt.Errorf("peer: read file %s: %w", path, err) } msg.Body = body return c.conn.Send(msg) } // Receive 读取一条来自 server 的消息。 func (c *Client) Receive() (protocol.Message, error) { msg, err := c.conn.Receive() //从底层 TCP 连接读取一条消息,返回一个 protocol.Message 结构体。 if err != nil { return protocol.Message{}, fmt.Errorf("peer: receive from server: %w", err) } latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBAppRecv, msg) return msg, nil } // ReceiveLoop 持续接收 server 消息并交给 handler 处理。 func (c *Client) ReceiveLoop(handler func(protocol.Message) error) error { return c.conn.ReceiveLoop(func(msg protocol.Message) error { switch msg.Type { case protocol.MessageTypeText, protocol.MessageTypeFile, protocol.MessageTypeError: // 记录 B 端应用真正读到完整消息的时间点。 latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBAppRecv, msg) return handler(msg) default: return fmt.Errorf("peer: unexpected message type from server: %s", msg.Type) } }) } // Close 关闭与 server 的连接。 func (c *Client) Close() error { return c.conn.Close() } func (c *Client) nextMessageID() uint64 { return atomic.AddUint64(&c.nextID, 1) } // 根据提供的选项创建 TCP 连接,并进行必要的配置,例如绑定本地 IP 或网卡。成功建立连接后,返回一个封装了该连接的 TCPConn 实例。 func dialServerWithOptions(serverAddr string, options clientOptions) (net.Conn, error) { dialer, err := buildDialer(options) if err != nil { return nil, err } return dialer.Dial("tcp", serverAddr) } func buildDialer(options clientOptions) (*net.Dialer, error) { dialer := &net.Dialer{} if options.bindIP != "" { ip := net.ParseIP(options.bindIP) if ip == nil { return nil, fmt.Errorf("peer: invalid bind ip %q", options.bindIP) } dialer.LocalAddr = &net.TCPAddr{IP: ip} } if options.bindDevice != "" { device := options.bindDevice dialer.Control = func(_, _ string, rawConn syscall.RawConn) error { var bindErr error if err := rawConn.Control(func(fd uintptr) { bindErr = syscall.BindToDevice(int(fd), device) }); err != nil { return err } if bindErr != nil { return fmt.Errorf("peer: bind device %s: %w", device, bindErr) } return nil } } return dialer, nil }