100 lines
3.1 KiB
Go
100 lines
3.1 KiB
Go
package peer
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
|
|
"omnisocketgo/cmd/internal/latencylog"
|
|
"omnisocketgo/cmd/internal/protocol"
|
|
)
|
|
|
|
const textInboxFileName = "messages.log"
|
|
|
|
type textInboxRecord struct {
|
|
MessageType protocol.MessageType `json:"message_type"`
|
|
MessageID uint64 `json:"message_id"`
|
|
From string `json:"from"`
|
|
To string `json:"to"`
|
|
Body string `json:"body"`
|
|
}
|
|
|
|
// PersistMessage 将收到的业务消息写入本地磁盘,并记录处理完成节点。
|
|
func (c *Client) PersistMessage(msg protocol.Message, inboxDir string) (string, error) {
|
|
if !latencylog.IsBusinessMessage(msg) {
|
|
return "", fmt.Errorf("peer: cannot persist message type %s", msg.Type)
|
|
}
|
|
if inboxDir == "" {
|
|
return "", fmt.Errorf("peer: inbox directory is required")
|
|
}
|
|
|
|
latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBPersistBegin, msg)
|
|
|
|
if err := os.MkdirAll(inboxDir, 0o755); err != nil {
|
|
return "", fmt.Errorf("peer: create inbox dir %s: %w", inboxDir, err)
|
|
}
|
|
|
|
path, err := persistMessageToDisk(msg, inboxDir)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
latencylog.LogMessageEvent(c.logger, latencylog.NodeRolePeer, c.id, latencylog.EventBPersistEnd, msg)
|
|
|
|
return path, nil
|
|
}
|
|
|
|
// persistMessageToDisk 根据消息类型将消息内容写入磁盘,文本消息追加到文本日志文件,文件消息写成独立文件。
|
|
func persistMessageToDisk(msg protocol.Message, inboxDir string) (string, error) {
|
|
switch msg.Type {
|
|
case protocol.MessageTypeText:
|
|
return persistTextMessage(msg, inboxDir)
|
|
case protocol.MessageTypeFile:
|
|
return persistFileMessage(msg, inboxDir)
|
|
default:
|
|
return "", fmt.Errorf("peer: cannot persist unsupported message type %s", msg.Type)
|
|
}
|
|
}
|
|
|
|
// registerPeer 验证 peer ID 的合法性和唯一性,并将其与连接关联起来。
|
|
func persistTextMessage(msg protocol.Message, inboxDir string) (string, error) {
|
|
record := textInboxRecord{
|
|
MessageType: msg.Type,
|
|
MessageID: msg.ID,
|
|
From: msg.From,
|
|
To: msg.To,
|
|
Body: string(msg.Body),
|
|
}
|
|
|
|
line, err := json.Marshal(record)
|
|
if err != nil {
|
|
return "", fmt.Errorf("peer: encode text inbox record: %w", err)
|
|
}
|
|
|
|
path := filepath.Join(inboxDir, textInboxFileName)
|
|
file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
|
|
if err != nil {
|
|
return "", fmt.Errorf("peer: open text inbox %s: %w", path, err)
|
|
}
|
|
defer file.Close()
|
|
|
|
if _, err := file.Write(append(line, '\n')); err != nil {
|
|
return "", fmt.Errorf("peer: append text inbox %s: %w", path, err)
|
|
}
|
|
|
|
return path, nil
|
|
}
|
|
|
|
// persistFileMessage 将文件消息的内容写成独立文件,文件名包含发送方、消息 ID 和原始文件名,保证唯一性和可读性。
|
|
func persistFileMessage(msg protocol.Message, inboxDir string) (string, error) {
|
|
fileName := filepath.Base(msg.FileName)
|
|
path := filepath.Join(inboxDir, fmt.Sprintf("%s-%d-%s", msg.From, msg.ID, fileName))
|
|
|
|
if err := os.WriteFile(path, msg.Body, 0o644); err != nil {
|
|
return "", fmt.Errorf("peer: write received file %s: %w", path, err)
|
|
}
|
|
|
|
return path, nil
|
|
}
|