143 lines
3.9 KiB
Go
143 lines
3.9 KiB
Go
package transport
|
||
|
||
import (
|
||
"fmt"
|
||
"net"
|
||
"sync"
|
||
"syscall"
|
||
|
||
"omnisocketgo/cmd/internal/latencylog"
|
||
"omnisocketgo/cmd/internal/protocol"
|
||
)
|
||
|
||
// UDPConn 是对 UDP 连接的轻量封装。
|
||
// server 侧共享一个 net.UDPConn,通过 SendTo 指定目标地址;
|
||
// peer 侧使用已连接的 net.UDPConn,直接 Send 即可。
|
||
type UDPConn struct {
|
||
conn *net.UDPConn
|
||
peerAddr *net.UDPAddr
|
||
raw syscall.RawConn
|
||
|
||
logger latencylog.Logger
|
||
txTimestampDebugLogger TXTimestampDebugLogger
|
||
txPacketSeq uint32
|
||
pendingTX map[uint32]udpTXPendingRecord
|
||
nodeRole string
|
||
nodeID string
|
||
writeMu sync.Mutex
|
||
closeOnce sync.Once
|
||
closeErr error
|
||
}
|
||
|
||
type udpTXPendingRecord struct {
|
||
msg protocol.Message
|
||
sendCallIndex int
|
||
bytesWritten int
|
||
expectedTXID uint32
|
||
observedTimestamps map[string]int64
|
||
}
|
||
|
||
// UDPOption 用于为 UDPConn 注入可选行为。
|
||
type UDPOption func(*UDPConn)
|
||
|
||
// WithUDPLogger 为 UDP 连接注入业务消息日志上下文。
|
||
func WithUDPLogger(logger latencylog.Logger, nodeRole, nodeID string) UDPOption {
|
||
return func(conn *UDPConn) {
|
||
conn.logger = logger
|
||
conn.nodeRole = nodeRole
|
||
conn.nodeID = nodeID
|
||
}
|
||
}
|
||
|
||
// WithUDPTXTimestampDebugLogger 为 UDP 连接注入 TX errqueue 调试日志器。
|
||
func WithUDPTXTimestampDebugLogger(logger TXTimestampDebugLogger) UDPOption {
|
||
return func(conn *UDPConn) {
|
||
conn.txTimestampDebugLogger = logger
|
||
}
|
||
}
|
||
|
||
// NewUDPConn 创建 UDP transport 连接封装。
|
||
func NewUDPConn(conn *net.UDPConn, peerAddr *net.UDPAddr, opts ...UDPOption) (*UDPConn, error) {
|
||
udpConn := &UDPConn{
|
||
conn: conn,
|
||
peerAddr: peerAddr,
|
||
logger: latencylog.NoopLogger{},
|
||
pendingTX: make(map[uint32]udpTXPendingRecord),
|
||
}
|
||
|
||
for _, opt := range opts {
|
||
opt(udpConn)
|
||
}
|
||
|
||
if udpConn.logger == nil {
|
||
udpConn.logger = latencylog.NoopLogger{}
|
||
}
|
||
|
||
if err := udpConn.initUDPLinuxTimestamping(); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return udpConn, nil
|
||
}
|
||
|
||
// Send 将一条协议消息编码为 UDP 数据报并发送。
|
||
func (c *UDPConn) Send(msg protocol.Message) error {
|
||
c.writeMu.Lock()
|
||
defer c.writeMu.Unlock()
|
||
|
||
latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffBegin, msg)
|
||
if err := c.sendMessageLinux(msg); err != nil {
|
||
return fmt.Errorf("transport: udp send message: %w", err)
|
||
}
|
||
latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffEnd, msg)
|
||
|
||
return nil
|
||
}
|
||
|
||
// SendTo 将一条协议消息编码为 UDP 数据报并发送到指定地址。
|
||
func (c *UDPConn) SendTo(msg protocol.Message, addr *net.UDPAddr) error {
|
||
c.writeMu.Lock()
|
||
defer c.writeMu.Unlock()
|
||
|
||
latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffBegin, msg)
|
||
if err := c.sendMessageToLinux(msg, addr); err != nil {
|
||
return fmt.Errorf("transport: udp send message to %s: %w", addr, err)
|
||
}
|
||
latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffEnd, msg)
|
||
|
||
return nil
|
||
}
|
||
|
||
// Receive 从 UDP 连接读取一条完整协议消息。
|
||
func (c *UDPConn) Receive() (protocol.Message, *net.UDPAddr, error) {
|
||
msg, addr, err := c.receiveMessageLinux()
|
||
if err != nil {
|
||
return protocol.Message{}, nil, fmt.Errorf("transport: udp receive message: %w", err)
|
||
}
|
||
|
||
return msg, addr, nil
|
||
}
|
||
|
||
// ReceiveLoop 持续从 UDP 连接读取消息并交给 handler 处理。
|
||
func (c *UDPConn) ReceiveLoop(handler func(protocol.Message, *net.UDPAddr) error) error {
|
||
for {
|
||
msg, addr, err := c.Receive()
|
||
if err != nil {
|
||
return fmt.Errorf("transport: udp receive loop read: %w", err)
|
||
}
|
||
|
||
if err := handler(msg, addr); err != nil {
|
||
return fmt.Errorf("transport: udp receive loop handler: %w", err)
|
||
}
|
||
}
|
||
}
|
||
|
||
// Close 关闭底层 UDP 连接,保证重复调用安全。
|
||
func (c *UDPConn) Close() error {
|
||
c.closeOnce.Do(func() {
|
||
c.closeErr = c.conn.Close()
|
||
})
|
||
|
||
return c.closeErr
|
||
}
|