Files
OmniSocketGo/cmd/internal/peer/udp_client.go

205 lines
5.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package peer
import (
"fmt"
"net"
"os"
"path/filepath"
"sync/atomic"
"omnisocketgo/cmd/internal/latencylog"
"omnisocketgo/cmd/internal/protocol"
"omnisocketgo/cmd/internal/transport"
)
// UDPClient is a peer that is connected to the server over UDP.
type UDPClient struct {
	// nextID is the most recently issued message ID. It is advanced with
	// atomic.AddUint64, so it must remain the FIRST field: sync/atomic only
	// guarantees 64-bit alignment for the first word of an allocated struct,
	// and a misaligned 64-bit atomic panics on 32-bit platforms (386, ARM,
	// 32-bit MIPS).
	nextID uint64
	// id is the peer identifier used as the From field of outgoing messages.
	id string
	// conn is the transport-level UDP connection to the server.
	conn *transport.UDPConn
	// logger receives the latency events emitted by this client.
	logger latencylog.Logger
}
// DialUDP connects to the server over UDP and sends a register message so the
// server learns this peer's identity.
//
// serverAddr is resolved with net.ResolveUDPAddr. peerID is used as the From
// field of the register message and of every message the client sends later.
// opts are applied in order on top of the defaults (a no-op logger and
// udpLinuxTimestamping set to true); a nil logger left behind by an option is
// replaced with the no-op logger.
//
// On any failure the socket opened so far is closed and a wrapped error is
// returned together with a nil client.
func DialUDP(serverAddr, peerID string, opts ...Option) (*UDPClient, error) {
	cfg := clientOptions{
		logger:               latencylog.NoopLogger{},
		udpLinuxTimestamping: true,
	}
	for i := range opts {
		opts[i](&cfg)
	}
	if cfg.logger == nil {
		cfg.logger = latencylog.NoopLogger{}
	}

	remote, resolveErr := net.ResolveUDPAddr("udp", serverAddr)
	if resolveErr != nil {
		return nil, fmt.Errorf("peer: resolve udp server addr %s: %w", serverAddr, resolveErr)
	}

	// Only pin the local address when a bind IP was requested; a nil local
	// address is passed to net.DialUDP otherwise.
	var local *net.UDPAddr
	if cfg.bindIP != "" {
		parsed := net.ParseIP(cfg.bindIP)
		if parsed == nil {
			return nil, fmt.Errorf("peer: invalid bind ip %q", cfg.bindIP)
		}
		local = &net.UDPAddr{IP: parsed}
	}

	sock, dialErr := net.DialUDP("udp", local, remote)
	if dialErr != nil {
		return nil, fmt.Errorf("peer: dial udp server %s: %w", serverAddr, dialErr)
	}

	udpConn, wrapErr := transport.NewUDPConn(
		sock,
		nil, // the peer side uses a connected socket, so no peerAddr is needed
		transport.WithUDPLogger(cfg.logger, latencylog.NodeRolePeer, peerID),
		transport.WithUDPLinuxTimestamping(cfg.udpLinuxTimestamping),
		transport.WithUDPTXTimestampDebugLogger(cfg.txTimestampDebugLogger),
	)
	if wrapErr != nil {
		// The transport wrapper was not created, so close the raw socket directly.
		_ = sock.Close()
		return nil, fmt.Errorf("peer: create udp transport conn: %w", wrapErr)
	}

	// Send the register message to complete identity registration.
	register := protocol.Message{
		Type: protocol.MessageTypeRegister,
		From: peerID,
		To:   protocol.ServerPeerID,
	}
	if sendErr := udpConn.Send(register); sendErr != nil {
		_ = udpConn.Close()
		return nil, fmt.Errorf("peer: udp register with server: %w", sendErr)
	}

	return &UDPClient{
		id:     peerID,
		conn:   udpConn,
		logger: cfg.logger,
	}, nil
}
// ID returns the peer identifier of this client.
func (c *UDPClient) ID() string {
	peerID := c.id
	return peerID
}
// SendText sends a text message with the given body to the peer named by to.
//
// A fresh message ID is allocated for every call. The EventAAppPrepBegin
// latency event is logged before the body is attached to the message.
// The returned error is whatever the underlying connection's Send reports.
func (c *UDPClient) SendText(to, body string) error {
	outgoing := protocol.Message{
		From: c.id,
		To:   to,
		ID:   c.nextMessageID(),
		Type: protocol.MessageTypeText,
	}
	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, outgoing)

	payload := []byte(body)
	outgoing.Body = payload
	return c.conn.Send(outgoing)
}
// SendFile sends a file message named fileName, carrying body, to the peer
// named by to.
//
// A fresh message ID is allocated for every call, and the EventAAppPrepBegin
// latency event is logged before the payload is attached. body is copied
// first, so the outgoing message does not alias the caller's slice.
// The returned error is whatever the underlying connection's Send reports.
func (c *UDPClient) SendFile(to, fileName string, body []byte) error {
	fileMsg := protocol.Message{
		From:     c.id,
		To:       to,
		FileName: fileName,
		ID:       c.nextMessageID(),
		Type:     protocol.MessageTypeFile,
	}
	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, fileMsg)

	// Appending onto a pre-sized empty slice yields a private, non-nil copy.
	fileMsg.Body = append(make([]byte, 0, len(body)), body...)
	return c.conn.Send(fileMsg)
}
// SendFilePath reads the local file at path and sends its contents to the
// peer named by to. The message's FileName is the base name of path.
//
// The message ID is allocated and the EventAAppPrepBegin latency event is
// logged before the file is read, so both happen even when the read fails.
// A read failure is returned wrapped; otherwise the result of the underlying
// connection's Send is returned.
func (c *UDPClient) SendFilePath(to, path string) error {
	fileMsg := protocol.Message{
		From:     c.id,
		To:       to,
		FileName: filepath.Base(path),
		ID:       c.nextMessageID(),
		Type:     protocol.MessageTypeFile,
	}
	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, fileMsg)

	contents, readErr := os.ReadFile(path)
	if readErr != nil {
		return fmt.Errorf("peer: read file %s: %w", path, readErr)
	}

	fileMsg.Body = contents
	return c.conn.Send(fileMsg)
}
// Receive reads a single message from the server.
//
// On success the EventBAppRecv latency event is logged and the message is
// returned. On failure a zero-value message and a wrapped error are returned,
// and nothing is logged.
func (c *UDPClient) Receive() (protocol.Message, error) {
	received, _, recvErr := c.conn.Receive()
	if recvErr != nil {
		var none protocol.Message
		return none, fmt.Errorf("peer: udp receive from server: %w", recvErr)
	}

	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBAppRecv, received)
	return received, nil
}
// ReceiveLoop keeps receiving messages from the server and hands each one to
// handler.
//
// Only text, file and error messages are accepted: for those the
// EventBAppRecv latency event is logged and handler's result is returned to
// the underlying loop. Any other message type produces an error from the
// callback instead. The sender address supplied by the transport is ignored.
func (c *UDPClient) ReceiveLoop(handler func(protocol.Message) error) error {
	dispatch := func(msg protocol.Message, _ *net.UDPAddr) error {
		expected := msg.Type == protocol.MessageTypeText ||
			msg.Type == protocol.MessageTypeFile ||
			msg.Type == protocol.MessageTypeError
		if !expected {
			return fmt.Errorf("peer: unexpected message type from server: %s", msg.Type)
		}

		latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBAppRecv, msg)
		return handler(msg)
	}
	return c.conn.ReceiveLoop(dispatch)
}
// PersistMessage writes a received business message to local disk under
// inboxDir and returns the path reported by persistMessageToDisk.
//
// It fails without touching the disk when msg is not a business message
// (per latencylog.IsBusinessMessage) or when inboxDir is empty. Otherwise it
// logs EventBPersistBegin, creates inboxDir (mode 0o755) if needed, and writes
// the message. EventBPersistEnd is logged only when the write succeeds.
func (c *UDPClient) PersistMessage(msg protocol.Message, inboxDir string) (string, error) {
	switch {
	case !latencylog.IsBusinessMessage(msg):
		return "", fmt.Errorf("peer: cannot persist message type %s", msg.Type)
	case inboxDir == "":
		return "", fmt.Errorf("peer: inbox directory is required")
	}

	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBPersistBegin, msg)

	mkdirErr := os.MkdirAll(inboxDir, 0o755)
	if mkdirErr != nil {
		return "", fmt.Errorf("peer: create inbox dir %s: %w", inboxDir, mkdirErr)
	}

	savedPath, saveErr := persistMessageToDisk(msg, inboxDir)
	if saveErr != nil {
		return "", saveErr
	}

	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBPersistEnd, msg)
	return savedPath, nil
}
// Close closes the UDP connection to the server and returns the error, if
// any, reported by the underlying transport connection.
func (c *UDPClient) Close() error {
	transportConn := c.conn
	return transportConn.Close()
}
// nextMessageID atomically increments the client's message counter by one and
// returns the new value.
func (c *UDPClient) nextMessageID() uint64 {
	const step uint64 = 1
	return atomic.AddUint64(&c.nextID, step)
}