package transport import ( "errors" "fmt" "io" "net" "sync" "syscall" "time" "omnisocketgo/cmd/internal/latencylog" "omnisocketgo/cmd/internal/protocol" ) // TCPConn 是对单条活跃 TCP 连接的轻量封装。 // 它负责把协议层的单条消息读写,提升为可复用的收发接口。 type TCPConn struct { conn net.Conn raw syscall.RawConn // 连接对应的底层 syscall 句柄,用于 Linux socket timestamping 收发。 logger latencylog.Logger nodeRole string // 日志中记录的节点角色,例如 "server" 或 "peer" nodeID string // 日志中记录的节点 ID,例如 peer 的 ID 或 server 的 "hub" writeMu sync.Mutex // 保护 Send 方法的互斥锁,确保同一时刻只有一条完整协议消息被写入连接,防止多条消息字节交叉 closeOnce sync.Once // 保护 Close 方法的 sync.Once,确保连接只被关闭一次 closeErr error // 连接关闭时的错误,如果连接成功关闭则为 nil,重复调用 Close 时会返回同样的错误 } // Option 用于为 TCPConn 注入可选行为,例如时延日志。 type Option func(*TCPConn) // WithLogger 为连接发送路径注入业务消息日志上下文。 func WithLogger(logger latencylog.Logger, nodeRole, nodeID string) Option { return func(conn *TCPConn) { conn.logger = logger conn.nodeRole = nodeRole conn.nodeID = nodeID } } // NewTCPConn 用已有的 net.Conn 创建 transport 连接封装。 func NewTCPConn(conn net.Conn, opts ...Option) (*TCPConn, error) { tcpConn := &TCPConn{ conn: conn, logger: latencylog.NoopLogger{}, } for _, opt := range opts { opt(tcpConn) } if tcpConn.logger == nil { tcpConn.logger = latencylog.NoopLogger{} } if err := tcpConn.initLinuxTimestamping(); err != nil { return nil, err } return tcpConn, nil } // Send 将一条协议消息完整写入底层连接。 // 多个 goroutine 可以并发调用,内部会串行化写入。 func (c *TCPConn) Send(msg protocol.Message) error { c.writeMu.Lock() //“同一时刻只能有一条完整协议消息往连接里写,防止多条消息字节交叉 defer c.writeMu.Unlock() latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffBegin, msg) if err := c.sendMessageLinux(msg); err != nil { return fmt.Errorf("transport: send message: %w", err) } //记录发送完成的时延日志事件,事件类型为 EventSendHandoffEnd,包含消息的基本信息(类型、ID、来源、目标)。 latencylog.LogMessageEvent(c.logger, c.nodeRole, c.nodeID, latencylog.EventSendHandoffEnd, msg) return nil } // Receive 从底层连接读取一条完整协议消息。 // 同一条连接应只由单个 reader 持续调用该方法。 func (c *TCPConn) Receive() (protocol.Message, error) { msg, err := c.receiveMessageLinux() if err != nil { return protocol.Message{}, fmt.Errorf("transport: receive message: %w", err) } return msg, nil } // ReceiveLoop 持续读取消息并交给 handler 处理。 // 读取错误、handler 错误或连接关闭都会结束循环,并关闭连接。 func (c *TCPConn) ReceiveLoop(handler func(protocol.Message) error) error { for { msg, err := c.Receive() if err != nil { _ = c.Close() return fmt.Errorf("transport: receive loop read: %w", err) } if err := handler(msg); err != nil { _ = c.Close() return fmt.Errorf("transport: receive loop handler: %w", err) } } } // CloseGracefully 在支持 half-close 的连接上先关闭写方向,给对端留出读取最终响应的机会, // 然后在短暂等待后再彻底关闭连接。 func (c *TCPConn) CloseGracefully(drainTimeout time.Duration) error { if closeWriter, ok := c.conn.(interface{ CloseWrite() error }); ok { if err := closeWriter.CloseWrite(); err != nil && !errors.Is(err, net.ErrClosed) { return c.Close() } if drainTimeout > 0 { _ = c.conn.SetReadDeadline(time.Now().Add(drainTimeout)) defer func() { _ = c.conn.SetReadDeadline(time.Time{}) }() var buf [256]byte for { _, err := c.conn.Read(buf[:]) switch { case err == nil: continue case errors.Is(err, io.EOF), errors.Is(err, net.ErrClosed): return c.Close() default: var netErr net.Error if errors.As(err, &netErr) && netErr.Timeout() { return c.Close() } return c.Close() } } } } return c.Close() } // Close 关闭底层连接,并保证重复调用是安全的。 func (c *TCPConn) Close() error { c.closeOnce.Do(func() { c.closeErr = c.conn.Close() }) return c.closeErr }