Compare commits
4 Commits
878e11e597
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
| 0f2405cb04 | |||
| c7b995efd7 | |||
| 9cd1f88bfc | |||
| 6d77dc26bd |
@@ -285,6 +285,7 @@ static int open_v4l2_device(const char *device) {
|
||||
|
||||
static int init_v4l2_device(int fd, const worker_config_t *cfg) {
|
||||
struct v4l2_format fmt;
|
||||
struct v4l2_streamparm parm;
|
||||
|
||||
CLEAR(fmt);
|
||||
fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
|
||||
@@ -297,6 +298,18 @@ static int init_v4l2_device(int fd, const worker_config_t *cfg) {
|
||||
perror("VIDIOC_S_FMT");
|
||||
return -1;
|
||||
}
|
||||
|
||||
CLEAR(parm);
|
||||
parm.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
|
||||
if (cfg->initial_fps > 0 && ioctl(fd, VIDIOC_G_PARM, &parm) == 0) {
|
||||
if ((parm.parm.capture.capability & V4L2_CAP_TIMEPERFRAME) != 0U) {
|
||||
parm.parm.capture.timeperframe.numerator = 1U;
|
||||
parm.parm.capture.timeperframe.denominator = (unsigned int) cfg->initial_fps;
|
||||
if (ioctl(fd, VIDIOC_S_PARM, &parm) < 0) {
|
||||
perror("VIDIOC_S_PARM");
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -359,6 +372,58 @@ static int queue_all_buffers(int fd, int num_buffers) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int dequeue_latest_buffer(int fd, struct v4l2_buffer *latest_buf) {
|
||||
struct v4l2_buffer latest_local;
|
||||
bool have_latest = false;
|
||||
|
||||
if (latest_buf == NULL) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
struct v4l2_buffer current;
|
||||
int dq_errno;
|
||||
|
||||
CLEAR(current);
|
||||
current.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
|
||||
current.memory = V4L2_MEMORY_MMAP;
|
||||
if (ioctl(fd, VIDIOC_DQBUF, ¤t) < 0) {
|
||||
dq_errno = errno;
|
||||
if (dq_errno == EINTR) {
|
||||
continue;
|
||||
}
|
||||
if (dq_errno == EAGAIN) {
|
||||
if (!have_latest) {
|
||||
errno = EAGAIN;
|
||||
return 1;
|
||||
}
|
||||
*latest_buf = latest_local;
|
||||
return 0;
|
||||
}
|
||||
if (have_latest && ioctl(fd, VIDIOC_QBUF, &latest_local) < 0) {
|
||||
perror("VIDIOC_QBUF");
|
||||
}
|
||||
errno = dq_errno;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (have_latest && ioctl(fd, VIDIOC_QBUF, &latest_local) < 0) {
|
||||
int q_errno = errno;
|
||||
|
||||
perror("VIDIOC_QBUF");
|
||||
if (ioctl(fd, VIDIOC_QBUF, ¤t) < 0) {
|
||||
perror("VIDIOC_QBUF");
|
||||
}
|
||||
errno = q_errno;
|
||||
return -1;
|
||||
}
|
||||
|
||||
latest_local = current;
|
||||
have_latest = true;
|
||||
}
|
||||
}
|
||||
|
||||
static AVCodecContext *create_mjpeg_decoder(const worker_config_t *cfg) {
|
||||
const AVCodec *decoder = avcodec_find_decoder(AV_CODEC_ID_MJPEG);
|
||||
AVCodecContext *ctx;
|
||||
@@ -715,10 +780,6 @@ int main(void) {
|
||||
if (fps < 1) {
|
||||
fps = 1;
|
||||
}
|
||||
if (next_deadline_ms > now_ms) {
|
||||
usleep((useconds_t) ((next_deadline_ms - now_ms) * 1000.0));
|
||||
}
|
||||
next_deadline_ms = monotonic_ms() + (1000.0 / (double) fps);
|
||||
|
||||
FD_ZERO(&fds);
|
||||
FD_SET(camera_fd, &fds);
|
||||
@@ -733,10 +794,7 @@ int main(void) {
|
||||
continue;
|
||||
}
|
||||
|
||||
CLEAR(buf);
|
||||
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
|
||||
buf.memory = V4L2_MEMORY_MMAP;
|
||||
if (ioctl(camera_fd, VIDIOC_DQBUF, &buf) < 0) {
|
||||
if (dequeue_latest_buffer(camera_fd, &buf) != 0) {
|
||||
if (errno == EAGAIN) {
|
||||
continue;
|
||||
}
|
||||
@@ -744,6 +802,13 @@ int main(void) {
|
||||
break;
|
||||
}
|
||||
|
||||
now_ms = monotonic_ms();
|
||||
if (now_ms < next_deadline_ms) {
|
||||
drop_reason = "paced_drop";
|
||||
goto requeue_and_report;
|
||||
}
|
||||
next_deadline_ms = now_ms + (1000.0 / (double) fps);
|
||||
|
||||
if (decode_mjpeg_frame(decoder, (uint8_t *) buffers[buf.index].start, (int) buf.bytesused, &decoded_frame) != 0) {
|
||||
drop_reason = "decode_failed";
|
||||
goto requeue_and_report;
|
||||
|
||||
@@ -30,9 +30,9 @@ def utc_iso_now() -> str:
|
||||
|
||||
|
||||
def load_omnisocket_api():
|
||||
from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS
|
||||
from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session, VIDEO_DEFAULTS
|
||||
|
||||
return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session, VIDEO_DEFAULTS
|
||||
return CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session, VIDEO_DEFAULTS
|
||||
|
||||
|
||||
def _merge_kcp_defaults(defaults: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
|
||||
@@ -50,7 +50,7 @@ def _load_config(config_path: str | None) -> dict[str, Any]:
|
||||
with path.open("r", encoding="utf-8") as file:
|
||||
raw = yaml.safe_load(file) or {}
|
||||
|
||||
control_defaults, _msg_type_binary, _session_cls, video_defaults = load_omnisocket_api()
|
||||
control_defaults, _msg_type_binary, _msg_type_error, _session_cls, video_defaults = load_omnisocket_api()
|
||||
|
||||
transport = dict(raw.get("transport", {}))
|
||||
control = dict(raw.get("control_sender", {}))
|
||||
@@ -160,7 +160,7 @@ class QueuedControlEvent:
|
||||
|
||||
class ControlSessionManager:
|
||||
def __init__(self, config: dict[str, Any]) -> None:
|
||||
control_defaults, _msg_type_binary, session_cls, _video_defaults = load_omnisocket_api()
|
||||
control_defaults, _msg_type_binary, _msg_type_error, session_cls, _video_defaults = load_omnisocket_api()
|
||||
transport = config["transport"]
|
||||
control_cfg = config["control_sender"]
|
||||
daemon_cfg = config["daemon"]
|
||||
@@ -403,12 +403,13 @@ class ControlSessionManager:
|
||||
|
||||
class VideoSessionManager:
|
||||
def __init__(self, config: dict[str, Any]) -> None:
|
||||
_control_defaults, msg_type_binary, session_cls, video_defaults = load_omnisocket_api()
|
||||
_control_defaults, msg_type_binary, msg_type_error, session_cls, video_defaults = load_omnisocket_api()
|
||||
transport = config["transport"]
|
||||
video_cfg = config["video_receiver"]
|
||||
daemon_cfg = config["daemon"]
|
||||
|
||||
self._msg_type_binary = msg_type_binary
|
||||
self._msg_type_error = msg_type_error
|
||||
self._session_cls = session_cls
|
||||
self._connect_kwargs = {
|
||||
"server_addr": transport["server_addr"],
|
||||
@@ -517,9 +518,12 @@ class VideoSessionManager:
|
||||
meta = session.recv_into(buffer, timeout_ms=200)
|
||||
if meta is None:
|
||||
continue
|
||||
if meta.get("msg_type") != self._msg_type_binary:
|
||||
continue
|
||||
msg_type = int(meta.get("msg_type", -1))
|
||||
frame = bytes(buffer[: int(meta["body_len"])])
|
||||
if msg_type != self._msg_type_binary:
|
||||
self._disconnect(self._describe_unexpected_message(msg_type, frame))
|
||||
time.sleep(0.2)
|
||||
break
|
||||
jpeg_frame = self._extract_jpeg_frame(frame)
|
||||
if jpeg_frame is None:
|
||||
with self._lock:
|
||||
@@ -535,6 +539,14 @@ class VideoSessionManager:
|
||||
self._disconnect(str(error))
|
||||
time.sleep(0.2)
|
||||
|
||||
def _describe_unexpected_message(self, msg_type: int, payload: bytes) -> str:
|
||||
detail = payload.decode("utf-8", errors="replace").strip()
|
||||
if msg_type == self._msg_type_error:
|
||||
return f"video session rejected by server: {detail or 'unknown error'}"
|
||||
if detail:
|
||||
return f"received unexpected video message type {msg_type}: {detail}"
|
||||
return f"received unexpected video message type {msg_type}"
|
||||
|
||||
def _connect(self) -> None:
|
||||
session = self._session_cls()
|
||||
try:
|
||||
@@ -959,8 +971,11 @@ class OmniDaemonHTTPHandler(BaseHTTPRequestHandler):
|
||||
self.send_header("Content-Length", str(len(payload)))
|
||||
self.send_header("Cache-Control", "no-store")
|
||||
self.send_header("Connection", "keep-alive")
|
||||
self.end_headers()
|
||||
self.wfile.write(payload)
|
||||
try:
|
||||
self.end_headers()
|
||||
self.wfile.write(payload)
|
||||
except (BrokenPipeError, ConnectionResetError):
|
||||
return
|
||||
|
||||
|
||||
class ASideOmniDaemon:
|
||||
|
||||
@@ -39,9 +39,9 @@ def utc_iso_now() -> str:
|
||||
|
||||
|
||||
def load_omnisocket_api():
|
||||
from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session
|
||||
from omnisocket import CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session
|
||||
|
||||
return CONTROL_DEFAULTS, MSG_TYPE_BINARY, Session
|
||||
return CONTROL_DEFAULTS, MSG_TYPE_BINARY, MSG_TYPE_ERROR, Session
|
||||
|
||||
|
||||
def _merge_kcp_defaults(defaults: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
|
||||
@@ -115,7 +115,7 @@ def _load_config(config_path: str | None) -> dict[str, Any]:
|
||||
with path.open("r", encoding="utf-8") as file:
|
||||
raw = yaml.safe_load(file) or {}
|
||||
|
||||
control_defaults, _msg_type_binary, _session_cls = load_omnisocket_api()
|
||||
control_defaults, _msg_type_binary, _msg_type_error, _session_cls = load_omnisocket_api()
|
||||
transport = dict(raw.get("transport", {}))
|
||||
control = dict(raw.get("control_receiver", {}))
|
||||
video = dict(raw.get("video_sender", {}))
|
||||
@@ -193,12 +193,13 @@ def _load_config(config_path: str | None) -> dict[str, Any]:
|
||||
|
||||
class ControlRecvManager:
|
||||
def __init__(self, config: dict[str, Any]) -> None:
|
||||
control_defaults, msg_type_binary, session_cls = load_omnisocket_api()
|
||||
control_defaults, msg_type_binary, msg_type_error, session_cls = load_omnisocket_api()
|
||||
transport = config["transport"]
|
||||
control_cfg = config["control_receiver"]
|
||||
daemon_cfg = config["daemon"]
|
||||
|
||||
self._msg_type_binary = msg_type_binary
|
||||
self._msg_type_error = msg_type_error
|
||||
self._session_cls = session_cls
|
||||
self._connect_kwargs = {
|
||||
"server_addr": transport["server_addr"],
|
||||
@@ -307,6 +308,7 @@ class ControlRecvManager:
|
||||
if msg_type != self._msg_type_binary:
|
||||
with self._lock:
|
||||
self._ignored_non_binary += 1
|
||||
self._disconnect(self._describe_unexpected_message(msg_type, payload))
|
||||
continue
|
||||
if len(payload) != CONTROL_PACKET_STRUCT.size:
|
||||
with self._lock:
|
||||
@@ -346,6 +348,14 @@ class ControlRecvManager:
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _describe_unexpected_message(self, msg_type: int, payload: bytes) -> str:
|
||||
detail = payload.decode("utf-8", errors="replace").strip()
|
||||
if msg_type == self._msg_type_error:
|
||||
return f"control session rejected by server: {detail or 'unknown error'}"
|
||||
if detail:
|
||||
return f"received unexpected control message type {msg_type}: {detail}"
|
||||
return f"received unexpected control message type {msg_type}"
|
||||
|
||||
def _enqueue_packet(self, payload: bytes) -> None:
|
||||
try:
|
||||
self._queue.put_nowait(payload)
|
||||
@@ -1198,8 +1208,11 @@ class OmniDaemonHTTPHandler(BaseHTTPRequestHandler):
|
||||
self.send_header("Content-Length", str(len(body)))
|
||||
self.send_header("Cache-Control", "no-store")
|
||||
self.send_header("Connection", "keep-alive")
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
try:
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
except (BrokenPipeError, ConnectionResetError):
|
||||
return
|
||||
|
||||
|
||||
class BSideOmniDaemon:
|
||||
|
||||
@@ -5,6 +5,13 @@
|
||||
|
||||
#define UDP_RELAY_BUF_SIZE (64U * 1024U)
|
||||
|
||||
typedef struct udp_relay_client_entry {
|
||||
struct udp_relay_client_entry *next;
|
||||
uint32_t conv;
|
||||
struct sockaddr_storage addr;
|
||||
socklen_t addr_len;
|
||||
} udp_relay_client_entry_t;
|
||||
|
||||
struct udp_relay {
|
||||
int downstream_fd;
|
||||
int upstream_fd;
|
||||
@@ -12,9 +19,10 @@ struct udp_relay {
|
||||
socklen_t upstream_addr_len;
|
||||
char downstream_local_addr[OMNI_MAX_ADDR_TEXT];
|
||||
char upstream_local_addr[OMNI_MAX_ADDR_TEXT];
|
||||
struct sockaddr_storage client_addr;
|
||||
socklen_t client_addr_len;
|
||||
int has_client;
|
||||
struct sockaddr_storage last_client_addr;
|
||||
socklen_t last_client_addr_len;
|
||||
int has_last_client;
|
||||
udp_relay_client_entry_t *clients;
|
||||
pthread_mutex_t lock;
|
||||
pthread_mutex_t log_mu;
|
||||
pthread_mutex_t state_mu;
|
||||
@@ -131,22 +139,55 @@ static void udp_relay_note_result(udp_relay_t *relay, int rc, int errnum) {
|
||||
pthread_mutex_unlock(&relay->state_mu);
|
||||
}
|
||||
|
||||
static void udp_relay_record_client(udp_relay_t *relay, const struct sockaddr_storage *addr, socklen_t addr_len) {
|
||||
static udp_relay_client_entry_t *udp_relay_find_client_locked(udp_relay_t *relay, uint32_t conv) {
|
||||
udp_relay_client_entry_t *entry;
|
||||
|
||||
for (entry = relay->clients; entry != NULL; entry = entry->next) {
|
||||
if (entry->conv == conv) {
|
||||
return entry;
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void udp_relay_record_client(udp_relay_t *relay, int has_conv, uint32_t conv, const struct sockaddr_storage *addr, socklen_t addr_len) {
|
||||
pthread_mutex_lock(&relay->lock);
|
||||
memcpy(&relay->client_addr, addr, sizeof(*addr));
|
||||
relay->client_addr_len = addr_len;
|
||||
relay->has_client = 1;
|
||||
memcpy(&relay->last_client_addr, addr, sizeof(*addr));
|
||||
relay->last_client_addr_len = addr_len;
|
||||
relay->has_last_client = 1;
|
||||
if (has_conv) {
|
||||
udp_relay_client_entry_t *entry = udp_relay_find_client_locked(relay, conv);
|
||||
if (entry == NULL) {
|
||||
entry = (udp_relay_client_entry_t *) calloc(1, sizeof(*entry));
|
||||
if (entry != NULL) {
|
||||
entry->conv = conv;
|
||||
entry->next = relay->clients;
|
||||
relay->clients = entry;
|
||||
}
|
||||
}
|
||||
if (entry != NULL) {
|
||||
memcpy(&entry->addr, addr, sizeof(*addr));
|
||||
entry->addr_len = addr_len;
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&relay->lock);
|
||||
}
|
||||
|
||||
static int udp_relay_copy_client(udp_relay_t *relay, struct sockaddr_storage *addr, socklen_t *addr_len) {
|
||||
int has_client;
|
||||
static int udp_relay_copy_client(udp_relay_t *relay, int has_conv, uint32_t conv, struct sockaddr_storage *addr, socklen_t *addr_len) {
|
||||
int has_client = 0;
|
||||
|
||||
pthread_mutex_lock(&relay->lock);
|
||||
has_client = relay->has_client;
|
||||
if (has_client) {
|
||||
memcpy(addr, &relay->client_addr, sizeof(*addr));
|
||||
*addr_len = relay->client_addr_len;
|
||||
if (has_conv) {
|
||||
udp_relay_client_entry_t *entry = udp_relay_find_client_locked(relay, conv);
|
||||
if (entry != NULL) {
|
||||
memcpy(addr, &entry->addr, sizeof(*addr));
|
||||
*addr_len = entry->addr_len;
|
||||
has_client = 1;
|
||||
}
|
||||
} else if (relay->has_last_client) {
|
||||
memcpy(addr, &relay->last_client_addr, sizeof(*addr));
|
||||
*addr_len = relay->last_client_addr_len;
|
||||
has_client = 1;
|
||||
}
|
||||
pthread_mutex_unlock(&relay->lock);
|
||||
return has_client;
|
||||
@@ -160,6 +201,8 @@ static void *udp_relay_forward_downstream_to_upstream(void *arg) {
|
||||
struct sockaddr_storage source;
|
||||
socklen_t source_len = sizeof(source);
|
||||
ssize_t n = recvfrom(relay->downstream_fd, buffer, sizeof(buffer), 0, (struct sockaddr *) &source, &source_len);
|
||||
int has_conv = 0;
|
||||
uint32_t conv = 0;
|
||||
|
||||
if (n < 0) {
|
||||
int errnum = errno;
|
||||
@@ -174,7 +217,8 @@ static void *udp_relay_forward_downstream_to_upstream(void *arg) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
udp_relay_record_client(relay, &source, source_len);
|
||||
udp_relay_parse_kcp_summary(buffer, (size_t) n, &has_conv, &conv, NULL);
|
||||
udp_relay_record_client(relay, has_conv, conv, &source, source_len);
|
||||
udp_relay_print_packet(relay, "relay_downstream_rx", relay->downstream_local_addr, &source, source_len, buffer, (size_t) n);
|
||||
for (;;) {
|
||||
if (send(relay->upstream_fd, buffer, (size_t) n, 0) >= 0) {
|
||||
@@ -205,6 +249,8 @@ static void *udp_relay_forward_upstream_to_downstream(void *arg) {
|
||||
struct sockaddr_storage client_addr;
|
||||
socklen_t client_addr_len = 0;
|
||||
ssize_t n = recv(relay->upstream_fd, buffer, sizeof(buffer), 0);
|
||||
int has_conv = 0;
|
||||
uint32_t conv = 0;
|
||||
|
||||
if (n < 0) {
|
||||
int errnum = errno;
|
||||
@@ -220,7 +266,8 @@ static void *udp_relay_forward_upstream_to_downstream(void *arg) {
|
||||
}
|
||||
|
||||
udp_relay_print_packet(relay, "relay_upstream_rx", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n);
|
||||
if (!udp_relay_copy_client(relay, &client_addr, &client_addr_len)) {
|
||||
udp_relay_parse_kcp_summary(buffer, (size_t) n, &has_conv, &conv, NULL);
|
||||
if (!udp_relay_copy_client(relay, has_conv, conv, &client_addr, &client_addr_len)) {
|
||||
udp_relay_print_packet(relay, "relay_upstream_drop_no_client", relay->upstream_local_addr, &relay->upstream_addr, relay->upstream_addr_len, buffer, (size_t) n);
|
||||
continue;
|
||||
}
|
||||
@@ -404,11 +451,18 @@ int udp_relay_close(udp_relay_t *relay) {
|
||||
}
|
||||
|
||||
void udp_relay_free(udp_relay_t *relay) {
|
||||
udp_relay_client_entry_t *entry;
|
||||
udp_relay_client_entry_t *next;
|
||||
|
||||
if (relay == NULL) {
|
||||
return;
|
||||
}
|
||||
udp_relay_close(relay);
|
||||
udp_relay_join_threads(relay);
|
||||
for (entry = relay->clients; entry != NULL; entry = next) {
|
||||
next = entry->next;
|
||||
free(entry);
|
||||
}
|
||||
pthread_mutex_destroy(&relay->lock);
|
||||
pthread_mutex_destroy(&relay->log_mu);
|
||||
pthread_cond_destroy(&relay->state_cond);
|
||||
|
||||
Reference in New Issue
Block a user