161 lines
5.0 KiB
Go
161 lines
5.0 KiB
Go
package transport
|
||
|
||
import (
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"net"
|
||
"sync"
|
||
"syscall"
|
||
"time"
|
||
|
||
"omnisocketgo/cmd/internal/latencylog"
|
||
"omnisocketgo/cmd/internal/protocol"
|
||
)
|
||
|
||
// TCPConn 是对单条活跃 TCP 连接的轻量封装。
|
||
// 它负责把协议层的单条消息读写,提升为可复用的收发接口。
|
||
type TCPConn struct {
|
||
conn net.Conn
|
||
raw syscall.RawConn // 连接对应的底层 syscall 句柄,用于 Linux socket timestamping 收发。
|
||
|
||
logger latencylog.Logger
|
||
txTimestampDebugLogger TXTimestampDebugLogger
|
||
nodeRole string // 日志中记录的节点角色,例如 "server" 或 "peer"
|
||
nodeID string // 日志中记录的节点 ID,例如 peer 的 ID 或 server 的 "hub"
|
||
writeMu sync.Mutex // 保护 Send 方法的互斥锁,确保同一时刻只有一条完整协议消息被写入连接,防止多条消息字节交叉
|
||
txWriteSeq uint32 // Linux TX timestamp OPT_ID_TCP 的本地镜像,按成功写出的字节推进。
|
||
closeOnce sync.Once // 保护 Close 方法的 sync.Once,确保连接只被关闭一次
|
||
closeErr error // 连接关闭时的错误,如果连接成功关闭则为 nil,重复调用 Close 时会返回同样的错误
|
||
}
|
||
|
||
// Option 用于为 TCPConn 注入可选行为,例如时延日志。
|
||
type Option func(*TCPConn)
|
||
|
||
// WithLogger 为连接发送路径注入业务消息日志上下文。
|
||
func WithLogger(logger latencylog.Logger, nodeRole, nodeID string) Option {
|
||
return func(conn *TCPConn) {
|
||
conn.logger = logger
|
||
conn.nodeRole = nodeRole
|
||
conn.nodeID = nodeID
|
||
}
|
||
}
|
||
|
||
// WithTXTimestampDebugLogger 为连接注入可选的 TX errqueue 调试日志器。
|
||
func WithTXTimestampDebugLogger(logger TXTimestampDebugLogger) Option {
|
||
return func(conn *TCPConn) {
|
||
conn.txTimestampDebugLogger = logger
|
||
}
|
||
}
|
||
|
||
// NewTCPConn 用已有的 net.Conn 创建 transport 连接封装。
|
||
func NewTCPConn(conn net.Conn, opts ...Option) (*TCPConn, error) {
|
||
tcpConn := &TCPConn{
|
||
conn: conn,
|
||
logger: latencylog.NoopLogger{},
|
||
}
|
||
|
||
for _, opt := range opts {
|
||
opt(tcpConn)
|
||
}
|
||
|
||
if tcpConn.logger == nil {
|
||
tcpConn.logger = latencylog.NoopLogger{}
|
||
}
|
||
|
||
if err := tcpConn.initLinuxTimestamping(); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return tcpConn, nil
|
||
}
|
||
|
||
// Send 将一条协议消息完整写入底层连接。
|
||
// 多个 goroutine 可以并发调用,内部会串行化写入。
|
||
func (c *TCPConn) Send(msg protocol.Message) error {
|
||
c.writeMu.Lock() //“同一时刻只能有一条完整协议消息往连接里写,防止多条消息字节交叉
|
||
defer c.writeMu.Unlock()
|
||
latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffBegin, msg)
|
||
|
||
if err := c.sendMessageLinux(msg); err != nil {
|
||
return fmt.Errorf("transport: send message: %w", err)
|
||
}
|
||
//记录发送完成的时延日志事件,事件类型为 EventSendHandoffEnd,包含消息的基本信息(类型、ID、来源、目标)。
|
||
latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffEnd, msg)
|
||
|
||
return nil
|
||
}
|
||
|
||
// Receive 从底层连接读取一条完整协议消息。
|
||
// 同一条连接应只由单个 reader 持续调用该方法。
|
||
func (c *TCPConn) Receive() (protocol.Message, error) {
|
||
msg, err := c.receiveMessageLinux()
|
||
if err != nil {
|
||
return protocol.Message{}, fmt.Errorf("transport: receive message: %w", err)
|
||
}
|
||
|
||
return msg, nil
|
||
}
|
||
|
||
// ReceiveLoop 持续读取消息并交给 handler 处理。
|
||
// 读取错误、handler 错误或连接关闭都会结束循环,并关闭连接。
|
||
func (c *TCPConn) ReceiveLoop(handler func(protocol.Message) error) error {
|
||
for {
|
||
msg, err := c.Receive()
|
||
if err != nil {
|
||
_ = c.Close()
|
||
return fmt.Errorf("transport: receive loop read: %w", err)
|
||
}
|
||
|
||
if err := handler(msg); err != nil {
|
||
_ = c.Close()
|
||
return fmt.Errorf("transport: receive loop handler: %w", err)
|
||
}
|
||
}
|
||
}
|
||
|
||
// CloseGracefully 在支持 half-close 的连接上先关闭写方向,给对端留出读取最终响应的机会,
|
||
// 然后在短暂等待后再彻底关闭连接。
|
||
func (c *TCPConn) CloseGracefully(drainTimeout time.Duration) error {
|
||
if closeWriter, ok := c.conn.(interface{ CloseWrite() error }); ok {
|
||
if err := closeWriter.CloseWrite(); err != nil && !errors.Is(err, net.ErrClosed) {
|
||
return c.Close()
|
||
}
|
||
|
||
if drainTimeout > 0 {
|
||
_ = c.conn.SetReadDeadline(time.Now().Add(drainTimeout))
|
||
defer func() {
|
||
_ = c.conn.SetReadDeadline(time.Time{})
|
||
}()
|
||
|
||
var buf [256]byte
|
||
for {
|
||
_, err := c.conn.Read(buf[:])
|
||
switch {
|
||
case err == nil:
|
||
continue
|
||
case errors.Is(err, io.EOF), errors.Is(err, net.ErrClosed):
|
||
return c.Close()
|
||
default:
|
||
var netErr net.Error
|
||
if errors.As(err, &netErr) && netErr.Timeout() {
|
||
return c.Close()
|
||
}
|
||
return c.Close()
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
return c.Close()
|
||
}
|
||
|
||
// Close 关闭底层连接,并保证重复调用是安全的。
|
||
func (c *TCPConn) Close() error {
|
||
c.closeOnce.Do(func() {
|
||
c.closeErr = c.conn.Close()
|
||
})
|
||
|
||
return c.closeErr
|
||
}
|