167 lines
4.8 KiB
Go
167 lines
4.8 KiB
Go
package latencylog
|
|
|
|
import (
|
|
"encoding/json"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"omnisocketgo/cmd/internal/protocol"
|
|
)
|
|
|
|
const (
|
|
NodeRolePeer = "peer" //客户端节点
|
|
NodeRoleServer = "server" //云端转发节点
|
|
)
|
|
|
|
// 记录的消息事件的类型常量。
|
|
const (
|
|
EventAAppPrepBegin = "A_APP_PREP_BEGIN" // A 端应用开始准备这条消息
|
|
EventATXSched = "A_TX_SCHED" // A 端进入 Linux qdisc 之前
|
|
EventATXSoftware = "A_TX_SOFTWARE" // A 端即将交给网卡驱动
|
|
EventATXHardware = "A_TX_HARDWARE" // A 端网卡真正发出到物理介质
|
|
EventBRXHardware = "B_RX_HARDWARE" // B 端网卡真正从物理介质收到
|
|
EventBRXSoftware = "B_RX_SOFTWARE" // B 端驱动把数据交给 Linux 接收栈
|
|
EventBAppRecv = "B_APP_RECV" // B 端应用真正读到完整消息
|
|
EventBPersistBegin = "B_PERSIST_BEGIN" // B 端开始写盘
|
|
EventBPersistEnd = "B_PERSIST_END" // B 端写盘完成
|
|
|
|
EventSendHandoffBegin = "send_handoff_begin" // 调试事件:应用把消息交给传输层开始
|
|
EventSendHandoffEnd = "send_handoff_end" // 调试事件:应用把消息交给传输层结束
|
|
)
|
|
|
|
// Event 是一条时延时间戳日志记录。
|
|
type Event struct {
|
|
TsUnixNano int64 `json:"ts_unix_nano"`
|
|
NodeRole string `json:"node_role"`
|
|
NodeID string `json:"node_id"`
|
|
Event string `json:"event"`
|
|
MessageType protocol.MessageType `json:"message_type"`
|
|
MessageID uint64 `json:"message_id"`
|
|
From string `json:"from"`
|
|
To string `json:"to"`
|
|
FileName string `json:"file_name,omitempty"`
|
|
BodySize int `json:"body_size"`
|
|
}
|
|
|
|
// Logger 负责接收事件并将其写入外部介质。
|
|
type Logger interface {
|
|
LogEvent(Event) error
|
|
}
|
|
|
|
// NoopLogger 是默认的空实现。
|
|
type NoopLogger struct{}
|
|
|
|
// LogEvent 对空日志实现始终返回 nil。
|
|
func (NoopLogger) LogEvent(Event) error {
|
|
return nil
|
|
}
|
|
|
|
// JSONLLogger 以 JSONL 形式追加写日志文件。
|
|
type JSONLLogger struct {
|
|
mu sync.Mutex
|
|
closeOnce sync.Once
|
|
closeErr error
|
|
file *os.File
|
|
}
|
|
|
|
// NewJSONLLogger 创建一个线程安全的 JSONL 文件日志器。
|
|
func NewJSONLLogger(path string) (*JSONLLogger, error) {
|
|
dir := filepath.Dir(path)
|
|
if err := os.MkdirAll(dir, 0o755); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &JSONLLogger{file: file}, nil
|
|
}
|
|
|
|
// LogEvent 以单行 JSON 的形式追加一条事件。
|
|
func (l *JSONLLogger) LogEvent(event Event) error {
|
|
line, err := json.Marshal(event)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
if _, err := l.file.Write(append(line, '\n')); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close 关闭底层文件;重复调用是安全的。
|
|
func (l *JSONLLogger) Close() error {
|
|
l.closeOnce.Do(func() {
|
|
l.closeErr = l.file.Close()
|
|
})
|
|
|
|
return l.closeErr
|
|
}
|
|
|
|
// IsBusinessMessage 判断消息是否属于要参与 A-C-B 时延分析的业务消息。
|
|
func IsBusinessMessage(msg protocol.Message) bool {
|
|
switch msg.Type {
|
|
case protocol.MessageTypeText, protocol.MessageTypeFile:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// NewMessageEvent 用当前 UTC 时间为一条业务消息构造事件。
|
|
func NewMessageEvent(nodeRole, nodeID, eventName string, msg protocol.Message) Event {
|
|
return NewMessageEventAt(time.Now().UTC().UnixNano(), nodeRole, nodeID, eventName, msg)
|
|
}
|
|
|
|
// NewMessageEventAt 用指定的 UnixNano 时间为一条业务消息构造事件。
|
|
func NewMessageEventAt(tsUnixNano int64, nodeRole, nodeID, eventName string, msg protocol.Message) Event {
|
|
return Event{
|
|
TsUnixNano: tsUnixNano,
|
|
NodeRole: nodeRole,
|
|
NodeID: nodeID,
|
|
Event: eventName,
|
|
MessageType: msg.Type,
|
|
MessageID: msg.ID,
|
|
From: msg.From,
|
|
To: msg.To,
|
|
FileName: msg.FileName,
|
|
BodySize: len(msg.Body),
|
|
}
|
|
}
|
|
|
|
// LogBestEffort 写一条事件,失败时静默忽略,避免打断主收发流程。
|
|
func LogBestEffort(logger Logger, event Event) {
|
|
if logger == nil {
|
|
return
|
|
}
|
|
|
|
_ = logger.LogEvent(event)
|
|
}
|
|
|
|
// LogMessageEvent 为业务消息构造并写入一条事件。
|
|
func LogMessageEvent(logger Logger, nodeRole, nodeID, eventName string, msg protocol.Message) {
|
|
if !IsBusinessMessage(msg) {
|
|
return
|
|
}
|
|
|
|
LogBestEffort(logger, NewMessageEvent(nodeRole, nodeID, eventName, msg))
|
|
}
|
|
|
|
// LogMessageEventAt 为业务消息写入一条指定时间戳的事件。
|
|
func LogMessageEventAt(logger Logger, nodeRole, nodeID, eventName string, tsUnixNano int64, msg protocol.Message) {
|
|
if !IsBusinessMessage(msg) {
|
|
return
|
|
}
|
|
|
|
LogBestEffort(logger, NewMessageEventAt(tsUnixNano, nodeRole, nodeID, eventName, msg))
|
|
}
|