Files
OmniSocketGo/cmd/internal/peer/client.go

274 lines
7.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package peer
import (
"fmt"
"net"
"os"
"path/filepath"
"sync/atomic"
"syscall"
"time"
"omnisocketgo/cmd/internal/latencylog"
"omnisocketgo/cmd/internal/protocol"
"omnisocketgo/cmd/internal/transport"
)
// dialServer opens the raw connection used by Dial. It is a package-level
// variable that defaults to dialServerWithOptions — presumably so tests can
// substitute a fake dialer (confirm against the package's tests).
var dialServer = dialServerWithOptions
// clientOptions collects the optional settings that Option functions apply
// before a client connection is dialed.
type clientOptions struct {
// logger receives latency events; Dial falls back to a no-op logger when nil.
logger latencylog.Logger
// txTimestampDebugLogger is forwarded to the transport layer by Dial.
txTimestampDebugLogger transport.TXTimestampDebugLogger
// kcpPacketDebugLogger is set by WithKCPPacketDebugLogger; it is not read in
// this file.
kcpPacketDebugLogger transport.KCPPacketDebugLogger
// kcpSessionStatsLogger and kcpSessionStatsInterval are set together by
// WithKCPSessionStatsLogger; they are not read in this file.
kcpSessionStatsLogger transport.KCPSessionStatsLogger
kcpSessionStatsInterval time.Duration
// udpLinuxTimestamping is set by WithUDPLinuxTimestamping; Dial defaults it
// to true.
udpLinuxTimestamping bool
// bindIP is the local source IP for the dial; empty means no explicit bind.
bindIP string
// bindDevice is the network device name to bind the socket to; empty means
// no device binding.
bindDevice string
}
// Option configures optional Client behavior, such as latency logging.
type Option func(*clientOptions)
// WithLogger returns an Option that installs logger as the client's latency
// logger.
func WithLogger(logger latencylog.Logger) Option {
	apply := func(cfg *clientOptions) {
		cfg.logger = logger
	}
	return apply
}
// WithTXTimestampDebugLogger returns an Option that installs logger as the
// client's TX errqueue debug logger.
func WithTXTimestampDebugLogger(logger transport.TXTimestampDebugLogger) Option {
	return func(cfg *clientOptions) { cfg.txTimestampDebugLogger = logger }
}
// WithKCPPacketDebugLogger returns an Option that installs logger as the
// recorder for KCP UDP packet timestamp debug logs.
func WithKCPPacketDebugLogger(logger transport.KCPPacketDebugLogger) Option {
	apply := func(cfg *clientOptions) {
		cfg.kcpPacketDebugLogger = logger
	}
	return apply
}
// WithKCPSessionStatsLogger returns an Option that installs logger as the KCP
// session statistics recorder and sets its sampling interval.
func WithKCPSessionStatsLogger(logger transport.KCPSessionStatsLogger, interval time.Duration) Option {
	return func(cfg *clientOptions) {
		cfg.kcpSessionStatsLogger, cfg.kcpSessionStatsInterval = logger, interval
	}
}
// WithBindIP returns an Option that sets the local source IP used when
// dialing.
func WithBindIP(ip string) Option {
	return func(cfg *clientOptions) { cfg.bindIP = ip }
}
// WithBindDevice returns an Option that sets the Linux network interface the
// dialing socket is bound to, for example eth0 or wwan0.
func WithBindDevice(device string) Option {
	apply := func(cfg *clientOptions) {
		cfg.bindDevice = device
	}
	return apply
}
// WithUDPLinuxTimestamping returns an Option that records whether UDP clients
// should enable Linux timestamping.
func WithUDPLinuxTimestamping(enabled bool) Option {
	return func(cfg *clientOptions) { cfg.udpLinuxTimestamping = enabled }
}
// Client represents a peer that holds an established connection to the server.
type Client struct {
	// nextID is the most recently issued message ID and is advanced with
	// sync/atomic. It must stay the first field: 64-bit atomic operations on
	// 386, ARM and 32-bit MIPS require 8-byte alignment, which Go only
	// guarantees for the first word of an allocated struct. Placed after the
	// other fields it would land on a 4-byte boundary on those targets and the
	// atomic add would panic.
	nextID uint64
	// id is the peer identifier this client registered with.
	id string
	// conn is the transport connection to the server.
	conn *transport.TCPConn
	// logger receives latency events for messages sent and received.
	logger latencylog.Logger
}
// Dial connects to the server at serverAddr and immediately sends a register
// message carrying peerID to complete identity registration.
//
// Options are applied in order on top of the defaults (a no-op latency logger
// and UDP Linux timestamping enabled). If an option leaves the logger nil, the
// no-op logger is restored. When a step fails after the raw connection has
// been opened, the connection is closed before the wrapped error is returned.
func Dial(serverAddr, peerID string, opts ...Option) (*Client, error) {
	cfg := clientOptions{
		logger:               latencylog.NoopLogger{},
		udpLinuxTimestamping: true,
	}
	for i := range opts {
		opts[i](&cfg)
	}
	if cfg.logger == nil {
		cfg.logger = latencylog.NoopLogger{}
	}

	netConn, dialErr := dialServer(serverAddr, cfg)
	if dialErr != nil {
		return nil, fmt.Errorf("peer: dial server: %w", dialErr)
	}

	tcpConn, wrapErr := transport.NewTCPConn(
		netConn,
		transport.WithLogger(cfg.logger, latencylog.NodeRolePeer, peerID),
		transport.WithTXTimestampDebugLogger(cfg.txTimestampDebugLogger),
	)
	if wrapErr != nil {
		// Best-effort cleanup; the wrap error is the one worth reporting.
		_ = netConn.Close()
		return nil, fmt.Errorf("peer: create transport conn: %w", wrapErr)
	}

	// Register with the server before handing the client to the caller.
	register := protocol.Message{
		Type: protocol.MessageTypeRegister,
		From: peerID,
		To:   protocol.ServerPeerID,
	}
	if sendErr := tcpConn.Send(register); sendErr != nil {
		// Best-effort cleanup; the send error is the one worth reporting.
		_ = tcpConn.Close()
		return nil, fmt.Errorf("peer: register with server: %w", sendErr)
	}

	return &Client{
		id:     peerID,
		conn:   tcpConn,
		logger: cfg.logger,
	}, nil
}
// ID returns the peer identifier of this client.
func (c *Client) ID() string {
	peerID := c.id
	return peerID
}
// SendText sends a text message with the given body to the peer named by to.
//
// The message is assigned a fresh ID, and the EventAAppPrepBegin latency event
// is logged before the body is attached.
func (c *Client) SendText(to, body string) error {
	var outgoing protocol.Message
	outgoing.Type = protocol.MessageTypeText
	outgoing.ID = c.nextMessageID()
	outgoing.From = c.id
	outgoing.To = to

	// Mark the moment the sending application starts preparing the message.
	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, outgoing)

	outgoing.Body = []byte(body)
	sendErr := c.conn.Send(outgoing)
	return sendErr
}
// SendFile sends a file message named fileName with the given body to the peer
// named by to.
//
// The message is assigned a fresh ID, and the EventAAppPrepBegin latency event
// is logged before the body is attached. The body is copied, so the message
// does not share the caller's slice.
func (c *Client) SendFile(to, fileName string, body []byte) error {
	outgoing := protocol.Message{
		Type:     protocol.MessageTypeFile,
		ID:       c.nextMessageID(),
		From:     c.id,
		To:       to,
		FileName: fileName,
	}

	// Mark the moment the sending application starts preparing the message.
	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, outgoing)

	outgoing.Body = make([]byte, len(body))
	copy(outgoing.Body, body)
	return c.conn.Send(outgoing)
}
// SendFilePath reads the local file at path and sends its contents as a file
// message to the peer named by to. The message's file name is the base name
// of path.
//
// The message ID is allocated and the EventAAppPrepBegin latency event is
// logged before the file is read; a read failure is returned wrapped and
// nothing is sent.
func (c *Client) SendFilePath(to, path string) error {
	var outgoing protocol.Message
	outgoing.Type = protocol.MessageTypeFile
	outgoing.ID = c.nextMessageID()
	outgoing.From = c.id
	outgoing.To = to
	outgoing.FileName = filepath.Base(path)

	// Mark the moment the sending application starts preparing the message.
	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventAAppPrepBegin, outgoing)

	content, readErr := os.ReadFile(path)
	if readErr != nil {
		return fmt.Errorf("peer: read file %s: %w", path, readErr)
	}
	outgoing.Body = content

	return c.conn.Send(outgoing)
}
// Receive reads one message from the server connection.
//
// On success the EventBAppRecv latency event is logged and the message is
// returned. On failure a zero Message and the wrapped error are returned.
func (c *Client) Receive() (protocol.Message, error) {
	incoming, recvErr := c.conn.Receive()
	if recvErr != nil {
		var none protocol.Message
		return none, fmt.Errorf("peer: receive from server: %w", recvErr)
	}

	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBAppRecv, incoming)
	return incoming, nil
}
// ReceiveLoop keeps receiving messages from the server and passes each one to
// handler.
//
// Only text, file and error messages are delivered; for those the
// EventBAppRecv latency event is logged before handler is called, and
// handler's result is returned to the underlying loop. Any other message type
// produces an error.
func (c *Client) ReceiveLoop(handler func(protocol.Message) error) error {
	dispatch := func(incoming protocol.Message) error {
		deliverable := incoming.Type == protocol.MessageTypeText ||
			incoming.Type == protocol.MessageTypeFile ||
			incoming.Type == protocol.MessageTypeError
		if !deliverable {
			return fmt.Errorf("peer: unexpected message type from server: %s", incoming.Type)
		}

		// Mark the moment the receiving application has the complete message.
		latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBAppRecv, incoming)
		return handler(incoming)
	}
	return c.conn.ReceiveLoop(dispatch)
}
// Close closes the connection to the server and returns the result of closing
// the underlying transport connection.
func (c *Client) Close() error {
	closeErr := c.conn.Close()
	return closeErr
}
// nextMessageID atomically increments the client's message counter and returns
// the new value, so the first ID issued is 1.
func (c *Client) nextMessageID() uint64 {
	issued := atomic.AddUint64(&c.nextID, 1)
	return issued
}
// dialServerWithOptions builds a dialer from options (local IP and device
// binding, when set) and opens a TCP connection to serverAddr. It returns the
// raw net.Conn, or the error from building the dialer or from dialing.
func dialServerWithOptions(serverAddr string, options clientOptions) (net.Conn, error) {
	d, buildErr := buildDialer(options)
	if buildErr == nil {
		return d.Dial("tcp", serverAddr)
	}
	return nil, buildErr
}
// buildDialer creates a net.Dialer configured from options.
//
// When bindIP is set it must parse as an IP address and becomes the dialer's
// local TCP address; otherwise an error is returned. When bindDevice is set,
// the dialer's Control hook binds the socket to that device via
// syscall.BindToDevice.
func buildDialer(options clientOptions) (*net.Dialer, error) {
	d := new(net.Dialer)

	if bindIP := options.bindIP; bindIP != "" {
		parsed := net.ParseIP(bindIP)
		if parsed == nil {
			return nil, fmt.Errorf("peer: invalid bind ip %q", bindIP)
		}
		d.LocalAddr = &net.TCPAddr{IP: parsed}
	}

	if device := options.bindDevice; device != "" {
		d.Control = func(_, _ string, rawConn syscall.RawConn) error {
			// bindErr carries the result of the bind out of the fd callback;
			// controlErr reports a failure to run the callback at all.
			var bindErr error
			controlErr := rawConn.Control(func(fd uintptr) {
				bindErr = syscall.BindToDevice(int(fd), device)
			})
			switch {
			case controlErr != nil:
				return controlErr
			case bindErr != nil:
				return fmt.Errorf("peer: bind device %s: %w", device, bindErr)
			}
			return nil
		}
	}

	return d, nil
}