Files
OmniSocketGo/cmd/internal/peer/kcp_client.go

186 lines
5.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package peer
import (
"fmt"
"os"
"path/filepath"
"sync/atomic"
"omnisocketgo/cmd/internal/latencylog"
"omnisocketgo/cmd/internal/protocol"
"omnisocketgo/cmd/internal/transport"
)
// KCPClient is a peer that is connected to the server over KCP.
type KCPClient struct {
	// nextID is the most recently issued message ID. It is advanced with
	// atomic.AddUint64, so it must stay the first field of the struct:
	// sync/atomic only guarantees 64-bit alignment for the first word of an
	// allocated struct, and a misaligned 64-bit atomic operation panics on
	// 32-bit platforms (386, ARM, 32-bit MIPS).
	nextID uint64

	// id is the peer identifier placed in the From field of outgoing messages.
	id string
	// conn is the KCP transport connection used for every send and receive.
	conn *transport.KCPConn
	// logger receives the latency events emitted by this client.
	logger latencylog.Logger
}
// DialKCP connects to the server at serverAddr over KCP and registers peerID
// by sending a register message addressed to the server.
//
// The options in opts are applied on top of a default configuration whose
// logger is a no-op; if an option leaves the logger nil, the no-op logger is
// restored. The bind and logger settings are handed to the transport layer.
//
// It returns a wrapped error if dialing the session, creating the transport
// connection, or sending the register message fails. The session (or the
// connection, once it exists) is closed before an error is returned.
func DialKCP(serverAddr, peerID string, opts ...Option) (*KCPClient, error) {
	cfg := clientOptions{logger: latencylog.NoopLogger{}}
	for _, apply := range opts {
		apply(&cfg)
	}
	if cfg.logger == nil {
		cfg.logger = latencylog.NoopLogger{}
	}

	session, err := transport.DialKCPSession(
		serverAddr,
		cfg.bindIP,
		cfg.bindDevice,
		cfg.kcpPacketDebugLogger,
		latencylog.NodeRolePeer,
		peerID,
	)
	if err != nil {
		return nil, fmt.Errorf("peer: dial kcp server: %w", err)
	}

	conn, err := transport.NewKCPConn(
		session,
		transport.WithKCPLogger(cfg.logger, latencylog.NodeRolePeer, peerID),
		transport.WithKCPSessionStatsLogger(cfg.kcpSessionStatsLogger, cfg.kcpSessionStatsInterval),
	)
	if err != nil {
		// Best-effort cleanup: the construction error is what the caller needs.
		_ = session.Close()
		return nil, fmt.Errorf("peer: create kcp transport conn: %w", err)
	}

	register := protocol.Message{
		Type: protocol.MessageTypeRegister,
		From: peerID,
		To:   protocol.ServerPeerID,
	}
	if err := conn.Send(register); err != nil {
		// Best-effort cleanup: the registration error is what the caller needs.
		_ = conn.Close()
		return nil, fmt.Errorf("peer: register with kcp server: %w", err)
	}

	return &KCPClient{
		id:     peerID,
		conn:   conn,
		logger: cfg.logger,
	}, nil
}
// ID returns the peer identifier of this client.
func (c *KCPClient) ID() string {
return c.id
}
// SendText sends body as a text message from this client to the peer named by
// to.
//
// The message gets the next message ID, and the app-prep-begin latency event is
// logged before the body is attached. The transport's send error is returned
// unchanged.
func (c *KCPClient) SendText(to, body string) error {
	var outgoing protocol.Message
	outgoing.Type = protocol.MessageTypeText
	outgoing.ID = c.nextMessageID()
	outgoing.From = c.id
	outgoing.To = to

	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, outgoing)

	outgoing.Body = []byte(body)
	return c.conn.Send(outgoing)
}
// SendFile sends body as a file message named fileName from this client to the
// peer named by to.
//
// The message gets the next message ID, and the app-prep-begin latency event is
// logged before the body is attached. body is copied first, so the message
// does not alias the caller's slice. The transport's send error is returned
// unchanged.
func (c *KCPClient) SendFile(to, fileName string, body []byte) error {
	var outgoing protocol.Message
	outgoing.Type = protocol.MessageTypeFile
	outgoing.ID = c.nextMessageID()
	outgoing.From = c.id
	outgoing.To = to
	outgoing.FileName = fileName

	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, outgoing)

	// Start from a non-nil, exactly-sized buffer so an empty body stays an
	// empty (not nil) slice.
	payload := append(make([]byte, 0, len(body)), body...)
	outgoing.Body = payload
	return c.conn.Send(outgoing)
}
// SendFilePath reads the local file at path and sends its contents as a file
// message to the peer named by to. The message's file name is the base name of
// path.
//
// The message ID is assigned and the app-prep-begin latency event is logged
// before the file is read. A read failure is returned as a wrapped error and
// nothing is sent; otherwise the transport's send error is returned unchanged.
func (c *KCPClient) SendFilePath(to, path string) error {
	var outgoing protocol.Message
	outgoing.Type = protocol.MessageTypeFile
	outgoing.ID = c.nextMessageID()
	outgoing.From = c.id
	outgoing.To = to
	outgoing.FileName = filepath.Base(path)

	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, outgoing)

	contents, readErr := os.ReadFile(path)
	if readErr != nil {
		return fmt.Errorf("peer: read file %s: %w", path, readErr)
	}

	outgoing.Body = contents
	return c.conn.Send(outgoing)
}
// Receive reads one message from the server connection.
//
// On success the app-recv latency event is logged and the message is returned.
// On failure it returns a zero Message and the transport error wrapped with
// context.
func (c *KCPClient) Receive() (protocol.Message, error) {
	incoming, recvErr := c.conn.Receive()
	if recvErr != nil {
		var none protocol.Message
		return none, fmt.Errorf("peer: receive from kcp server: %w", recvErr)
	}

	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBAppRecv, incoming)
	return incoming, nil
}
// ReceiveLoop runs the transport's receive loop and passes each business
// message from the server to handler.
//
// Text, file, and error messages are logged with the app-recv latency event
// and then given to handler; the handler's return value is returned to the
// transport loop. Any other message type makes the callback return an error.
// NOTE(review): how the transport loop reacts to a callback error is not
// visible here — presumably it stops and returns that error; confirm against
// transport.KCPConn.ReceiveLoop.
//
// handler must not be nil. A nil handler is rejected immediately with an error
// rather than panicking later when the first message arrives.
func (c *KCPClient) ReceiveLoop(handler func(protocol.Message) error) error {
	if handler == nil {
		return fmt.Errorf("peer: receive loop handler is required")
	}

	return c.conn.ReceiveLoop(func(msg protocol.Message) error {
		switch msg.Type {
		case protocol.MessageTypeText, protocol.MessageTypeFile, protocol.MessageTypeError:
			latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBAppRecv, msg)
			return handler(msg)
		default:
			return fmt.Errorf("peer: unexpected message type from kcp server: %s", msg.Type)
		}
	})
}
// PersistMessage writes a received business message to disk under inboxDir and
// returns the path reported by the package's persistMessageToDisk helper.
//
// It returns an error if msg is not a business message, if inboxDir is empty,
// if the inbox directory cannot be created, or if the helper fails. The
// persist-begin latency event is logged after validation; the persist-end
// event is logged only when the write succeeds.
func (c *KCPClient) PersistMessage(msg protocol.Message, inboxDir string) (string, error) {
	// Validation order matters: the message type is checked before the directory.
	switch {
	case !latencylog.IsBusinessMessage(msg):
		return "", fmt.Errorf("peer: cannot persist message type %s", msg.Type)
	case inboxDir == "":
		return "", fmt.Errorf("peer: inbox directory is required")
	}

	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBPersistBegin, msg)

	if mkdirErr := os.MkdirAll(inboxDir, 0o755); mkdirErr != nil {
		return "", fmt.Errorf("peer: create inbox dir %s: %w", inboxDir, mkdirErr)
	}

	savedPath, persistErr := persistMessageToDisk(msg, inboxDir)
	if persistErr != nil {
		return "", persistErr
	}

	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBPersistEnd, msg)
	return savedPath, nil
}
// Close closes the underlying KCP transport connection to the server and
// returns the error from that close, if any.
func (c *KCPClient) Close() error {
return c.conn.Close()
}
// nextMessageID atomically increments the client's message counter and returns
// the new value, so concurrent callers receive distinct IDs.
func (c *KCPClient) nextMessageID() uint64 {
return atomic.AddUint64(&c.nextID, 1)
}