Compare commits

...

4 Commits

4 changed files with 185 additions and 38 deletions

View File

@@ -285,6 +285,7 @@ static int open_v4l2_device(const char *device) {
static int init_v4l2_device(int fd, const worker_config_t *cfg) {
struct v4l2_format fmt;
struct v4l2_streamparm parm;
CLEAR(fmt);
fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
@@ -297,6 +298,18 @@ static int init_v4l2_device(int fd, const worker_config_t *cfg) {
perror("VIDIOC_S_FMT");
return -1;
}
CLEAR(parm);
parm.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
if (cfg->initial_fps > 0 && ioctl(fd, VIDIOC_G_PARM, &parm) == 0) {
if ((parm.parm.capture.capability & V4L2_CAP_TIMEPERFRAME) != 0U) {
parm.parm.capture.timeperframe.numerator = 1U;
parm.parm.capture.timeperframe.denominator = (unsigned int) cfg->initial_fps;
if (ioctl(fd, VIDIOC_S_PARM, &parm) < 0) {
perror("VIDIOC_S_PARM");
}
}
}
return 0;
}
@@ -359,6 +372,58 @@ static int queue_all_buffers(int fd, int num_buffers) {
return 0;
}
static int dequeue_latest_buffer(int fd, struct v4l2_buffer *latest_buf) {
struct v4l2_buffer latest_local;
bool have_latest = false;
if (latest_buf == NULL) {
errno = EINVAL;
return -1;
}
for (;;) {
struct v4l2_buffer current;
int dq_errno;
CLEAR(current);
current.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
current.memory = V4L2_MEMORY_MMAP;
if (ioctl(fd, VIDIOC_DQBUF, &current) < 0) {
dq_errno = errno;
if (dq_errno == EINTR) {
continue;
}
if (dq_errno == EAGAIN) {
if (!have_latest) {
errno = EAGAIN;
return 1;
}
*latest_buf = latest_local;
return 0;
}
if (have_latest && ioctl(fd, VIDIOC_QBUF, &latest_local) < 0) {
perror("VIDIOC_QBUF");
}
errno = dq_errno;
return -1;
}
if (have_latest && ioctl(fd, VIDIOC_QBUF, &latest_local) < 0) {
int q_errno = errno;
perror("VIDIOC_QBUF");
if (ioctl(fd, VIDIOC_QBUF, &current) < 0) {
perror("VIDIOC_QBUF");
}
errno = q_errno;
return -1;
}
latest_local = current;
have_latest = true;
}
}
static AVCodecContext *create_mjpeg_decoder(const worker_config_t *cfg) {
const AVCodec *decoder = avcodec_find_decoder(AV_CODEC_ID_MJPEG);
AVCodecContext *ctx;
@@ -715,10 +780,6 @@ int main(void) {
if (fps < 1) {
fps = 1;
}
if (next_deadline_ms > now_ms) {
usleep((useconds_t) ((next_deadline_ms - now_ms) * 1000.0));
}
next_deadline_ms = monotonic_ms() + (1000.0 / (double) fps);
FD_ZERO(&fds);
FD_SET(camera_fd, &fds);
@@ -733,10 +794,7 @@ int main(void) {
continue;
}
CLEAR(buf);
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
if (ioctl(camera_fd, VIDIOC_DQBUF, &buf) < 0) {
if (dequeue_latest_buffer(camera_fd, &buf) != 0) {
if (errno == EAGAIN) {
continue;
}
@@ -744,6 +802,13 @@ int main(void) {
break;
}
now_ms = monotonic_ms();
if (now_ms < next_deadline_ms) {
drop_reason = "paced_drop";
goto requeue_and_report;
}
next_deadline_ms = now_ms + (1000.0 / (double) fps);
if (decode_mjpeg_frame(decoder, (uint8_t *) buffers[buf.index].start, (int) buf.bytesused, &decoded_frame) != 0) {
drop_reason = "decode_failed";
goto requeue_and_report;

View File

@@ -30,9 +30,9 @@ def utc_iso_now() -> str:
def load_omnisocket_api():
from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS
from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session, VIDEO_DEFAULTS
return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS
return CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session, VIDEO_DEFAULTS
def _merge_kcp_defaults(defaults: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
@@ -50,7 +50,7 @@ def _load_config(config_path: str | None) -> dict[str, Any]:
with path.open("r", encoding="utf-8") as file:
raw = yaml.safe_load(file) or {}
control_defaults, _msg_type_binary, _session_cls, video_defaults = load_omnisocket_api()
control_defaults, _msg_type_binary, _msg_type_error, _session_cls, video_defaults = load_omnisocket_api()
transport = dict(raw.get("transport", {}))
control = dict(raw.get("control_sender", {}))
@@ -160,7 +160,7 @@ class QueuedControlEvent:
class ControlSessionManager:
def __init__(self, config: dict[str, Any]) -> None:
control_defaults, _msg_type_binary, session_cls, _video_defaults = load_omnisocket_api()
control_defaults, _msg_type_binary, _msg_type_error, session_cls, _video_defaults = load_omnisocket_api()
transport = config["transport"]
control_cfg = config["control_sender"]
daemon_cfg = config["daemon"]
@@ -403,12 +403,13 @@ class ControlSessionManager:
class VideoSessionManager:
def __init__(self, config: dict[str, Any]) -> None:
_control_defaults, msg_type_binary, session_cls, video_defaults = load_omnisocket_api()
_control_defaults, msg_type_binary, msg_type_error, session_cls, video_defaults = load_omnisocket_api()
transport = config["transport"]
video_cfg = config["video_receiver"]
daemon_cfg = config["daemon"]
self._msg_type_binary = msg_type_binary
self._msg_type_error = msg_type_error
self._session_cls = session_cls
self._connect_kwargs = {
"server_addr": transport["server_addr"],
@@ -517,9 +518,12 @@ class VideoSessionManager:
meta = session.recv_into(buffer, timeout_ms=200)
if meta is None:
continue
if meta.get("msg_type") != self._msg_type_binary:
continue
msg_type = int(meta.get("msg_type", -1))
frame = bytes(buffer[: int(meta["body_len"])])
if msg_type != self._msg_type_binary:
self._disconnect(self._describe_unexpected_message(msg_type, frame))
time.sleep(0.2)
break
jpeg_frame = self._extract_jpeg_frame(frame)
if jpeg_frame is None:
with self._lock:
@@ -535,6 +539,14 @@ class VideoSessionManager:
self._disconnect(str(error))
time.sleep(0.2)
def _describe_unexpected_message(self, msg_type: int, payload: bytes) -> str:
detail = payload.decode("utf-8", errors="replace").strip()
if msg_type == self._msg_type_error:
return f"video session rejected by server: {detail or 'unknown error'}"
if detail:
return f"received unexpected video message type {msg_type}: {detail}"
return f"received unexpected video message type {msg_type}"
def _connect(self) -> None:
session = self._session_cls()
try:
@@ -959,8 +971,11 @@ class OmniDaemonHTTPHandler(BaseHTTPRequestHandler):
self.send_header("Content-Length", str(len(payload)))
self.send_header("Cache-Control", "no-store")
self.send_header("Connection", "keep-alive")
self.end_headers()
self.wfile.write(payload)
try:
self.end_headers()
self.wfile.write(payload)
except (BrokenPipeError, ConnectionResetError):
return
class ASideOmniDaemon:

View File

@@ -39,9 +39,9 @@ def utc_iso_now() -> str:
def load_omnisocket_api():
from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session
from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session
return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session
return CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session
def _merge_kcp_defaults(defaults: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
@@ -115,7 +115,7 @@ def _load_config(config_path: str | None) -> dict[str, Any]:
with path.open("r", encoding="utf-8") as file:
raw = yaml.safe_load(file) or {}
control_defaults, _msg_type_binary, _session_cls = load_omnisocket_api()
control_defaults, _msg_type_binary, _msg_type_error, _session_cls = load_omnisocket_api()
transport = dict(raw.get("transport", {}))
control = dict(raw.get("control_receiver", {}))
video = dict(raw.get("video_sender", {}))
@@ -193,12 +193,13 @@ def _load_config(config_path: str | None) -> dict[str, Any]:
class ControlRecvManager:
def __init__(self, config: dict[str, Any]) -> None:
control_defaults, msg_type_binary, session_cls = load_omnisocket_api()
control_defaults, msg_type_binary, msg_type_error, session_cls = load_omnisocket_api()
transport = config["transport"]
control_cfg = config["control_receiver"]
daemon_cfg = config["daemon"]
self._msg_type_binary = msg_type_binary
self._msg_type_error = msg_type_error
self._session_cls = session_cls
self._connect_kwargs = {
"server_addr": transport["server_addr"],
@@ -307,6 +308,7 @@ class ControlRecvManager:
if msg_type != self._msg_type_binary:
with self._lock:
self._ignored_non_binary += 1
self._disconnect(self._describe_unexpected_message(msg_type, payload))
continue
if len(payload) != CONTROL_PACKET_STRUCT.size:
with self._lock:
@@ -346,6 +348,14 @@ class ControlRecvManager:
except Exception:
pass
def _describe_unexpected_message(self, msg_type: int, payload: bytes) -> str:
detail = payload.decode("utf-8", errors="replace").strip()
if msg_type == self._msg_type_error:
return f"control session rejected by server: {detail or 'unknown error'}"
if detail:
return f"received unexpected control message type {msg_type}: {detail}"
return f"received unexpected control message type {msg_type}"
def _enqueue_packet(self, payload: bytes) -> None:
try:
self._queue.put_nowait(payload)
@@ -1198,8 +1208,11 @@ class OmniDaemonHTTPHandler(BaseHTTPRequestHandler):
self.send_header("Content-Length", str(len(body)))
self.send_header("Cache-Control", "no-store")
self.send_header("Connection", "keep-alive")
self.end_headers()
self.wfile.write(body)
try:
self.end_headers()
self.wfile.write(body)
except (BrokenPipeError, ConnectionResetError):
return
class BSideOmniDaemon:

View File

@@ -5,6 +5,13 @@
#define UDP_RELAY_BUF_SIZE (64U * 1024U)
typedef struct udp_relay_client_entry {
struct udp_relay_client_entry *next;
uint32_t conv;
struct sockaddr_storage addr;
socklen_t addr_len;
} udp_relay_client_entry_t;
struct udp_relay {
int downstream_fd;
int upstream_fd;
@@ -12,9 +19,10 @@ struct udp_relay {
socklen_t upstream_addr_len;
char downstream_local_addr[OMNI_MAX_ADDR_TEXT];
char upstream_local_addr[OMNI_MAX_ADDR_TEXT];
struct sockaddr_storage client_addr;
socklen_t client_addr_len;
int has_client;
struct sockaddr_storage last_client_addr;
socklen_t last_client_addr_len;
int has_last_client;
udp_relay_client_entry_t *clients;
pthread_mutex_t lock;
pthread_mutex_t log_mu;
pthread_mutex_t state_mu;
@@ -131,22 +139,55 @@ static void udp_relay_note_result(udp_relay_t *relay, int rc, int errnum) {
pthread_mutex_unlock(&relay->state_mu);
}
static void udp_relay_record_client(udp_relay_t *relay, const struct sockaddr_storage *addr, socklen_t addr_len) {
static udp_relay_client_entry_t *udp_relay_find_client_locked(udp_relay_t *relay, uint32_t conv) {
udp_relay_client_entry_t *entry;
for (entry = relay->clients; entry != NULL; entry = entry->next) {
if (entry->conv == conv) {
return entry;
}
}
return NULL;
}
static void udp_relay_record_client(udp_relay_t *relay, int has_conv, uint32_t conv, const struct sockaddr_storage *addr, socklen_t addr_len) {
pthread_mutex_lock(&relay->lock);
memcpy(&relay->client_addr, addr, sizeof(*addr));
relay->client_addr_len = addr_len;
relay->has_client = 1;
memcpy(&relay->last_client_addr, addr, sizeof(*addr));
relay->last_client_addr_len = addr_len;
relay->has_last_client = 1;
if (has_conv) {
udp_relay_client_entry_t *entry = udp_relay_find_client_locked(relay, conv);
if (entry == NULL) {
entry = (udp_relay_client_entry_t *) calloc(1, sizeof(*entry));
if (entry != NULL) {
entry->conv = conv;
entry->next = relay->clients;
relay->clients = entry;
}
}
if (entry != NULL) {
memcpy(&entry->addr, addr, sizeof(*addr));
entry->addr_len = addr_len;
}
}
pthread_mutex_unlock(&relay->lock);
}
static int udp_relay_copy_client(udp_relay_t *relay, struct sockaddr_storage *addr, socklen_t *addr_len) {
int has_client;
static int udp_relay_copy_client(udp_relay_t *relay, int has_conv, uint32_t conv, struct sockaddr_storage *addr, socklen_t *addr_len) {
int has_client = 0;
pthread_mutex_lock(&relay->lock);
has_client = relay->has_client;
if (has_client) {
memcpy(addr, &relay->client_addr, sizeof(*addr));
*addr_len = relay->client_addr_len;
if (has_conv) {
udp_relay_client_entry_t *entry = udp_relay_find_client_locked(relay, conv);
if (entry != NULL) {
memcpy(addr, &entry->addr, sizeof(*addr));
*addr_len = entry->addr_len;
has_client = 1;
}
} else if (relay->has_last_client) {
memcpy(addr, &relay->last_client_addr, sizeof(*addr));
*addr_len = relay->last_client_addr_len;
has_client = 1;
}
pthread_mutex_unlock(&relay->lock);
return has_client;
@@ -160,6 +201,8 @@ static void *udp_relay_forward_downstream_to_upstream(void *arg) {
struct sockaddr_storage source;
socklen_t source_len = sizeof(source);
ssize_t n = recvfrom(relay->downstream_fd, buffer, sizeof(buffer), 0, (struct sockaddr *) &source, &source_len);
int has_conv = 0;
uint32_t conv = 0;
if (n < 0) {
int errnum = errno;
@@ -174,7 +217,8 @@ static void *udp_relay_forward_downstream_to_upstream(void *arg) {
return NULL;
}
udp_relay_record_client(relay, &source, source_len);
udp_relay_parse_kcp_summary(buffer, (size_t) n, &has_conv, &conv, NULL);
udp_relay_record_client(relay, has_conv, conv, &source, source_len);
udp_relay_print_packet(relay, "relay_downstream_rx", relay->downstream_local_addr, &source, source_len, buffer, (size_t) n);
for (;;) {
if (send(relay->upstream_fd, buffer, (size_t) n, 0) >= 0) {
@@ -205,6 +249,8 @@ static void *udp_relay_forward_upstream_to_downstream(void *arg) {
struct sockaddr_storage client_addr;
socklen_t client_addr_len = 0;
ssize_t n = recv(relay->upstream_fd, buffer, sizeof(buffer), 0);
int has_conv = 0;
uint32_t conv = 0;
if (n < 0) {
int errnum = errno;
@@ -220,7 +266,8 @@ static void *udp_relay_forward_upstream_to_downstream(void *arg) {
}
udp_relay_print_packet(relay, "relay_upstream_rx", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n);
if (!udp_relay_copy_client(relay, &client_addr, &client_addr_len)) {
udp_relay_parse_kcp_summary(buffer, (size_t) n, &has_conv, &conv, NULL);
if (!udp_relay_copy_client(relay, has_conv, conv, &client_addr, &client_addr_len)) {
udp_relay_print_packet(relay, "relay_upstream_drop_no_client", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n);
continue;
}
@@ -404,11 +451,18 @@ int udp_relay_close(udp_relay_t *relay) {
}
void udp_relay_free(udp_relay_t *relay) {
udp_relay_client_entry_t *entry;
udp_relay_client_entry_t *next;
if (relay == NULL) {
return;
}
udp_relay_close(relay);
udp_relay_join_threads(relay);
for (entry = relay->clients; entry != NULL; entry = next) {
next = entry->next;
free(entry);
}
pthread_mutex_destroy(&relay->lock);
pthread_mutex_destroy(&relay->log_mu);
pthread_cond_destroy(&relay->state_cond);