diff --git a/README.md b/README.md index c77e734..28cba04 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ Server `D` runs the KCP hub on `0.0.0.0:10909`: ```bash ./bin/kcpserver -listen 0.0.0.0:10909 \ + -telemetry-peer peer-a-telemetry \ -kcp-ts-debug-log logs/d-kcp-ts.jsonl \ -kcp-session-stats-log logs/d-kcp-stats.jsonl ``` diff --git a/cmd/kcpserver.c b/cmd/kcpserver.c index c73b1ed..afcf8f4 100644 --- a/cmd/kcpserver.c +++ b/cmd/kcpserver.c @@ -6,6 +6,7 @@ static void kcpserver_usage(FILE *out) { fprintf(out, "usage: kcpserver [-mode hub|relay] [-listen addr] [-bind-device dev]\n"); fprintf(out, " [-latency-log path] [-kcp-ts-debug-log path]\n"); fprintf(out, " [-kcp-session-stats-log path] [-kcp-session-stats-interval 100ms]\n"); + fprintf(out, " [-telemetry-peer peer-id] [-telemetry-interval 500ms]\n"); fprintf(out, " [-relay-remote addr] [-relay-listen addr] [-relay-peer addr]\n"); } @@ -17,10 +18,13 @@ int main(int argc, char **argv) { const char *packet_log_path = ""; const char *stats_log_path = ""; const char *stats_interval_raw = ""; + const char *telemetry_peer_id = ""; + const char *telemetry_interval_raw = ""; const char *relay_listen_alias = ""; const char *relay_remote_addr = ""; const char *relay_peer_alias = ""; int stats_interval_ms = KCP_DEFAULT_STATS_INTERVAL_MS; + int telemetry_interval_ms = 500; int i; int rc = 1; @@ -84,6 +88,20 @@ int main(int argc, char **argv) { stats_interval_raw = value; continue; } + if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-telemetry-peer", &value)) < 0) { + fprintf(stderr, "kcpserver: flag -telemetry-peer requires a value\n"); + return 1; + } else if (handled) { + telemetry_peer_id = value; + continue; + } + if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-telemetry-interval", &value)) < 0) { + fprintf(stderr, "kcpserver: flag -telemetry-interval requires a value\n"); + return 1; + } else if (handled) { + telemetry_interval_raw = value; + continue; + } if ((handled = cli_parse_value_flag(argc, argv, &i, argv[i], "-relay-listen", &value)) < 0) { fprintf(stderr, "kcpserver: flag -relay-listen requires a value\n"); return 1; @@ -118,6 +136,10 @@ int main(int argc, char **argv) { fprintf(stderr, "kcpserver: invalid -kcp-session-stats-interval value %s\n", stats_interval_raw); return 1; } + if (omni_parse_duration_ms(telemetry_interval_raw, 500, &telemetry_interval_ms) != 0) { + fprintf(stderr, "kcpserver: invalid -telemetry-interval value %s\n", telemetry_interval_raw); + return 1; + } if (relay_peer_alias[0] != '\0' && relay_remote_addr[0] != '\0' && strcmp(relay_peer_alias, relay_remote_addr) != 0) { fprintf(stderr, "kcpserver: flags -relay-remote and -relay-peer must match when both are set\n"); @@ -178,6 +200,10 @@ int main(int argc, char **argv) { fprintf(stderr, "kcpserver: create hub failed\n"); goto cleanup; } + if (telemetry_peer_id[0] != '\0' && kcp_hub_set_telemetry(hub, telemetry_peer_id, telemetry_interval_ms) != 0) { + fprintf(stderr, "kcpserver: configure telemetry peer %s failed\n", telemetry_peer_id); + goto cleanup; + } fprintf(stderr, "kcp hub listening on %s\n", listen_addr); if (kcp_hub_serve_listener(hub, listener) != 0) { fprintf(stderr, "kcpserver: serve listener failed\n"); @@ -188,6 +214,10 @@ int main(int argc, char **argv) { } if (strcmp(mode, "relay") == 0) { + if (telemetry_peer_id[0] != '\0') { + fprintf(stderr, "kcpserver: flag -telemetry-peer may only be used in hub mode\n"); + return 1; + } if (bind_device[0] != '\0') { fprintf(stderr, "kcpserver: flag -bind-device is not supported in relay mode\n"); return 1; diff --git a/include/kcp_session_stats.h b/include/kcp_session_stats.h index d95f01e..a274df6 100644 --- a/include/kcp_session_stats.h +++ b/include/kcp_session_stats.h @@ -26,6 +26,16 @@ typedef struct kcp_session_stats_record { int32_t srtt_ms; int has_srttvar_ms; int32_t srttvar_ms; + int has_snd_wnd; + uint32_t snd_wnd; + int has_rmt_wnd; + uint32_t rmt_wnd; + int has_inflight; + uint32_t inflight; + int has_window_limit; + uint32_t window_limit; + int has_window_pressure_pct; + double window_pressure_pct; int has_bytes_sent; uint64_t bytes_sent; int has_bytes_received; diff --git a/include/server_kcp_hub.h b/include/server_kcp_hub.h index dd4117f..37140df 100644 --- a/include/server_kcp_hub.h +++ b/include/server_kcp_hub.h @@ -14,6 +14,7 @@ int kcp_hub_serve_listener(kcp_hub_t *hub, kcp_listener_t *listener); int kcp_hub_serve_session(kcp_hub_t *hub, kcp_conn_t *conn); int kcp_hub_set_relay(kcp_hub_t *hub, int relay_fd, const struct sockaddr *peer_addr, socklen_t peer_addr_len, int learn_peer); +int kcp_hub_set_telemetry(kcp_hub_t *hub, const char *peer_id, int interval_ms); int kcp_hub_serve_relay(kcp_hub_t *hub); int kcp_hub_close(kcp_hub_t *hub); diff --git a/include/transport_kcp.h b/include/transport_kcp.h index 874169e..375fc6b 100644 --- a/include/transport_kcp.h +++ b/include/transport_kcp.h @@ -34,6 +34,14 @@ extern "C" { #define KCP_VIDEO_RCV_WND 256 #define KCP_VIDEO_MTU 1400 +#define KCP_TELEMETRY_NODELAY 0 +#define KCP_TELEMETRY_INTERVAL_MS 50 +#define KCP_TELEMETRY_RESEND 0 +#define KCP_TELEMETRY_NC 0 +#define KCP_TELEMETRY_SND_WND 64 +#define KCP_TELEMETRY_RCV_WND 64 +#define KCP_TELEMETRY_MTU 1400 + #define KCP_NODELAY KCP_DEFAULT_NODELAY #define KCP_INTERVAL KCP_DEFAULT_INTERVAL_MS #define KCP_RESEND KCP_DEFAULT_RESEND @@ -49,9 +57,19 @@ typedef struct kcp_runtime_stats { uint32_t rto_ms; int32_t srtt_ms; int32_t srttvar_ms; + uint32_t snd_wnd; + uint32_t rmt_wnd; + uint32_t inflight; + uint32_t window_limit; + double window_pressure_pct; uint32_t snd_queue; uint32_t rcv_queue; uint32_t snd_buffer; + uint64_t out_segs_total; + uint64_t retrans_total; + uint64_t fast_retrans_total; + uint64_t lost_total; + uint64_t repeat_total; uint32_t xmit_total; } kcp_runtime_stats_t; typedef struct kcp_conn_options { @@ -67,6 +85,7 @@ typedef struct kcp_conn_options { void kcp_conn_options_init(kcp_conn_options_t *options); void kcp_conn_options_set_control_defaults(kcp_conn_options_t *options); void kcp_conn_options_set_video_defaults(kcp_conn_options_t *options); +void kcp_conn_options_set_telemetry_defaults(kcp_conn_options_t *options); kcp_conn_t *kcp_conn_dial_with_options(const char *server_addr, const char *bind_ip, const char *bind_device, const kcp_conn_options_t *options, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms); kcp_conn_t *kcp_conn_dial(const char *server_addr, const char *bind_ip, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, latency_logger_t *logger, const char *node_role, const char *node_id, kcp_session_stats_logger_t *stats_logger, int stats_interval_ms); @@ -79,6 +98,7 @@ int kcp_conn_close(kcp_conn_t *conn); void kcp_conn_free(kcp_conn_t *conn); uint32_t kcp_conn_conv(const kcp_conn_t *conn); int kcp_conn_local_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, socklen_t *addr_len); +int kcp_conn_remote_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, socklen_t *addr_len); void kcp_conn_runtime_stats_snapshot(kcp_conn_t *conn, kcp_runtime_stats_t *out_stats); kcp_listener_t *kcp_listener_listen(const char *listen_addr, const char *bind_device, kcp_packet_debug_logger_t *packet_logger, const char *node_role, const char *node_id); diff --git a/python/omnisocket/__init__.py b/python/omnisocket/__init__.py index 47fbdd5..2b23277 100644 --- a/python/omnisocket/__init__.py +++ b/python/omnisocket/__init__.py @@ -33,8 +33,19 @@ VIDEO_DEFAULTS = { "mtu": 1400, } +TELEMETRY_DEFAULTS = { + "nodelay": 0, + "interval_ms": 50, + "resend": 0, + "nc": 0, + "sndwnd": 64, + "rcvwnd": 64, + "mtu": 1400, +} + __all__ = [ "CONTROL_DEFAULTS", + "TELEMETRY_DEFAULTS", "VIDEO_DEFAULTS", "MSG_TYPE_BINARY", "MSG_TYPE_ERROR", diff --git a/python/omnisocket/_omnisocket.c b/python/omnisocket/_omnisocket.c index 0739ec7..0538ec4 100644 --- a/python/omnisocket/_omnisocket.c +++ b/python/omnisocket/_omnisocket.c @@ -88,27 +88,52 @@ static PyObject *build_stats_dict(const omnisocket_session_stats_t *stats) { } static PyObject *build_kcp_stats_dict(const omnisocket_session_kcp_stats_t *stats) { - return Py_BuildValue( - "{s:i,s:I,s:I,s:i,s:i,s:I,s:I,s:I,s:I}", - "connected", - stats->connected, - "conv", - stats->conv, - "rto_ms", - stats->rto_ms, - "srtt_ms", - stats->srtt_ms, - "srttvar_ms", - stats->srttvar_ms, - "snd_queue", - stats->snd_queue, - "rcv_queue", - stats->rcv_queue, - "snd_buffer", - stats->snd_buffer, - "xmit_total", - stats->xmit_total - ); + PyObject *dict = PyDict_New(); + PyObject *value = NULL; + + if (dict == NULL) { + return NULL; + } + +#define SET_KCP_STAT(key, expr) \ + do { \ + value = (expr); \ + if (value == NULL) { \ + Py_DECREF(dict); \ + return NULL; \ + } \ + if (PyDict_SetItemString(dict, (key), value) != 0) { \ + Py_DECREF(value); \ + Py_DECREF(dict); \ + return NULL; \ + } \ + Py_DECREF(value); \ + value = NULL; \ + } while (0) + + SET_KCP_STAT("connected", PyLong_FromLong(stats->connected)); + SET_KCP_STAT("conv", PyLong_FromUnsignedLong(stats->conv)); + SET_KCP_STAT("rto_ms", PyLong_FromUnsignedLong(stats->rto_ms)); + SET_KCP_STAT("srtt_ms", PyLong_FromLong(stats->srtt_ms)); + SET_KCP_STAT("srttvar_ms", PyLong_FromLong(stats->srttvar_ms)); + SET_KCP_STAT("snd_wnd", PyLong_FromUnsignedLong(stats->snd_wnd)); + SET_KCP_STAT("rmt_wnd", PyLong_FromUnsignedLong(stats->rmt_wnd)); + SET_KCP_STAT("inflight", PyLong_FromUnsignedLong(stats->inflight)); + SET_KCP_STAT("window_limit", PyLong_FromUnsignedLong(stats->window_limit)); + SET_KCP_STAT("window_pressure_pct", PyFloat_FromDouble(stats->window_pressure_pct)); + SET_KCP_STAT("snd_queue", PyLong_FromUnsignedLong(stats->snd_queue)); + SET_KCP_STAT("rcv_queue", PyLong_FromUnsignedLong(stats->rcv_queue)); + SET_KCP_STAT("snd_buffer", PyLong_FromUnsignedLong(stats->snd_buffer)); + SET_KCP_STAT("out_segs_total", PyLong_FromUnsignedLongLong((unsigned long long) stats->out_segs_total)); + SET_KCP_STAT("retrans_total", PyLong_FromUnsignedLongLong((unsigned long long) stats->retrans_total)); + SET_KCP_STAT("fast_retrans_total", PyLong_FromUnsignedLongLong((unsigned long long) stats->fast_retrans_total)); + SET_KCP_STAT("lost_total", PyLong_FromUnsignedLongLong((unsigned long long) stats->lost_total)); + SET_KCP_STAT("repeat_total", PyLong_FromUnsignedLongLong((unsigned long long) stats->repeat_total)); + SET_KCP_STAT("xmit_total", PyLong_FromUnsignedLong(stats->xmit_total)); + +#undef SET_KCP_STAT + + return dict; } static PyObject *PyOmniSession_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) { diff --git a/python/omnisocket/omnisocket_client.c b/python/omnisocket/omnisocket_client.c index 2df0b3d..b181b23 100644 --- a/python/omnisocket/omnisocket_client.c +++ b/python/omnisocket/omnisocket_client.c @@ -267,9 +267,19 @@ void omnisocket_session_kcp_stats_snapshot(omnisocket_session_t *session, omniso out_stats->rto_ms = runtime_stats.rto_ms; out_stats->srtt_ms = runtime_stats.srtt_ms; out_stats->srttvar_ms = runtime_stats.srttvar_ms; + out_stats->snd_wnd = runtime_stats.snd_wnd; + out_stats->rmt_wnd = runtime_stats.rmt_wnd; + out_stats->inflight = runtime_stats.inflight; + out_stats->window_limit = runtime_stats.window_limit; + out_stats->window_pressure_pct = runtime_stats.window_pressure_pct; out_stats->snd_queue = runtime_stats.snd_queue; out_stats->rcv_queue = runtime_stats.rcv_queue; out_stats->snd_buffer = runtime_stats.snd_buffer; + out_stats->out_segs_total = runtime_stats.out_segs_total; + out_stats->retrans_total = runtime_stats.retrans_total; + out_stats->fast_retrans_total = runtime_stats.fast_retrans_total; + out_stats->lost_total = runtime_stats.lost_total; + out_stats->repeat_total = runtime_stats.repeat_total; out_stats->xmit_total = runtime_stats.xmit_total; } diff --git a/python/omnisocket/omnisocket_client.h b/python/omnisocket/omnisocket_client.h index cc7d011..d24c7af 100644 --- a/python/omnisocket/omnisocket_client.h +++ b/python/omnisocket/omnisocket_client.h @@ -21,9 +21,19 @@ typedef struct omnisocket_session_kcp_stats { uint32_t rto_ms; int32_t srtt_ms; int32_t srttvar_ms; + uint32_t snd_wnd; + uint32_t rmt_wnd; + uint32_t inflight; + uint32_t window_limit; + double window_pressure_pct; uint32_t snd_queue; uint32_t rcv_queue; uint32_t snd_buffer; + uint64_t out_segs_total; + uint64_t retrans_total; + uint64_t fast_retrans_total; + uint64_t lost_total; + uint64_t repeat_total; uint32_t xmit_total; } omnisocket_session_kcp_stats_t; diff --git a/ros-control-c/README.md b/ros-control-c/README.md index b280639..42603d0 100644 --- a/ros-control-c/README.md +++ b/ros-control-c/README.md @@ -45,7 +45,7 @@ python3 ros-control-c/robot/udp_ros_bridge.py Start the existing OmniSocket KCP hub from the repo root: ```bash -./bin/kcpserver -listen :9002 +./bin/kcpserver -listen :9002 -telemetry-peer peer-a-telemetry ``` Sender: diff --git a/ros-control-py/README.md b/ros-control-py/README.md index dd6f71d..be5c392 100644 --- a/ros-control-py/README.md +++ b/ros-control-py/README.md @@ -89,7 +89,7 @@ OmniSocket UDP: OmniSocket KCP: ```bash -./bin/kcpserver -listen :9002 +./bin/kcpserver -listen :9002 -telemetry-peer peer-a-telemetry ``` `server_addr` 不传时,节点会按 `transport` 自动选择默认值: diff --git a/ros-control-py/ROS2 Teleop over UDP.md b/ros-control-py/ROS2 Teleop over UDP.md index fe62a27..976d719 100644 --- a/ros-control-py/ROS2 Teleop over UDP.md +++ b/ros-control-py/ROS2 Teleop over UDP.md @@ -42,7 +42,7 @@ OmniSocket UDP: OmniSocket KCP: ```bash -./bin/kcpserver -listen :9002 +./bin/kcpserver -listen :9002 -telemetry-peer peer-a-telemetry ``` ### 机器人端 Receiver diff --git a/scripts/BACDauto_test.sh b/scripts/BACDauto_test.sh index 643530d..70f7300 100755 --- a/scripts/BACDauto_test.sh +++ b/scripts/BACDauto_test.sh @@ -56,6 +56,7 @@ if [ ! -x ./bin/kcpserver ]; then exit 1 fi setsid ./bin/kcpserver -listen 0.0.0.0:10909 \ + -telemetry-peer peer-a-telemetry \ -kcp-ts-debug-log logs/d-kcp-ts.jsonl \ -kcp-session-stats-log logs/d-kcp-stats.jsonl > server_console.log 2>&1 server_console.log 2>&1 srttvar_ms) != 0) { goto cleanup; } + if (record->has_snd_wnd && + kcp_session_stats_appendf(&line, &line_len, ",\"snd_wnd\":%u", record->snd_wnd) != 0) { + goto cleanup; + } + if (record->has_rmt_wnd && + kcp_session_stats_appendf(&line, &line_len, ",\"rmt_wnd\":%u", record->rmt_wnd) != 0) { + goto cleanup; + } + if (record->has_inflight && + kcp_session_stats_appendf(&line, &line_len, ",\"inflight\":%u", record->inflight) != 0) { + goto cleanup; + } + if (record->has_window_limit && + kcp_session_stats_appendf(&line, &line_len, ",\"window_limit\":%u", record->window_limit) != 0) { + goto cleanup; + } + if (record->has_window_pressure_pct && + kcp_session_stats_appendf(&line, &line_len, ",\"window_pressure_pct\":%.3f", record->window_pressure_pct) != 0) { + goto cleanup; + } if (record->has_bytes_sent && kcp_session_stats_appendf(&line, &line_len, ",\"bytes_sent\":%" PRIu64, record->bytes_sent) != 0) { goto cleanup; diff --git a/src/server_kcp_hub.c b/src/server_kcp_hub.c index 8ffbbf4..91802c4 100644 --- a/src/server_kcp_hub.c +++ b/src/server_kcp_hub.c @@ -1,8 +1,14 @@ #include "server_kcp_hub.h" +#include "cJSON.h" + +#include #include #define KCP_RELAY_MAX_DATAGRAM_SIZE (60 * 1024) +#define KCP_HUB_DEFAULT_TELEMETRY_INTERVAL_MS 500 +#define KCP_HUB_TELEMETRY_NODE_ID "hub-telemetry" +#define KCP_HUB_DEFAULT_NODE_ID "hub" typedef struct kcp_peer_entry { struct kcp_peer_entry *next; @@ -21,14 +27,29 @@ struct kcp_hub { latency_logger_t *logger; kcp_session_stats_logger_t *stats_logger; int stats_interval_ms; + char telemetry_peer_id[OMNI_MAX_PEER_ID]; + int telemetry_interval_ms; + pthread_t telemetry_thread; + int telemetry_thread_started; int relay_fd; int relay_configured; int relay_learn_peer; struct sockaddr_storage relay_peer_addr; socklen_t relay_peer_addr_len; - int closed; + atomic_int closed; }; +static int kcp_hub_peer_id_has_suffix(const char *peer_id, const char *suffix); +static int kcp_hub_deliver_to_local_peer(kcp_hub_t *hub, const message_t *msg); + +static int kcp_hub_peer_is_telemetry(const char *peer_id) { + return kcp_hub_peer_id_has_suffix(peer_id, "-telemetry"); +} + +static const char *kcp_hub_peer_node_id(const char *peer_id) { + return kcp_hub_peer_is_telemetry(peer_id) ? KCP_HUB_TELEMETRY_NODE_ID : KCP_HUB_DEFAULT_NODE_ID; +} + static void kcp_hub_unregister(kcp_hub_t *hub, const char *peer_id, kcp_conn_t *conn) { kcp_peer_entry_t *prev = NULL; kcp_peer_entry_t *entry; @@ -90,9 +111,201 @@ static int kcp_hub_configure_peer_transport(kcp_conn_t *conn, const char *peer_i kcp_conn_options_set_video_defaults(&options); return kcp_conn_apply_options(conn, &options); } + if (kcp_hub_peer_is_telemetry(peer_id)) { + kcp_conn_options_set_telemetry_defaults(&options); + return kcp_conn_apply_options(conn, &options); + } return 0; } +static int kcp_hub_add_runtime_stats_json(cJSON *object, const kcp_runtime_stats_t *stats) { + if (object == NULL || stats == NULL) { + errno = EINVAL; + return -1; + } + if (cJSON_AddNumberToObject(object, "connected", stats->connected) == NULL || + cJSON_AddNumberToObject(object, "conv", (double) stats->conv) == NULL || + cJSON_AddNumberToObject(object, "rto_ms", (double) stats->rto_ms) == NULL || + cJSON_AddNumberToObject(object, "srtt_ms", (double) stats->srtt_ms) == NULL || + cJSON_AddNumberToObject(object, "srttvar_ms", (double) stats->srttvar_ms) == NULL || + cJSON_AddNumberToObject(object, "snd_wnd", (double) stats->snd_wnd) == NULL || + cJSON_AddNumberToObject(object, "rmt_wnd", (double) stats->rmt_wnd) == NULL || + cJSON_AddNumberToObject(object, "inflight", (double) stats->inflight) == NULL || + cJSON_AddNumberToObject(object, "window_limit", (double) stats->window_limit) == NULL || + cJSON_AddNumberToObject(object, "window_pressure_pct", stats->window_pressure_pct) == NULL || + cJSON_AddNumberToObject(object, "snd_queue", (double) stats->snd_queue) == NULL || + cJSON_AddNumberToObject(object, "rcv_queue", (double) stats->rcv_queue) == NULL || + cJSON_AddNumberToObject(object, "snd_buffer", (double) stats->snd_buffer) == NULL || + cJSON_AddNumberToObject(object, "out_segs_total", (double) stats->out_segs_total) == NULL || + cJSON_AddNumberToObject(object, "retrans_total", (double) stats->retrans_total) == NULL || + cJSON_AddNumberToObject(object, "fast_retrans_total", (double) stats->fast_retrans_total) == NULL || + cJSON_AddNumberToObject(object, "lost_total", (double) stats->lost_total) == NULL || + cJSON_AddNumberToObject(object, "repeat_total", (double) stats->repeat_total) == NULL || + cJSON_AddNumberToObject(object, "xmit_total", (double) stats->xmit_total) == NULL) { + errno = ENOMEM; + return -1; + } + return 0; +} + +static int kcp_hub_build_telemetry_payload_locked(kcp_hub_t *hub, char **out_payload) { + cJSON *root = NULL; + cJSON *sessions = NULL; + char *ts_unix_nano_text = NULL; + char *payload = NULL; + kcp_peer_entry_t *entry; + + if (hub == NULL || out_payload == NULL) { + errno = EINVAL; + return -1; + } + *out_payload = NULL; + + root = cJSON_CreateObject(); + if (root == NULL) { + errno = ENOMEM; + return -1; + } + sessions = cJSON_AddArrayToObject(root, "sessions"); + if (sessions == NULL) { + cJSON_Delete(root); + errno = ENOMEM; + return -1; + } + + ts_unix_nano_text = omni_strdup_printf("%" PRId64, omni_now_unix_nano()); + if (ts_unix_nano_text == NULL) { + cJSON_Delete(root); + return -1; + } + if (cJSON_AddStringToObject(root, "type", "hub_kcp_snapshot") == NULL || + cJSON_AddStringToObject(root, "ts_unix_nano", ts_unix_nano_text) == NULL || + cJSON_AddStringToObject(root, "node_id", KCP_HUB_DEFAULT_NODE_ID) == NULL) { + free(ts_unix_nano_text); + cJSON_Delete(root); + errno = ENOMEM; + return -1; + } + free(ts_unix_nano_text); + + for (entry = hub->peers; entry != NULL; entry = entry->next) { + cJSON *session = NULL; + kcp_runtime_stats_t stats; + struct sockaddr_storage local_addr; + struct sockaddr_storage remote_addr; + socklen_t local_len = sizeof(local_addr); + socklen_t remote_len = sizeof(remote_addr); + char local_text[OMNI_MAX_ADDR_TEXT] = ""; + char remote_text[OMNI_MAX_ADDR_TEXT] = ""; + + if (entry->conn == NULL || entry->peer_id[0] == '\0' || kcp_hub_peer_is_telemetry(entry->peer_id)) { + continue; + } + + memset(&stats, 0, sizeof(stats)); + kcp_conn_runtime_stats_snapshot(entry->conn, &stats); + if (kcp_conn_local_addr(entry->conn, &local_addr, &local_len) != 0) { + local_len = 0; + } + if (kcp_conn_remote_addr(entry->conn, &remote_addr, &remote_len) != 0) { + remote_len = 0; + } + if (local_len > 0) { + omni_sockaddr_to_string((const struct sockaddr *) &local_addr, local_len, local_text, sizeof(local_text)); + } + if (remote_len > 0) { + omni_sockaddr_to_string((const struct sockaddr *) &remote_addr, remote_len, remote_text, sizeof(remote_text)); + } + + session = cJSON_CreateObject(); + if (session == NULL) { + cJSON_Delete(root); + errno = ENOMEM; + return -1; + } + cJSON_AddItemToArray(sessions, session); + if (cJSON_AddStringToObject(session, "peer_id", entry->peer_id) == NULL || + cJSON_AddStringToObject(session, "local_addr", local_text) == NULL || + cJSON_AddStringToObject(session, "remote_addr", remote_text) == NULL || + kcp_hub_add_runtime_stats_json(session, &stats) != 0) { + cJSON_Delete(root); + errno = ENOMEM; + return -1; + } + } + + payload = cJSON_PrintUnformatted(root); + cJSON_Delete(root); + if (payload == NULL) { + errno = ENOMEM; + return -1; + } + *out_payload = payload; + return 0; +} + +static int kcp_hub_push_telemetry_snapshot(kcp_hub_t *hub) { + message_t msg; + char *payload = NULL; + char telemetry_peer_id[OMNI_MAX_PEER_ID]; + int rc; + + if (hub == NULL) { + errno = EINVAL; + return -1; + } + + pthread_rwlock_rdlock(&hub->lock); + if (hub->telemetry_peer_id[0] == '\0' || kcp_hub_find_peer(hub, hub->telemetry_peer_id) == NULL) { + pthread_rwlock_unlock(&hub->lock); + return 0; + } + snprintf(telemetry_peer_id, sizeof(telemetry_peer_id), "%s", hub->telemetry_peer_id); + rc = kcp_hub_build_telemetry_payload_locked(hub, &payload); + pthread_rwlock_unlock(&hub->lock); + if (rc != 0) { + return -1; + } + + protocol_message_init(&msg); + msg.type = MSG_TYPE_TEXT; + snprintf(msg.from, sizeof(msg.from), "%s", SERVER_PEER_ID); + snprintf(msg.to, sizeof(msg.to), "%s", telemetry_peer_id); + msg.body = (uint8_t *) omni_strdup(payload == NULL ? "" : payload); + cJSON_free(payload); + if (msg.body == NULL) { + return -1; + } + msg.body_len = strlen((const char *) msg.body); + rc = kcp_hub_deliver_to_local_peer(hub, &msg); + protocol_message_clear(&msg); + if (rc != 0 && errno == ENOENT) { + return 0; + } + return rc; +} + +static void *kcp_hub_telemetry_thread_main(void *arg) { + kcp_hub_t *hub = (kcp_hub_t *) arg; + + while (!atomic_load(&hub->closed)) { + int interval_ms = KCP_HUB_DEFAULT_TELEMETRY_INTERVAL_MS; + + pthread_rwlock_rdlock(&hub->lock); + if (hub->telemetry_interval_ms > 0) { + interval_ms = hub->telemetry_interval_ms; + } + pthread_rwlock_unlock(&hub->lock); + + (void) kcp_hub_push_telemetry_snapshot(hub); + if (atomic_load(&hub->closed)) { + break; + } + usleep((useconds_t) interval_ms * 1000U); + } + return NULL; +} + static int kcp_hub_send_server_error(kcp_conn_t *conn, const char *to, const char *message) { message_t msg; protocol_message_init(&msg); @@ -389,11 +602,6 @@ static int kcp_hub_register_conn(kcp_hub_t *hub, kcp_conn_t *conn, char *peer_id pthread_rwlock_unlock(&hub->lock); snprintf(peer_id, peer_id_len, "%s", msg.from); - if (kcp_hub_configure_peer_transport(conn, peer_id) != 0) { - kcp_hub_unregister(hub, peer_id, conn); - protocol_message_clear(&msg); - return -1; - } protocol_message_clear(&msg); return 0; } @@ -414,7 +622,9 @@ kcp_hub_t *kcp_hub_new(latency_logger_t *logger, kcp_session_stats_logger_t *sta hub->logger = logger; hub->stats_logger = stats_logger; hub->stats_interval_ms = stats_interval_ms > 0 ? stats_interval_ms : KCP_DEFAULT_STATS_INTERVAL_MS; + hub->telemetry_interval_ms = KCP_HUB_DEFAULT_TELEMETRY_INTERVAL_MS; hub->relay_fd = -1; + atomic_init(&hub->closed, 0); return hub; } @@ -423,13 +633,13 @@ int kcp_hub_serve_listener(kcp_hub_t *hub, kcp_listener_t *listener) { errno = EINVAL; return -1; } - while (!hub->closed) { + while (!atomic_load(&hub->closed)) { kcp_conn_t *conn = kcp_listener_accept(listener); kcp_session_thread_ctx_t *ctx; pthread_t thread; if (conn == NULL) { - if (hub->closed) { + if (atomic_load(&hub->closed)) { return 0; } return -1; @@ -455,6 +665,7 @@ int kcp_hub_serve_listener(kcp_hub_t *hub, kcp_listener_t *listener) { int kcp_hub_serve_session(kcp_hub_t *hub, kcp_conn_t *conn) { char peer_id[OMNI_MAX_PEER_ID]; + const char *node_id; int rc = 0; if (hub == NULL || conn == NULL) { @@ -462,12 +673,20 @@ int kcp_hub_serve_session(kcp_hub_t *hub, kcp_conn_t *conn) { return -1; } peer_id[0] = '\0'; - if (kcp_conn_configure_runtime(conn, hub->logger, OMNI_NODE_ROLE_SERVER, "hub", hub->stats_logger, hub->stats_interval_ms) != 0) { + if (kcp_hub_register_conn(hub, conn, peer_id, sizeof(peer_id)) != 0) { kcp_conn_close(conn); kcp_conn_free(conn); return -1; } - if (kcp_hub_register_conn(hub, conn, peer_id, sizeof(peer_id)) != 0) { + if (kcp_hub_configure_peer_transport(conn, peer_id) != 0) { + kcp_hub_unregister(hub, peer_id, conn); + kcp_conn_close(conn); + kcp_conn_free(conn); + return -1; + } + node_id = kcp_hub_peer_node_id(peer_id); + if (kcp_conn_configure_runtime(conn, hub->logger, OMNI_NODE_ROLE_SERVER, node_id, hub->stats_logger, hub->stats_interval_ms) != 0) { + kcp_hub_unregister(hub, peer_id, conn); kcp_conn_close(conn); kcp_conn_free(conn); return -1; @@ -512,6 +731,34 @@ int kcp_hub_set_relay(kcp_hub_t *hub, int relay_fd, const struct sockaddr *peer_ return 0; } +int kcp_hub_set_telemetry(kcp_hub_t *hub, const char *peer_id, int interval_ms) { + int start_thread = 0; + + if (hub == NULL || peer_id == NULL) { + errno = EINVAL; + return -1; + } + pthread_rwlock_wrlock(&hub->lock); + snprintf(hub->telemetry_peer_id, sizeof(hub->telemetry_peer_id), "%s", peer_id); + hub->telemetry_interval_ms = interval_ms > 0 ? interval_ms : KCP_HUB_DEFAULT_TELEMETRY_INTERVAL_MS; + if (!hub->telemetry_thread_started && hub->telemetry_peer_id[0] != '\0') { + start_thread = 1; + hub->telemetry_thread_started = 1; + } + pthread_rwlock_unlock(&hub->lock); + + if (start_thread) { + if (pthread_create(&hub->telemetry_thread, NULL, kcp_hub_telemetry_thread_main, hub) != 0) { + pthread_rwlock_wrlock(&hub->lock); + hub->telemetry_thread_started = 0; + hub->telemetry_peer_id[0] = '\0'; + pthread_rwlock_unlock(&hub->lock); + return -1; + } + } + return 0; +} + int kcp_hub_serve_relay(kcp_hub_t *hub) { uint8_t buffer[KCP_RELAY_MAX_DATAGRAM_SIZE]; @@ -519,7 +766,7 @@ int kcp_hub_serve_relay(kcp_hub_t *hub) { errno = EINVAL; return -1; } - while (!hub->closed) { + while (!atomic_load(&hub->closed)) { struct sockaddr_storage source; socklen_t source_len = sizeof(source); ssize_t n; @@ -537,7 +784,7 @@ int kcp_hub_serve_relay(kcp_hub_t *hub) { n = recvfrom(relay_fd, buffer, sizeof(buffer), 0, (struct sockaddr *) &source, &source_len); if (n < 0) { - if (hub->closed) { + if (atomic_load(&hub->closed)) { return 0; } if (errno == EINTR) { @@ -568,8 +815,7 @@ int kcp_hub_close(kcp_hub_t *hub) { if (hub == NULL) { return 0; } - if (!hub->closed) { - hub->closed = 1; + if (!atomic_exchange(&hub->closed, 1)) { if (hub->relay_fd >= 0) { close(hub->relay_fd); hub->relay_fd = -1; @@ -586,6 +832,9 @@ void kcp_hub_free(kcp_hub_t *hub) { return; } kcp_hub_close(hub); + if (hub->telemetry_thread_started) { + pthread_join(hub->telemetry_thread, NULL); + } for (entry = hub->peers; entry != NULL; entry = next) { next = entry->next; if (entry->conn != NULL) { diff --git a/src/transport_kcp.c b/src/transport_kcp.c index 47bb55e..8fd21e1 100644 --- a/src/transport_kcp.c +++ b/src/transport_kcp.c @@ -62,6 +62,7 @@ struct kcp_conn { int stats_thread_started; kcp_conn_options_t options; int update_interval_ms; + atomic_uint_fast64_t total_out_segs; uint64_t pending_bytes_sent; uint64_t pending_bytes_received; uint64_t pending_in_pkts; @@ -129,6 +130,10 @@ struct kcp_process_sampler { uint64_t prev_out_segs; uint64_t prev_in_errs; uint64_t prev_kcp_in_errs; + uint64_t prev_retrans_segs; + uint64_t prev_fast_retrans_segs; + uint64_t prev_lost_segs; + uint64_t prev_repeat_segs; atomic_uint_fast64_t bytes_sent; atomic_uint_fast64_t bytes_received; atomic_uint_fast64_t in_pkts; @@ -185,6 +190,20 @@ void kcp_conn_options_set_video_defaults(kcp_conn_options_t *options) { options->mtu = KCP_VIDEO_MTU; } +void kcp_conn_options_set_telemetry_defaults(kcp_conn_options_t *options) { + if (options == NULL) { + return; + } + memset(options, 0, sizeof(*options)); + options->nodelay = KCP_TELEMETRY_NODELAY; + options->interval_ms = KCP_TELEMETRY_INTERVAL_MS; + options->resend = KCP_TELEMETRY_RESEND; + options->nc = KCP_TELEMETRY_NC; + options->sndwnd = KCP_TELEMETRY_SND_WND; + options->rcvwnd = KCP_TELEMETRY_RCV_WND; + options->mtu = KCP_TELEMETRY_MTU; +} + static int kcp_conn_validate_options(const kcp_conn_options_t *options) { if (options == NULL) { errno = EINVAL; @@ -328,6 +347,7 @@ static void kcp_conn_record_send(kcp_conn_t *conn, int packet_bytes, size_t segm if (conn == NULL) { return; } + atomic_fetch_add_explicit(&conn->total_out_segs, (uint64_t) segments, memory_order_relaxed); if (conn->process_sampler != NULL) { kcp_process_sampler_record_send(conn->process_sampler, packet_bytes, segments); return; @@ -420,7 +440,14 @@ static void kcp_process_sampler_remove_conn(kcp_process_sampler_t *sampler, kcp_ } } -static void kcp_process_sampler_collect_gauges(kcp_process_sampler_t *sampler, uint64_t *snd_queue, uint64_t *rcv_queue, uint64_t *snd_buffer) { +static void kcp_process_sampler_collect_gauges(kcp_process_sampler_t *sampler, + uint64_t *snd_queue, + uint64_t *rcv_queue, + uint64_t *snd_buffer, + uint64_t *retrans_segs, + uint64_t *fast_retrans_segs, + uint64_t *lost_segs, + uint64_t *repeat_segs) { kcp_conn_t *conn; if (snd_queue != NULL) { @@ -432,6 +459,18 @@ static void kcp_process_sampler_collect_gauges(kcp_process_sampler_t *sampler, u if (snd_buffer != NULL) { *snd_buffer = 0; } + if (retrans_segs != NULL) { + *retrans_segs = 0; + } + if (fast_retrans_segs != NULL) { + *fast_retrans_segs = 0; + } + if (lost_segs != NULL) { + *lost_segs = 0; + } + if (repeat_segs != NULL) { + *repeat_segs = 0; + } if (sampler == NULL) { return; } @@ -449,6 +488,18 @@ static void kcp_process_sampler_collect_gauges(kcp_process_sampler_t *sampler, u if (snd_buffer != NULL) { *snd_buffer += conn->kcp->nsnd_buf; } + if (lost_segs != NULL) { + *lost_segs += conn->kcp->timeout_retrans_total; + } + if (fast_retrans_segs != NULL) { + *fast_retrans_segs += conn->kcp->fast_retrans_total; + } + if (retrans_segs != NULL) { + *retrans_segs += conn->kcp->timeout_retrans_total + conn->kcp->fast_retrans_total; + } + if (repeat_segs != NULL) { + *repeat_segs += conn->kcp->duplicate_recv_total; + } } pthread_mutex_unlock(&conn->kcp_mu); } @@ -468,6 +519,10 @@ static void kcp_process_sampler_log_snapshot(kcp_process_sampler_t *sampler, con uint64_t snd_queue = 0; uint64_t rcv_queue = 0; uint64_t snd_buffer = 0; + uint64_t retrans_segs = 0; + uint64_t fast_retrans_segs = 0; + uint64_t lost_segs = 0; + uint64_t repeat_segs = 0; if (sampler == NULL || sampler->logger == NULL) { return; @@ -481,7 +536,15 @@ static void kcp_process_sampler_log_snapshot(kcp_process_sampler_t *sampler, con out_segs = atomic_load_explicit(&sampler->out_segs, memory_order_relaxed); in_errs = atomic_load_explicit(&sampler->in_errs, memory_order_relaxed); kcp_in_errs = atomic_load_explicit(&sampler->kcp_in_errs, memory_order_relaxed); - kcp_process_sampler_collect_gauges(sampler, &snd_queue, &rcv_queue, &snd_buffer); + kcp_process_sampler_collect_gauges( + sampler, + &snd_queue, + &rcv_queue, + &snd_buffer, + &retrans_segs, + &fast_retrans_segs, + &lost_segs, + &repeat_segs); memset(&record, 0, sizeof(record)); snprintf(record.record_type, sizeof(record.record_type), "%s", KCP_SESSION_STATS_RECORD_PROCESS_SAMPLE); @@ -502,6 +565,14 @@ static void kcp_process_sampler_log_snapshot(kcp_process_sampler_t *sampler, con record.in_segs = kcp_counter_diff(sampler->prev_in_segs, in_segs); record.has_out_segs = 1; record.out_segs = kcp_counter_diff(sampler->prev_out_segs, out_segs); + record.has_retrans_segs = 1; + record.retrans_segs = kcp_counter_diff(sampler->prev_retrans_segs, retrans_segs); + record.has_fast_retrans_segs = 1; + record.fast_retrans_segs = kcp_counter_diff(sampler->prev_fast_retrans_segs, fast_retrans_segs); + record.has_lost_segs = 1; + record.lost_segs = kcp_counter_diff(sampler->prev_lost_segs, lost_segs); + record.has_repeat_segs = 1; + record.repeat_segs = kcp_counter_diff(sampler->prev_repeat_segs, repeat_segs); record.has_in_errs = 1; record.in_errs = kcp_counter_diff(sampler->prev_in_errs, in_errs); record.has_kcp_in_errs = 1; @@ -521,6 +592,10 @@ static void kcp_process_sampler_log_snapshot(kcp_process_sampler_t *sampler, con sampler->prev_out_pkts = out_pkts; sampler->prev_in_segs = in_segs; sampler->prev_out_segs = out_segs; + sampler->prev_retrans_segs = retrans_segs; + sampler->prev_fast_retrans_segs = fast_retrans_segs; + sampler->prev_lost_segs = lost_segs; + sampler->prev_repeat_segs = repeat_segs; sampler->prev_in_errs = in_errs; sampler->prev_kcp_in_errs = kcp_in_errs; @@ -1006,6 +1081,11 @@ static void kcp_log_session_snapshot(kcp_conn_t *conn, const char *reason) { socklen_t local_len = sizeof(local_addr); char local_text[OMNI_MAX_ADDR_TEXT]; char remote_text[OMNI_MAX_ADDR_TEXT]; + uint32_t inflight = 0; + uint32_t window_limit = 0; + uint64_t out_segs_total = 0; + uint64_t fast_retrans_total = 0; + uint64_t lost_total = 0; if (conn == NULL || conn->stats_logger == NULL || conn->sock_state == NULL || conn->kcp == NULL) { return; } @@ -1029,7 +1109,38 @@ static void kcp_log_session_snapshot(kcp_conn_t *conn, const char *reason) { record.srtt_ms = conn->kcp->rx_srtt; record.has_srttvar_ms = 1; record.srttvar_ms = conn->kcp->rx_rttval; + record.has_snd_wnd = 1; + record.snd_wnd = conn->kcp->snd_wnd; + record.has_rmt_wnd = 1; + record.rmt_wnd = conn->kcp->rmt_wnd; + inflight = conn->kcp->snd_nxt - conn->kcp->snd_una; + window_limit = conn->kcp->snd_wnd < conn->kcp->rmt_wnd ? conn->kcp->snd_wnd : conn->kcp->rmt_wnd; + record.has_inflight = 1; + record.inflight = inflight; + record.has_window_limit = 1; + record.window_limit = window_limit; + record.has_window_pressure_pct = 1; + record.window_pressure_pct = window_limit == 0 ? 0.0 : ((double) inflight * 100.0) / (double) window_limit; + record.has_ring_buffer_snd_queue = 1; + record.ring_buffer_snd_queue = conn->kcp->nsnd_que; + record.has_ring_buffer_rcv_queue = 1; + record.ring_buffer_rcv_queue = conn->kcp->nrcv_que; + record.has_ring_buffer_snd_buffer = 1; + record.ring_buffer_snd_buffer = conn->kcp->nsnd_buf; + lost_total = conn->kcp->timeout_retrans_total; + fast_retrans_total = conn->kcp->fast_retrans_total; + record.has_retrans_segs = 1; + record.retrans_segs = lost_total + fast_retrans_total; + record.has_fast_retrans_segs = 1; + record.fast_retrans_segs = fast_retrans_total; + record.has_lost_segs = 1; + record.lost_segs = lost_total; + record.has_repeat_segs = 1; + record.repeat_segs = conn->kcp->duplicate_recv_total; pthread_mutex_unlock(&conn->kcp_mu); + out_segs_total = atomic_load_explicit(&conn->total_out_segs, memory_order_relaxed); + record.has_out_segs = 1; + record.out_segs = out_segs_total; (void) kcp_session_stats_log(conn->stats_logger, &record); } @@ -1788,6 +1899,18 @@ int kcp_conn_local_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, s return 0; } +int kcp_conn_remote_addr(const kcp_conn_t *conn, struct sockaddr_storage *addr, socklen_t *addr_len) { + if (conn == NULL || addr == NULL || addr_len == NULL) { + errno = EINVAL; + return -1; + } + if (conn->remote_addr_len == 0) { + errno = ENOTCONN; + return -1; + } + return omni_clone_sockaddr((const struct sockaddr *) &conn->remote_addr, conn->remote_addr_len, addr, addr_len); +} + void kcp_conn_runtime_stats_snapshot(kcp_conn_t *conn, kcp_runtime_stats_t *out_stats) { if (out_stats == NULL) { return; @@ -1805,9 +1928,21 @@ void kcp_conn_runtime_stats_snapshot(kcp_conn_t *conn, kcp_runtime_stats_t *out_ out_stats->rto_ms = conn->kcp->rx_rto; out_stats->srtt_ms = conn->kcp->rx_srtt; out_stats->srttvar_ms = conn->kcp->rx_rttval; + out_stats->snd_wnd = conn->kcp->snd_wnd; + out_stats->rmt_wnd = conn->kcp->rmt_wnd; + out_stats->inflight = conn->kcp->snd_nxt - conn->kcp->snd_una; + out_stats->window_limit = conn->kcp->snd_wnd < conn->kcp->rmt_wnd ? conn->kcp->snd_wnd : conn->kcp->rmt_wnd; + out_stats->window_pressure_pct = out_stats->window_limit == 0 + ? 0.0 + : ((double) out_stats->inflight * 100.0) / (double) out_stats->window_limit; out_stats->snd_queue = conn->kcp->nsnd_que; out_stats->rcv_queue = conn->kcp->nrcv_que; out_stats->snd_buffer = conn->kcp->nsnd_buf; + out_stats->out_segs_total = atomic_load_explicit(&conn->total_out_segs, memory_order_relaxed); + out_stats->fast_retrans_total = conn->kcp->fast_retrans_total; + out_stats->lost_total = conn->kcp->timeout_retrans_total; + out_stats->retrans_total = out_stats->lost_total + out_stats->fast_retrans_total; + out_stats->repeat_total = conn->kcp->duplicate_recv_total; out_stats->xmit_total = conn->kcp->xmit; } else { out_stats->connected = 0; diff --git a/third_party/kcp/ikcp.c b/third_party/kcp/ikcp.c index b6176b9..593ae41 100644 --- a/third_party/kcp/ikcp.c +++ b/third_party/kcp/ikcp.c @@ -298,6 +298,9 @@ ikcpcb *ikcp_create(IUINT32 conv, void *user) kcp->fastlimit = IKCP_FASTACK_LIMIT; kcp->nocwnd = 0; kcp->xmit = 0; + kcp->timeout_retrans_total = 0; + kcp->fast_retrans_total = 0; + kcp->duplicate_recv_total = 0; kcp->dead_link = IKCP_DEADLINK; kcp->output = NULL; kcp->writelog = NULL; @@ -788,6 +791,7 @@ void ikcp_parse_data(ikcpcb *kcp, IKCPSEG *newseg) } else { + kcp->duplicate_recv_total++; ikcp_segment_delete(kcp, newseg); } @@ -1192,6 +1196,7 @@ void ikcp_flush(ikcpcb *kcp) needsend = 1; segment->xmit++; kcp->xmit++; + kcp->timeout_retrans_total++; if (kcp->nodelay == 0) { segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto); @@ -1211,6 +1216,7 @@ void ikcp_flush(ikcpcb *kcp) { needsend = 1; segment->xmit++; + kcp->fast_retrans_total++; segment->fastack = 0; segment->resendts = current + segment->rto; change++; diff --git a/third_party/kcp/ikcp.h b/third_party/kcp/ikcp.h index 5630269..54106f2 100644 --- a/third_party/kcp/ikcp.h +++ b/third_party/kcp/ikcp.h @@ -312,6 +312,9 @@ struct IKCPCB IUINT32 *acklist; IUINT32 ackcount; IUINT32 ackblock; + IUINT64 timeout_retrans_total; + IUINT64 fast_retrans_total; + IUINT64 duplicate_recv_total; void *user; char *buffer; int fastresend;