214 lines
7.2 KiB
Go
214 lines
7.2 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"os"
|
|
|
|
"omnisocketgo/cmd/internal/latencylog"
|
|
peerpkg "omnisocketgo/cmd/internal/peer"
|
|
"omnisocketgo/cmd/internal/protocol"
|
|
"omnisocketgo/cmd/internal/transport"
|
|
)
|
|
|
|
func main() {
|
|
peerID := flag.String("id", "peer-a", "peer identity")
|
|
serverAddr := flag.String("server", "127.0.0.1:9002", "logical KCP hub address; when -relay-via is set this may differ from the actual UDP dial target")
|
|
relayVia := flag.String("relay-via", "", "optional UDP relay address used as the actual KCP dial target")
|
|
targetPeer := flag.String("to", "", "optional target peer for one outgoing message")
|
|
text := flag.String("text", "", "optional text to send after connecting")
|
|
filePath := flag.String("file", "", "optional file path to send after connecting")
|
|
bindIP := flag.String("bind-ip", "", "optional local source IP used when dialing the server")
|
|
bindDevice := flag.String("bind-device", "", "optional Linux network device used when dialing the server")
|
|
inboxDir := flag.String("inbox-dir", "inbox", "directory used to persist received text and file messages")
|
|
logPath := flag.String("latency-log", "", "optional JSONL file path for latency timestamp logs")
|
|
kcpTimestampDebugLogPath := flag.String("kcp-ts-debug-log", "", "optional JSONL file path for KCP packet kernel timestamp debug records")
|
|
kcpSessionStatsLogPath := flag.String("kcp-session-stats-log", "", "optional JSONL file path for KCP session stats records")
|
|
kcpSessionStatsInterval := flag.String("kcp-session-stats-interval", transport.DefaultKCPSessionStatsInterval.String(), "sampling interval for KCP session stats, for example 100ms")
|
|
interactive := flag.Bool("interactive", true, "enable interactive REPL for repeated text/file sends on the same connection")
|
|
flag.Parse()
|
|
|
|
statsInterval, err := transport.ParseKCPSessionStatsInterval(*kcpSessionStatsInterval)
|
|
if err != nil {
|
|
log.Fatalf("parse -kcp-session-stats-interval=%q: %v", *kcpSessionStatsInterval, err)
|
|
}
|
|
|
|
clientOptions := make([]peerpkg.Option, 0, 6)
|
|
if *logPath != "" {
|
|
logger, err := latencylog.NewJSONLLogger(*logPath)
|
|
if err != nil {
|
|
log.Fatalf("create latency logger %s: %v", *logPath, err)
|
|
}
|
|
defer logger.Close()
|
|
clientOptions = append(clientOptions, peerpkg.WithLogger(logger))
|
|
}
|
|
if *kcpTimestampDebugLogPath != "" {
|
|
logger, err := transport.NewJSONLKCPPacketDebugLogger(*kcpTimestampDebugLogPath)
|
|
if err != nil {
|
|
log.Fatalf("create kcp packet debug logger %s: %v", *kcpTimestampDebugLogPath, err)
|
|
}
|
|
defer logger.Close()
|
|
clientOptions = append(clientOptions, peerpkg.WithKCPPacketDebugLogger(logger))
|
|
}
|
|
if *kcpSessionStatsLogPath != "" {
|
|
logger, err := transport.NewJSONLKCPSessionStatsLogger(*kcpSessionStatsLogPath)
|
|
if err != nil {
|
|
log.Fatalf("create kcp session stats logger %s: %v", *kcpSessionStatsLogPath, err)
|
|
}
|
|
defer logger.Close()
|
|
clientOptions = append(clientOptions, peerpkg.WithKCPSessionStatsLogger(logger, statsInterval))
|
|
}
|
|
if *bindIP != "" {
|
|
clientOptions = append(clientOptions, peerpkg.WithBindIP(*bindIP))
|
|
}
|
|
if *bindDevice != "" {
|
|
clientOptions = append(clientOptions, peerpkg.WithBindDevice(*bindDevice))
|
|
}
|
|
if *relayVia != "" {
|
|
clientOptions = append(clientOptions, peerpkg.WithKCPDialAddress(*relayVia))
|
|
}
|
|
|
|
client, err := peerpkg.DialKCP(*serverAddr, *peerID, clientOptions...)
|
|
if err != nil {
|
|
log.Fatalf("dial kcp server %s: %v", *serverAddr, err)
|
|
}
|
|
defer client.Close()
|
|
|
|
dialTarget := *serverAddr
|
|
if *relayVia != "" {
|
|
dialTarget = *relayVia
|
|
log.Printf("opened KCP session as %s; logical server=%s, actual dial target=%s via relay; register not yet confirmed", client.ID(), *serverAddr, dialTarget)
|
|
} else {
|
|
log.Printf("opened KCP session as %s; logical server=%s, actual dial target=%s; register not yet confirmed", client.ID(), *serverAddr, dialTarget)
|
|
}
|
|
|
|
receiveErr := make(chan error, 1)
|
|
go func() {
|
|
receiveErr <- client.ReceiveLoop(func(msg protocol.Message) error {
|
|
switch msg.Type {
|
|
case protocol.MessageTypeText:
|
|
path, err := client.PersistMessage(msg, *inboxDir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Printf("received text from %s to %s and persisted to %s", msg.From, msg.To, path)
|
|
case protocol.MessageTypeFile:
|
|
path, err := client.PersistMessage(msg, *inboxDir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Printf("received file from %s to %s: %s (%d bytes) -> %s", msg.From, msg.To, msg.FileName, len(msg.Body), path)
|
|
case protocol.MessageTypeError:
|
|
log.Printf("received %s from %s to %s: %s", msg.Type, msg.From, msg.To, string(msg.Body))
|
|
default:
|
|
log.Printf("received unexpected message type %s from %s", msg.Type, msg.From)
|
|
}
|
|
return nil
|
|
})
|
|
}()
|
|
|
|
if *text != "" && *filePath != "" {
|
|
log.Fatal("only one of -text or -file may be specified")
|
|
}
|
|
if (*text != "" || *filePath != "") && *targetPeer == "" {
|
|
log.Fatal("flag -to is required when sending text or file")
|
|
}
|
|
|
|
if *targetPeer != "" && *text != "" {
|
|
if err := client.SendText(*targetPeer, *text); err != nil {
|
|
log.Fatalf("send text to %s: %v", *targetPeer, err)
|
|
}
|
|
log.Printf("sent text to %s", *targetPeer)
|
|
}
|
|
if *targetPeer != "" && *filePath != "" {
|
|
if err := client.SendFilePath(*targetPeer, *filePath); err != nil {
|
|
log.Fatalf("send file %s to %s: %v", *filePath, *targetPeer, err)
|
|
}
|
|
log.Printf("sent file %s to %s", *filePath, *targetPeer)
|
|
}
|
|
|
|
if *interactive {
|
|
if err := runKCPInteractiveShell(client, os.Stdin, os.Stdout, receiveErr); err != nil {
|
|
log.Printf("interactive shell ended: %v", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
if err := <-receiveErr; err != nil {
|
|
log.Printf("receive loop ended: %v", err)
|
|
}
|
|
}
|
|
|
|
func runKCPInteractiveShell(client *peerpkg.KCPClient, in io.Reader, out io.Writer, receiveErr <-chan error) error {
|
|
printKCPInteractiveHelp(out)
|
|
lines, inputErr := readKCPInteractiveLines(in, out, fmt.Sprintf("%s> ", client.ID()))
|
|
|
|
for {
|
|
select {
|
|
case err := <-receiveErr:
|
|
return err
|
|
case line, ok := <-lines:
|
|
if !ok {
|
|
return <-inputErr
|
|
}
|
|
|
|
command, err := parseKCPInteractiveCommand(line)
|
|
if err != nil {
|
|
if err == errKCPEmptyInteractiveCommand {
|
|
continue
|
|
}
|
|
log.Printf("interactive command error: %v", err)
|
|
continue
|
|
}
|
|
|
|
switch command.name {
|
|
case kcpInteractiveCommandHelp:
|
|
printKCPInteractiveHelp(out)
|
|
case kcpInteractiveCommandQuit:
|
|
return nil
|
|
case kcpInteractiveCommandText:
|
|
if err := client.SendText(command.to, command.value); err != nil {
|
|
log.Printf("send text to %s: %v", command.to, err)
|
|
continue
|
|
}
|
|
log.Printf("sent text to %s", command.to)
|
|
case kcpInteractiveCommandFile:
|
|
if err := client.SendFilePath(command.to, command.value); err != nil {
|
|
log.Printf("send file %s to %s: %v", command.value, command.to, err)
|
|
continue
|
|
}
|
|
log.Printf("sent file %s to %s", command.value, command.to)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// readKCPInteractiveLines starts a goroutine that repeatedly writes prompt to
// out and reads one line from in, delivering each line on the first returned
// channel.
//
// The goroutine stops when writing the prompt fails or when the scanner has no
// more input. Before stopping it places exactly one value on the second
// channel — the prompt write error, or the scanner's error, which is nil on a
// clean end of input — and then closes the line channel. Lines longer than
// 1 MiB make the scanner stop with an error.
func readKCPInteractiveLines(in io.Reader, out io.Writer, prompt string) (<-chan string, <-chan error) {
	const (
		initialLineBuffer = 1024
		maxLineLength     = 1024 * 1024
	)

	lineCh := make(chan string)
	// Capacity 1 lets the goroutine publish its final status without waiting
	// for a receiver.
	doneCh := make(chan error, 1)

	go func() {
		defer close(lineCh)

		sc := bufio.NewScanner(in)
		sc.Buffer(make([]byte, 0, initialLineBuffer), maxLineLength)

		for {
			_, writeErr := fmt.Fprint(out, prompt)
			if writeErr != nil {
				doneCh <- writeErr
				return
			}

			if sc.Scan() {
				lineCh <- sc.Text()
				continue
			}

			doneCh <- sc.Err()
			return
		}
	}()

	return lineCh, doneCh
}
|