package peer import ( "fmt" "os" "path/filepath" "sync/atomic" "omnisocketgo/cmd/internal/latencylog" "omnisocketgo/cmd/internal/protocol" "omnisocketgo/cmd/internal/transport" ) // KCPClient 表示一个通过 KCP 连接到 server 的 peer。 type KCPClient struct { id string conn *transport.KCPConn logger latencylog.Logger nextID uint64 } // DialKCP 通过 KCP 连接到 server,并发送 register 消息完成身份注册。 func DialKCP(serverAddr, peerID string, opts ...Option) (*KCPClient, error) { options := clientOptions{ logger: latencylog.NoopLogger{}, } for _, opt := range opts { opt(&options) } if options.logger == nil { options.logger = latencylog.NoopLogger{} } dialAddr := serverAddr if options.kcpDialAddress != "" { dialAddr = options.kcpDialAddress } session, err := transport.DialKCPSession( dialAddr, options.bindIP, options.bindDevice, options.kcpPacketDebugLogger, latencylog.NodeRolePeer, peerID, ) if err != nil { return nil, fmt.Errorf("peer: dial kcp server: %w", err) } conn, err := transport.NewKCPConn( session, transport.WithKCPLogger(options.logger, latencylog.NodeRolePeer, peerID), transport.WithKCPSessionStatsLogger(options.kcpSessionStatsLogger, options.kcpSessionStatsInterval), ) if err != nil { _ = session.Close() return nil, fmt.Errorf("peer: create kcp transport conn: %w", err) } client := &KCPClient{ id: peerID, conn: conn, logger: options.logger, } if err := conn.Send(protocol.Message{ Type: protocol.MessageTypeRegister, From: peerID, To: protocol.ServerPeerID, }); err != nil { _ = conn.Close() return nil, fmt.Errorf("peer: register with kcp server: %w", err) } return client, nil } // ID 返回当前 client 的 peer 标识。 func (c *KCPClient) ID() string { return c.id } // SendText 向目标 peer 发送一条文本消息。 func (c *KCPClient) SendText(to, body string) error { msg := protocol.Message{ Type: protocol.MessageTypeText, ID: c.nextMessageID(), From: c.id, To: to, } latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, msg) msg.Body = []byte(body) return c.conn.Send(msg) } // SendFile 向目标 peer 发送一条文件消息。 func (c *KCPClient) SendFile(to, fileName string, body []byte) error { msg := protocol.Message{ Type: protocol.MessageTypeFile, ID: c.nextMessageID(), From: c.id, To: to, FileName: fileName, } latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, msg) bodyCopy := make([]byte, len(body)) copy(bodyCopy, body) msg.Body = bodyCopy return c.conn.Send(msg) } // SendFilePath 从本地文件读取内容并发送给目标 peer。 func (c *KCPClient) SendFilePath(to, path string) error { msg := protocol.Message{ Type: protocol.MessageTypeFile, ID: c.nextMessageID(), From: c.id, To: to, FileName: filepath.Base(path), } latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, msg) body, err := os.ReadFile(path) if err != nil { return fmt.Errorf("peer: read file %s: %w", path, err) } msg.Body = body return c.conn.Send(msg) } // Receive 读取一条来自 server 的消息。 func (c *KCPClient) Receive() (protocol.Message, error) { msg, err := c.conn.Receive() if err != nil { return protocol.Message{}, fmt.Errorf("peer: receive from kcp server: %w", err) } latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBAppRecv, msg) return msg, nil } // ReceiveLoop 持续接收 server 消息并交给 handler 处理。 func (c *KCPClient) ReceiveLoop(handler func(protocol.Message) error) error { return c.conn.ReceiveLoop(func(msg protocol.Message) error { switch msg.Type { case protocol.MessageTypeText, protocol.MessageTypeFile, protocol.MessageTypeError: latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBAppRecv, msg) return handler(msg) default: return fmt.Errorf("peer: unexpected message type from kcp server: %s", msg.Type) } }) } // PersistMessage 将收到的业务消息写入本地磁盘。 func (c *KCPClient) PersistMessage(msg protocol.Message, inboxDir string) (string, error) { if !latencylog.IsBusinessMessage(msg) { return "", fmt.Errorf("peer: cannot persist message type %s", msg.Type) } if inboxDir == "" { return "", fmt.Errorf("peer: inbox directory is required") } latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBPersistBegin, msg) if err := os.MkdirAll(inboxDir, 0o755); err != nil { return "", fmt.Errorf("peer: create inbox dir %s: %w", inboxDir, err) } path, err := persistMessageToDisk(msg, inboxDir) if err != nil { return "", err } latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBPersistEnd, msg) return path, nil } // Close 关闭与 server 的 KCP 会话。 func (c *KCPClient) Close() error { return c.conn.Close() } func (c *KCPClient) nextMessageID() uint64 { return atomic.AddUint64(&c.nextID, 1) }