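/*
 * b_side_omnid: runs the video pipeline and forwards received control packets
 * to a local Unix datagram socket.
 * (Listing metadata: 429 lines, 14 KiB, C.)
 */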
#include <errno.h>
|
|
#include <pthread.h>
|
|
#include <signal.h>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <stdint.h>
|
|
#include <string.h>
|
|
#include <sys/socket.h>
|
|
#include <sys/un.h>
|
|
#include <unistd.h>
|
|
|
|
#include "control_protocol.h"
|
|
#include "video_pipeline.h"
|
|
|
|
/* Fallback values used when the matching environment variable is unset or empty. */

/* Peer id this side uses for the control session (OMNI_CONTROL_PEER_ID). */
#define CONTROL_DEFAULT_PEER_ID         "peer-b-ctrl"

/* Only messages from this sender are forwarded (OMNI_CONTROL_EXPECTED_SENDER). */
#define CONTROL_DEFAULT_EXPECTED_SENDER "peer-a-ctrl"

/* Unix datagram socket that receives forwarded packets (OMNI_CONTROL_UNIX_SOCKET_PATH). */
#define CONTROL_DEFAULT_UNIX_SOCKET     "/tmp/omnisocket-b-side-cmd.sock"
|
|
|
|
/*
 * Client side of an AF_UNIX datagram socket: the local socket plus the
 * destination address every packet is sent to.
 */
typedef struct unix_dgram_client {
    int                fd;             /* socket descriptor; negative when not open */
    char               bind_path[108]; /* filesystem path the socket is bound to; "" when none */
    struct sockaddr_un dest_addr;      /* address of the receiving socket */
    socklen_t          dest_len;       /* length passed to sendto() for dest_addr */
} unix_dgram_client_t;
|
|
|
|
typedef struct control_bridge_stats {
|
|
pthread_mutex_t mutex;
|
|
uint64_t packets_forwarded;
|
|
uint64_t invalid_packets;
|
|
uint64_t unix_send_errors;
|
|
int connected;
|
|
char last_error[256];
|
|
kcp_runtime_stats_t transport;
|
|
} control_bridge_stats_t;
|
|
|
|
typedef struct daemon_state {
|
|
volatile sig_atomic_t *stop_requested;
|
|
video_pipeline_config_t video_config;
|
|
video_pipeline_stats_t video_stats;
|
|
const char *control_server_addr;
|
|
const char *control_relay_via;
|
|
const char *control_bind_ip;
|
|
const char *control_bind_device;
|
|
const char *control_peer_id;
|
|
const char *control_expected_sender;
|
|
const char *control_unix_socket;
|
|
unix_dgram_client_t unix_client;
|
|
control_bridge_stats_t control_stats;
|
|
} daemon_state_t;
|
|
|
|
/* Process-wide shutdown flag; becomes 1 once termination has been requested. */
static volatile sig_atomic_t g_stop_requested = 0;

/*
 * Signal handler: records the shutdown request and nothing else. The signal
 * number is ignored, so every signal routed here has the same effect.
 */
static void handle_signal(int signum) {
    g_stop_requested = 1;
    (void) signum;
}
|
|
|
|
static int install_signal_handler(int signum) {
|
|
struct sigaction action;
|
|
|
|
memset(&action, 0, sizeof(action));
|
|
action.sa_handler = handle_signal;
|
|
action.sa_flags = SA_RESTART;
|
|
if (sigemptyset(&action.sa_mask) != 0) {
|
|
return -1;
|
|
}
|
|
return sigaction(signum, &action, NULL);
|
|
}
|
|
|
|
/*
 * Look up environment variable `name`.
 *
 * Returns the variable's value when it is set and non-empty, otherwise
 * `fallback` (which is returned as-is and may be NULL).
 */
static const char *env_or_default(const char *name, const char *fallback) {
    const char *from_env = getenv(name);
    const int usable = (from_env != NULL) && (from_env[0] != '\0');

    return usable ? from_env : fallback;
}
|
|
|
|
/*
 * Look up two environment variables in priority order.
 *
 * Returns the value of `first` when it is set and non-empty, else the value
 * of `second` under the same condition, else `fallback`. `second` is only
 * queried when `first` did not yield a value.
 */
static const char *env_first_nonempty(const char *first, const char *second, const char *fallback) {
    const char *names[2];
    size_t i;

    names[0] = first;
    names[1] = second;

    for (i = 0; i < sizeof(names) / sizeof(names[0]); i++) {
        const char *candidate = getenv(names[i]);

        if (candidate != NULL && candidate[0] != '\0') {
            return candidate;
        }
    }
    return fallback;
}
|
|
|
|
/*
 * Zero `stats` and initialise its mutex with default attributes.
 *
 * Returns 0 on success. Returns -1 with errno = EINVAL when `stats` is NULL,
 * or -1 with errno set to the pthread_mutex_init() error code.
 */
static int control_bridge_stats_init(control_bridge_stats_t *stats) {
    int init_rc;

    if (stats == NULL) {
        errno = EINVAL;
        return -1;
    }

    memset(stats, 0, sizeof(*stats));

    init_rc = pthread_mutex_init(&stats->mutex, NULL);
    if (init_rc == 0) {
        return 0;
    }

    /* pthread functions return the error number instead of setting errno. */
    errno = init_rc;
    return -1;
}
|
|
|
|
/* Destroy the mutex inside `stats`. A NULL `stats` is ignored. */
static void control_bridge_stats_destroy(control_bridge_stats_t *stats) {
    if (stats != NULL) {
        pthread_mutex_destroy(&stats->mutex);
    }
}
|
|
|
|
/*
 * Replace stats->last_error with `message` while holding the stats mutex.
 * A NULL `message` stores the empty string; text that does not fit is
 * truncated by snprintf(). A NULL `stats` is ignored.
 */
static void control_bridge_set_error(control_bridge_stats_t *stats, const char *message) {
    const char *text = (message != NULL) ? message : "";

    if (stats != NULL) {
        pthread_mutex_lock(&stats->mutex);
        snprintf(stats->last_error, sizeof(stats->last_error), "%s", text);
        pthread_mutex_unlock(&stats->mutex);
    }
}
|
|
|
|
/*
 * Store "<prefix>: <strerror text> (errno=<n>)" as the bridge's last error,
 * using the errno value current at the time of the call.
 *
 * A NULL `prefix` is replaced with "control bridge error"; an errno of 0 is
 * reported as "unknown error".
 */
static void control_bridge_set_errno_error(control_bridge_stats_t *stats, const char *prefix) {
    /* Capture errno before any other call has a chance to change it. */
    const int err = errno;
    const char *label = prefix;
    const char *detail = "unknown error";
    char text[256];

    if (label == NULL) {
        label = "control bridge error";
    }
    if (err != 0) {
        detail = strerror(err);
    }

    snprintf(text, sizeof(text), "%s: %s (errno=%d)", label, detail, err);
    control_bridge_set_error(stats, text);
}
|
|
|
|
/*
 * Copy the counters, connection flag, last error text and transport stats
 * from `stats` into `out_stats` while holding stats->mutex.
 *
 * `out_stats` is zero-filled first; its own mutex member stays zeroed and is
 * not initialised here. Nothing happens if either pointer is NULL.
 */
static void control_bridge_stats_snapshot(control_bridge_stats_t *stats, control_bridge_stats_t *out_stats) {
    if (stats != NULL && out_stats != NULL) {
        memset(out_stats, 0, sizeof(*out_stats));

        pthread_mutex_lock(&stats->mutex);
        out_stats->connected = stats->connected;
        out_stats->transport = stats->transport;
        out_stats->packets_forwarded = stats->packets_forwarded;
        out_stats->invalid_packets = stats->invalid_packets;
        out_stats->unix_send_errors = stats->unix_send_errors;
        snprintf(out_stats->last_error, sizeof(out_stats->last_error), "%s", stats->last_error);
        pthread_mutex_unlock(&stats->mutex);
    }
}
|
|
|
|
/*
 * Create an AF_UNIX datagram socket, bind it to a per-process path under
 * /tmp, and remember `dest_path` as the address used by later sends.
 *
 * client:    structure to initialise; it is zeroed before use.
 * dest_path: path of the receiving socket; must be non-empty and short
 *            enough to fit in sockaddr_un.sun_path with its terminator.
 *
 * Returns 0 on success. On failure returns -1 with errno describing the
 * cause (EINVAL for bad arguments, ENAMETOOLONG for an over-long
 * `dest_path`, otherwise the socket()/bind() error) and leaves
 * client->fd negative and client->bind_path empty when `client` is non-NULL.
 */
static int unix_dgram_client_init(unix_dgram_client_t *client, const char *dest_path) {
    struct sockaddr_un bind_addr;
    int saved_errno;

    if (client == NULL || dest_path == NULL || dest_path[0] == '\0') {
        errno = EINVAL;
        return -1;
    }

    memset(client, 0, sizeof(*client));
    client->fd = -1;

    /* A truncated destination would silently address a different socket. */
    if (strlen(dest_path) >= sizeof(client->dest_addr.sun_path)) {
        errno = ENAMETOOLONG;
        return -1;
    }

    client->fd = socket(AF_UNIX, SOCK_DGRAM, 0);
    if (client->fd < 0) {
        return -1;
    }

    memset(&bind_addr, 0, sizeof(bind_addr));
    bind_addr.sun_family = AF_UNIX;
    snprintf(client->bind_path, sizeof(client->bind_path),
             "/tmp/omnisocket-b-side-cmd-client-%ld.sock", (long) getpid());
    /* Remove any stale socket file at this path so bind() can create it. */
    unlink(client->bind_path);
    snprintf(bind_addr.sun_path, sizeof(bind_addr.sun_path), "%s", client->bind_path);

    if (bind(client->fd, (const struct sockaddr *) &bind_addr, sizeof(bind_addr)) != 0) {
        /*
         * Keep the bind() error for the caller: the cleanup calls below can
         * overwrite errno (unlink() normally fails with ENOENT here).
         */
        saved_errno = errno;
        close(client->fd);
        unlink(client->bind_path);
        client->fd = -1;
        client->bind_path[0] = '\0';
        errno = saved_errno;
        return -1;
    }

    memset(&client->dest_addr, 0, sizeof(client->dest_addr));
    client->dest_addr.sun_family = AF_UNIX;
    snprintf(client->dest_addr.sun_path, sizeof(client->dest_addr.sun_path), "%s", dest_path);
    client->dest_len = (socklen_t) sizeof(client->dest_addr);
    return 0;
}
|
|
|
|
/*
 * Send one datagram of `len` bytes from `data` to the client's destination.
 *
 * Returns 0 when the whole payload was accepted. Returns -1 with
 * errno = EINVAL for a NULL or unopened client or a NULL `data` with
 * non-zero `len`, with the sendto() errno when the call fails, or with
 * errno = EIO when fewer than `len` bytes were reported as sent.
 */
static int unix_dgram_client_send(unix_dgram_client_t *client, const void *data, size_t len) {
    ssize_t sent;

    if (client == NULL || client->fd < 0 || (data == NULL && len > 0)) {
        errno = EINVAL;
        return -1;
    }

    sent = sendto(client->fd, data, len, 0, (const struct sockaddr *) &client->dest_addr, client->dest_len);
    if (sent < 0) {
        /* errno already set by sendto(). */
        return -1;
    }
    if ((size_t) sent == len) {
        return 0;
    }

    /* Short send: report it as an I/O error. */
    errno = EIO;
    return -1;
}
|
|
|
|
/*
 * Close the client's socket and remove its bound socket file.
 * Each step is skipped when already done, so repeated calls are harmless;
 * a NULL `client` is ignored.
 */
static void unix_dgram_client_close(unix_dgram_client_t *client) {
    if (client != NULL) {
        if (client->fd >= 0) {
            close(client->fd);
            client->fd = -1;
        }

        if (client->bind_path[0] != '\0') {
            unlink(client->bind_path);
            client->bind_path[0] = '\0';
        }
    }
}
|
|
|
|
static void *video_thread_main(void *arg) {
|
|
daemon_state_t *state = (daemon_state_t *) arg;
|
|
|
|
while (!*state->stop_requested) {
|
|
if (video_pipeline_run(&state->video_config, &state->video_stats, state->stop_requested) == 0) {
|
|
break;
|
|
}
|
|
if (!*state->stop_requested) {
|
|
sleep(1);
|
|
}
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
static void *control_thread_main(void *arg) {
|
|
daemon_state_t *state = (daemon_state_t *) arg;
|
|
|
|
while (!*state->stop_requested) {
|
|
kcp_conn_options_t options;
|
|
kcp_client_t *client = NULL;
|
|
|
|
kcp_conn_options_set_control_defaults(&options);
|
|
client = kcp_client_dial_with_options(
|
|
state->control_server_addr,
|
|
state->control_relay_via,
|
|
state->control_peer_id,
|
|
state->control_bind_ip,
|
|
state->control_bind_device,
|
|
&options,
|
|
NULL,
|
|
NULL,
|
|
NULL,
|
|
KCP_DEFAULT_STATS_INTERVAL_MS
|
|
);
|
|
if (client == NULL) {
|
|
control_bridge_set_errno_error(&state->control_stats, "failed to connect control session");
|
|
sleep(1);
|
|
continue;
|
|
}
|
|
|
|
pthread_mutex_lock(&state->control_stats.mutex);
|
|
state->control_stats.connected = 1;
|
|
state->control_stats.last_error[0] = '\0';
|
|
pthread_mutex_unlock(&state->control_stats.mutex);
|
|
|
|
while (!*state->stop_requested) {
|
|
message_t msg;
|
|
int rc;
|
|
|
|
protocol_message_init(&msg);
|
|
rc = kcp_client_receive_timed(client, &msg, 100);
|
|
if (rc == 1) {
|
|
protocol_message_clear(&msg);
|
|
continue;
|
|
}
|
|
if (rc != 0) {
|
|
control_bridge_set_errno_error(&state->control_stats, "control receive loop stopped");
|
|
protocol_message_clear(&msg);
|
|
break;
|
|
}
|
|
|
|
if (state->control_expected_sender[0] != '\0' && strcmp(msg.from, state->control_expected_sender) != 0) {
|
|
pthread_mutex_lock(&state->control_stats.mutex);
|
|
state->control_stats.invalid_packets += 1;
|
|
pthread_mutex_unlock(&state->control_stats.mutex);
|
|
protocol_message_clear(&msg);
|
|
continue;
|
|
}
|
|
|
|
if (msg.type != MSG_TYPE_BINARY || msg.body_len != OMNI_CONTROL_PACKET_SIZE) {
|
|
pthread_mutex_lock(&state->control_stats.mutex);
|
|
state->control_stats.invalid_packets += 1;
|
|
pthread_mutex_unlock(&state->control_stats.mutex);
|
|
protocol_message_clear(&msg);
|
|
continue;
|
|
}
|
|
|
|
if (unix_dgram_client_send(&state->unix_client, msg.body, msg.body_len) != 0) {
|
|
pthread_mutex_lock(&state->control_stats.mutex);
|
|
state->control_stats.unix_send_errors += 1;
|
|
pthread_mutex_unlock(&state->control_stats.mutex);
|
|
control_bridge_set_errno_error(&state->control_stats, "failed to forward command to unix socket");
|
|
protocol_message_clear(&msg);
|
|
continue;
|
|
}
|
|
|
|
pthread_mutex_lock(&state->control_stats.mutex);
|
|
state->control_stats.packets_forwarded += 1;
|
|
kcp_client_runtime_stats_snapshot(client, &state->control_stats.transport);
|
|
pthread_mutex_unlock(&state->control_stats.mutex);
|
|
protocol_message_clear(&msg);
|
|
}
|
|
|
|
pthread_mutex_lock(&state->control_stats.mutex);
|
|
state->control_stats.connected = 0;
|
|
pthread_mutex_unlock(&state->control_stats.mutex);
|
|
kcp_client_close(client);
|
|
kcp_client_free(client);
|
|
if (!*state->stop_requested) {
|
|
sleep(1);
|
|
}
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
/*
 * Write a one-line summary of the video and control statistics to stderr.
 * The values are taken from snapshot copies of state->video_stats and
 * state->control_stats.
 */
static void print_stats(daemon_state_t *state) {
    video_pipeline_stats_t video;
    control_bridge_stats_t control;
    unsigned long long frames_sent;
    unsigned long long bytes_sent;
    unsigned long long forwarded;
    unsigned long long invalid;
    unsigned long long unix_errors;

    memset(&video, 0, sizeof(video));
    memset(&control, 0, sizeof(control));
    video_pipeline_stats_snapshot(&state->video_stats, &video);
    control_bridge_stats_snapshot(&state->control_stats, &control);

    /* Widen the counters once so they match the %llu conversions below. */
    frames_sent = (unsigned long long) video.frames_sent;
    bytes_sent = (unsigned long long) video.bytes_sent;
    forwarded = (unsigned long long) control.packets_forwarded;
    invalid = (unsigned long long) control.invalid_packets;
    unix_errors = (unsigned long long) control.unix_send_errors;

    fprintf(
        stderr,
        "[b_side_omnid] video connected=%d frames=%llu bytes=%llu srtt=%dms | control connected=%d forwarded=%llu invalid=%llu unix_err=%llu srtt=%dms\n",
        video.connected,
        frames_sent,
        bytes_sent,
        video.transport.srtt_ms,
        control.connected,
        forwarded,
        invalid,
        unix_errors,
        control.transport.srtt_ms
    );
}
|
|
|
|
int main(void) {
|
|
daemon_state_t state;
|
|
pthread_t video_thread;
|
|
pthread_t control_thread;
|
|
|
|
memset(&state, 0, sizeof(state));
|
|
state.stop_requested = &g_stop_requested;
|
|
|
|
video_pipeline_config_init(&state.video_config);
|
|
video_pipeline_config_load_env(&state.video_config);
|
|
state.control_server_addr = env_first_nonempty("OMNI_CONTROL_SERVER_ADDR", "OMNISOCKET_SERVER_ADDR", "");
|
|
state.control_relay_via = env_first_nonempty("OMNI_CONTROL_RELAY_VIA", "OMNISOCKET_RELAY_VIA", "");
|
|
state.control_bind_ip = env_first_nonempty("OMNI_CONTROL_BIND_IP", "OMNISOCKET_BIND_IP", "");
|
|
state.control_bind_device = env_first_nonempty("OMNI_CONTROL_BIND_DEVICE", "OMNISOCKET_BIND_DEVICE", "");
|
|
state.control_peer_id = env_or_default("OMNI_CONTROL_PEER_ID", CONTROL_DEFAULT_PEER_ID);
|
|
state.control_expected_sender = env_or_default("OMNI_CONTROL_EXPECTED_SENDER", CONTROL_DEFAULT_EXPECTED_SENDER);
|
|
state.control_unix_socket = env_or_default("OMNI_CONTROL_UNIX_SOCKET_PATH", CONTROL_DEFAULT_UNIX_SOCKET);
|
|
|
|
if (state.video_config.server_addr == NULL || state.video_config.server_addr[0] == '\0' ||
|
|
state.control_server_addr == NULL || state.control_server_addr[0] == '\0') {
|
|
fprintf(stderr, "OMNISOCKET_SERVER_ADDR (or session-specific overrides) is required\n");
|
|
return 1;
|
|
}
|
|
|
|
if (video_pipeline_stats_init(&state.video_stats) != 0) {
|
|
perror("video_pipeline_stats_init");
|
|
return 1;
|
|
}
|
|
if (control_bridge_stats_init(&state.control_stats) != 0) {
|
|
perror("control_bridge_stats_init");
|
|
video_pipeline_stats_destroy(&state.video_stats);
|
|
return 1;
|
|
}
|
|
if (unix_dgram_client_init(&state.unix_client, state.control_unix_socket) != 0) {
|
|
perror("unix_dgram_client_init");
|
|
control_bridge_stats_destroy(&state.control_stats);
|
|
video_pipeline_stats_destroy(&state.video_stats);
|
|
return 1;
|
|
}
|
|
|
|
fprintf(
|
|
stderr,
|
|
"[b_side_omnid] control forwarding target is unix_dgram://%s\n",
|
|
state.control_unix_socket
|
|
);
|
|
|
|
if (install_signal_handler(SIGINT) != 0 || install_signal_handler(SIGTERM) != 0) {
|
|
perror("install_signal_handler");
|
|
unix_dgram_client_close(&state.unix_client);
|
|
control_bridge_stats_destroy(&state.control_stats);
|
|
video_pipeline_stats_destroy(&state.video_stats);
|
|
return 1;
|
|
}
|
|
|
|
if (pthread_create(&video_thread, NULL, video_thread_main, &state) != 0) {
|
|
perror("pthread_create(video_thread)");
|
|
unix_dgram_client_close(&state.unix_client);
|
|
control_bridge_stats_destroy(&state.control_stats);
|
|
video_pipeline_stats_destroy(&state.video_stats);
|
|
return 1;
|
|
}
|
|
if (pthread_create(&control_thread, NULL, control_thread_main, &state) != 0) {
|
|
perror("pthread_create(control_thread)");
|
|
g_stop_requested = 1;
|
|
pthread_join(video_thread, NULL);
|
|
unix_dgram_client_close(&state.unix_client);
|
|
control_bridge_stats_destroy(&state.control_stats);
|
|
video_pipeline_stats_destroy(&state.video_stats);
|
|
return 1;
|
|
}
|
|
|
|
while (!g_stop_requested) {
|
|
sleep(1);
|
|
print_stats(&state);
|
|
}
|
|
|
|
pthread_join(video_thread, NULL);
|
|
pthread_join(control_thread, NULL);
|
|
unix_dgram_client_close(&state.unix_client);
|
|
control_bridge_stats_destroy(&state.control_stats);
|
|
video_pipeline_stats_destroy(&state.video_stats);
|
|
return 0;
|
|
}
|