Files
OmniSocketGo/cmd/internal/peer/persist.go
nnbcccscdscdsc 4824675244 init
2026-03-23 20:18:53 +08:00

100 lines
3.1 KiB
Go

package peer
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"omnisocketgo/cmd/internal/latencylog"
"omnisocketgo/cmd/internal/protocol"
)
// textInboxFileName is the name of the file, inside the inbox directory, to
// which received text messages are appended.
const textInboxFileName = "messages.log"
// textInboxRecord is the JSON shape of one text message as stored in the text
// inbox file; persistTextMessage encodes one record per line.
type textInboxRecord struct {
// MessageType is copied from the message's Type field.
MessageType protocol.MessageType `json:"message_type"`
// MessageID is copied from the message's ID field.
MessageID uint64 `json:"message_id"`
// From is copied from the message's From field.
From string `json:"from"`
// To is copied from the message's To field.
To string `json:"to"`
// Body is the message body converted to a string.
Body string `json:"body"`
}
// PersistMessage stores a received business message under inboxDir and emits
// latency-log events around the disk write.
//
// It returns an error without touching the disk when msg is not a business
// message (as decided by latencylog.IsBusinessMessage) or when inboxDir is
// empty. Otherwise it logs EventBPersistBegin, creates inboxDir (mode 0o755)
// if needed, writes the message via persistMessageToDisk, and logs
// EventBPersistEnd. The end event is logged only when the write succeeds.
//
// On success the returned string is the path the message was written to.
func (c *Client) PersistMessage(msg protocol.Message, inboxDir string) (string, error) {
	// Validate the request before any logging or filesystem access.
	switch {
	case !latencylog.IsBusinessMessage(msg):
		return "", fmt.Errorf("peer: cannot persist message type %s", msg.Type)
	case inboxDir == "":
		return "", fmt.Errorf("peer: inbox directory is required")
	}

	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBPersistBegin, msg)

	mkdirErr := os.MkdirAll(inboxDir, 0o755)
	if mkdirErr != nil {
		return "", fmt.Errorf("peer: create inbox dir %s: %w", inboxDir, mkdirErr)
	}

	savedPath, saveErr := persistMessageToDisk(msg, inboxDir)
	if saveErr != nil {
		return "", saveErr
	}

	latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBPersistEnd, msg)
	return savedPath, nil
}
// persistMessageToDisk writes msg under inboxDir according to its type: text
// messages go through persistTextMessage and file messages through
// persistFileMessage. Any other message type yields an error. The returned
// string is whatever path the selected writer reports.
func persistMessageToDisk(msg protocol.Message, inboxDir string) (string, error) {
	if msg.Type == protocol.MessageTypeText {
		return persistTextMessage(msg, inboxDir)
	}
	if msg.Type == protocol.MessageTypeFile {
		return persistFileMessage(msg, inboxDir)
	}
	return "", fmt.Errorf("peer: cannot persist unsupported message type %s", msg.Type)
}
// persistTextMessage appends msg to the text inbox file (textInboxFileName)
// inside inboxDir as a single JSON-encoded textInboxRecord followed by a
// newline. The file is created with mode 0o644 if it does not exist.
//
// It returns the path of the inbox file, or an error if encoding, opening,
// writing, or closing the file fails.
func persistTextMessage(msg protocol.Message, inboxDir string) (string, error) {
	record := textInboxRecord{
		MessageType: msg.Type,
		MessageID:   msg.ID,
		From:        msg.From,
		To:          msg.To,
		Body:        string(msg.Body),
	}
	line, err := json.Marshal(record)
	if err != nil {
		return "", fmt.Errorf("peer: encode text inbox record: %w", err)
	}

	path := filepath.Join(inboxDir, textInboxFileName)
	file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return "", fmt.Errorf("peer: open text inbox %s: %w", path, err)
	}

	// Record and trailing newline go out in one Write call so a line is not
	// split across separate writes.
	if _, err := file.Write(append(line, '\n')); err != nil {
		// The write error is the one worth reporting; closing is best effort.
		_ = file.Close()
		return "", fmt.Errorf("peer: append text inbox %s: %w", path, err)
	}

	// Close can surface a deferred write failure, so its error must not be
	// dropped: otherwise a lost record would be reported as persisted.
	if err := file.Close(); err != nil {
		return "", fmt.Errorf("peer: close text inbox %s: %w", path, err)
	}
	return path, nil
}
// persistFileMessage writes the body of a file message to its own file inside
// inboxDir. The file name has the form "<from>-<id>-<name>", where <name> is
// the base name of msg.FileName. The file is written with mode 0o644; an
// existing file with the same name is overwritten (os.WriteFile truncates).
//
// msg.From and msg.FileName arrive with the message and are treated as
// untrusted: the composed name is reduced to its final path component so that
// separators or ".." in either field cannot place the file outside inboxDir.
// Names that contain no path separators are unaffected.
//
// It returns the path of the written file, or an error if the write fails.
func persistFileMessage(msg protocol.Message, inboxDir string) (string, error) {
	fileName := filepath.Base(msg.FileName)

	// The composed name always contains "-<id>-", so it is never empty, ".",
	// or "..", and Base therefore yields a single ordinary path component.
	name := filepath.Base(fmt.Sprintf("%s-%d-%s", msg.From, msg.ID, fileName))

	path := filepath.Join(inboxDir, name)
	if err := os.WriteFile(path, msg.Body, 0o644); err != nil {
		return "", fmt.Errorf("peer: write received file %s: %w", path, err)
	}
	return path, nil
}