#include "omni_common.h"

/*
 * System headers for every standard/POSIX name used in this file.
 * NOTE(review): the original directives had lost their header names, so this
 * list was rebuilt from the identifiers used below; some of these may already
 * be pulled in by omni_common.h.
 */
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <netdb.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

/* ------------------------------------------------------------------------ */
/* Clocks                                                                   */
/* ------------------------------------------------------------------------ */

/**
 * Read CLOCK_MONOTONIC and convert it to whole milliseconds.
 *
 * The timespec is zero-initialised so a failing clock_gettime() yields 0
 * instead of reading indeterminate memory.
 */
static uint64_t omni_now_monotonic_ms64(void)
{
    struct timespec ts = {0};

    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (uint64_t) ts.tv_sec * 1000ULL + (uint64_t) (ts.tv_nsec / 1000000L);
}

/**
 * Read CLOCK_REALTIME and return it as nanoseconds.
 *
 * @return tv_sec * 1e9 + tv_nsec; 0 if clock_gettime() leaves the
 *         zero-initialised timespec untouched.
 */
int64_t omni_now_unix_nano(void)
{
    struct timespec ts = {0};

    clock_gettime(CLOCK_REALTIME, &ts);
    return (int64_t) ts.tv_sec * 1000000000LL + ts.tv_nsec;
}

/**
 * Monotonic millisecond counter truncated to its low 32 bits.
 *
 * The value wraps; callers comparing two readings must use modular
 * arithmetic.
 */
uint32_t omni_now_millis32(void)
{
    return (uint32_t) (omni_now_monotonic_ms64() & 0xffffffffu);
}

/* ------------------------------------------------------------------------ */
/* Sockets                                                                  */
/* ------------------------------------------------------------------------ */

/**
 * Set or clear O_NONBLOCK on a descriptor.
 *
 * @param fd       descriptor to modify
 * @param enabled  non-zero sets O_NONBLOCK, zero clears it
 * @return the result of fcntl(F_SETFL), or -1 if fcntl(F_GETFL) fails
 */
int omni_set_nonblocking(int fd, int enabled)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags < 0) {
        return -1;
    }
    if (enabled) {
        flags |= O_NONBLOCK;
    } else {
        flags &= ~O_NONBLOCK;
    }
    return fcntl(fd, F_SETFL, flags);
}

/**
 * Resolve a textual endpoint into a socket address using getaddrinfo()
 * with SOCK_DGRAM / AF_UNSPEC hints.
 *
 * Accepted forms:
 *   - "host:port"   split at the LAST ':'; an empty host becomes NULL
 *   - "[host]:port" bracketed host (the form omni_sockaddr_to_string()
 *                   produces for AF_INET6)
 *   - "[host]"      bracketed host, service "0"
 *   - "text"        no ':' present: treated as the service when @p passive
 *                   is non-zero, otherwise as the host with service "0"
 *
 * @param raw         endpoint text
 * @param passive     non-zero adds AI_PASSIVE to the hints
 * @param addr        receives the first address returned
 * @param addr_len    receives the length of that address
 * @param family_out  optional; receives ai_family of that address
 * @return 0 on success; -1 with errno = EINVAL (bad argument, malformed
 *         brackets, or resolution failure) or ENAMETOOLONG (host does not
 *         fit in OMNI_MAX_ADDR_TEXT)
 */
int omni_parse_sockaddr(const char *raw, int passive, struct sockaddr_storage *addr,
                        socklen_t *addr_len, int *family_out)
{
    struct addrinfo hints;
    struct addrinfo *result = NULL;
    char host_copy[OMNI_MAX_ADDR_TEXT];
    const char *host = NULL;
    const char *service = NULL;
    const char *host_begin = NULL;
    const char *host_end = NULL;

    if (raw == NULL || addr == NULL || addr_len == NULL) {
        errno = EINVAL;
        return -1;
    }

    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_DGRAM;
    hints.ai_flags = passive ? AI_PASSIVE : 0;

    if (raw[0] == '[') {
        /* Bracketed host: the ':' characters inside the brackets belong to
         * the address, so the port separator is the ':' right after ']'. */
        const char *closing = strchr(raw, ']');

        if (closing == NULL) {
            errno = EINVAL;
            return -1;
        }
        if (closing[1] == ':') {
            service = closing + 2;
        } else if (closing[1] == '\0') {
            service = "0";
        } else {
            errno = EINVAL;
            return -1;
        }
        host_begin = raw + 1;
        host_end = closing;
    } else {
        const char *last_colon = strrchr(raw, ':');

        if (last_colon == NULL) {
            host = passive ? NULL : raw;
            service = passive ? raw : "0";
        } else {
            host_begin = raw;
            host_end = last_colon;
            /* Point straight into raw: no fixed-size copy, no truncation. */
            service = last_colon + 1;
        }
    }

    if (host_begin != NULL) {
        size_t host_len = (size_t) (host_end - host_begin);

        if (host_len >= sizeof(host_copy)) {
            errno = ENAMETOOLONG;
            return -1;
        }
        memcpy(host_copy, host_begin, host_len);
        host_copy[host_len] = '\0';
        host = host_len == 0 ? NULL : host_copy;
    }

    if (getaddrinfo(host, service, &hints, &result) != 0 || result == NULL) {
        errno = EINVAL;
        return -1;
    }
    if (result->ai_addrlen > sizeof(*addr)) {
        freeaddrinfo(result);
        errno = EINVAL;
        return -1;
    }

    memset(addr, 0, sizeof(*addr));
    memcpy(addr, result->ai_addr, result->ai_addrlen);
    *addr_len = (socklen_t) result->ai_addrlen;
    if (family_out != NULL) {
        *family_out = result->ai_family;
    }
    freeaddrinfo(result);
    return 0;
}

/**
 * Copy a socket address into zeroed sockaddr_storage.
 *
 * @return 0 on success; -1 with errno = EINVAL if a pointer is NULL or
 *         @p src_len exceeds sizeof(struct sockaddr_storage)
 */
int omni_clone_sockaddr(const struct sockaddr *src, socklen_t src_len,
                        struct sockaddr_storage *dst, socklen_t *dst_len)
{
    if (src == NULL || dst == NULL || dst_len == NULL || src_len > sizeof(*dst)) {
        errno = EINVAL;
        return -1;
    }
    memset(dst, 0, sizeof(*dst));
    memcpy(dst, src, src_len);
    *dst_len = src_len;
    return 0;
}

/**
 * Format a socket address numerically into @p buffer.
 *
 * AF_INET6 addresses are written as "[host]:port", everything else as
 * "host:port".
 *
 * @return @p buffer, or the literal "" when @p buffer is NULL or
 *         @p buffer_len is 0. When @p addr is NULL or getnameinfo() fails,
 *         @p buffer is set to the empty string.
 *
 * NOTE(review): both fallback strings are empty in this source; confirm
 * whether a visible placeholder was intended.
 */
const char *omni_sockaddr_to_string(const struct sockaddr *addr, socklen_t addr_len,
                                    char *buffer, size_t buffer_len)
{
    char host[NI_MAXHOST];
    char service[NI_MAXSERV];

    if (buffer == NULL || buffer_len == 0) {
        return "";
    }
    if (addr == NULL) {
        buffer[0] = '\0';
        return buffer;
    }
    if (getnameinfo(addr, addr_len, host, sizeof(host), service, sizeof(service),
                    NI_NUMERICHOST | NI_NUMERICSERV) != 0) {
        buffer[0] = '\0';
        return buffer;
    }

    if (addr->sa_family == AF_INET6) {
        snprintf(buffer, buffer_len, "[%s]:%s", host, service);
    } else {
        snprintf(buffer, buffer_len, "%s:%s", host, service);
    }
    return buffer;
}

/**
 * Bind a socket to a network interface via SO_BINDTODEVICE.
 *
 * On Linux a NULL or empty @p device is a no-op that returns 0. On every
 * other platform the call fails with errno = ENOTSUP regardless of
 * @p device.
 */
int omni_bind_device(int fd, const char *device)
{
#ifdef __linux__
    if (device == NULL || device[0] == '\0') {
        return 0;
    }
    return setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, device, (socklen_t) strlen(device));
#else
    (void) fd;
    (void) device;
    errno = ENOTSUP;
    return -1;
#endif
}

/* ------------------------------------------------------------------------ */
/* Directories and files                                                    */
/* ------------------------------------------------------------------------ */

/** mkdir(path, 0755) that treats EEXIST as success. */
static int omni_mkdir_single(const char *path)
{
    if (mkdir(path, 0755) == 0 || errno == EEXIST) {
        return 0;
    }
    return -1;
}

/**
 * Create @p path and every missing ancestor (like `mkdir -p`).
 *
 * @return 0 on success or when @p path is NULL/empty; -1 with errno set
 *         (ENAMETOOLONG if @p path does not fit in PATH_MAX, otherwise
 *         whatever mkdir() reported)
 */
int omni_ensure_dir(const char *path)
{
    char tmp[PATH_MAX];
    size_t i;

    if (path == NULL || path[0] == '\0') {
        return 0;
    }
    if (strlen(path) >= sizeof(tmp)) {
        errno = ENAMETOOLONG;
        return -1;
    }
    snprintf(tmp, sizeof(tmp), "%s", path);

    /* Start at 1 so a leading '/' is never cut into an empty prefix. */
    for (i = 1; tmp[i] != '\0'; ++i) {
        if (tmp[i] != '/') {
            continue;
        }
        tmp[i] = '\0';
        if (omni_mkdir_single(tmp) != 0) {
            return -1;
        }
        tmp[i] = '/';
    }
    return omni_mkdir_single(tmp);
}

/**
 * Create the directory that would contain @p path.
 *
 * @return 0 on success, when @p path is NULL/empty, or when it contains no
 *         '/'; -1 with errno set on failure
 */
int omni_ensure_parent_dir(const char *path)
{
    char tmp[PATH_MAX];
    char *slash;

    if (path == NULL || path[0] == '\0') {
        return 0;
    }
    if (strlen(path) >= sizeof(tmp)) {
        errno = ENAMETOOLONG;
        return -1;
    }
    snprintf(tmp, sizeof(tmp), "%s", path);

    slash = strrchr(tmp, '/');
    if (slash == NULL) {
        return 0;
    }
    if (slash == tmp) {
        /* Parent is the root directory. */
        return omni_mkdir_single("/");
    }
    *slash = '\0';
    return omni_ensure_dir(tmp);
}

/**
 * Read an entire file into a freshly allocated buffer.
 *
 * The buffer is allocated one byte larger than the file and that extra byte
 * is set to 0, so the result is never NULL on success (even for an empty
 * file) and can be treated as a C string. @p out_len excludes that byte.
 *
 * @param path     file to read
 * @param out      receives the buffer; caller frees with free()
 * @param out_len  receives the number of file bytes read
 * @return 0 on success; -1 with errno set (EINVAL for NULL arguments,
 *         ENOMEM, EIO for a short read, or the stdio error)
 */
int omni_read_file(const char *path, uint8_t **out, size_t *out_len)
{
    FILE *file;
    long size;
    uint8_t *buffer = NULL;
    int saved_errno;

    if (out == NULL || out_len == NULL) {
        errno = EINVAL;
        return -1;
    }
    *out = NULL;
    *out_len = 0;
    if (path == NULL) {
        errno = EINVAL;
        return -1;
    }

    file = fopen(path, "rb");
    if (file == NULL) {
        return -1;
    }
    if (fseek(file, 0, SEEK_END) != 0) {
        goto fail;
    }
    size = ftell(file);
    if (size < 0) {
        goto fail;
    }
    if (fseek(file, 0, SEEK_SET) != 0) {
        goto fail;
    }

    buffer = malloc((size_t) size + 1U);
    if (buffer == NULL) {
        errno = ENOMEM;
        goto fail;
    }
    if (size > 0 && fread(buffer, 1, (size_t) size, file) != (size_t) size) {
        errno = EIO;
        goto fail;
    }
    buffer[size] = 0;

    fclose(file);
    *out = buffer;
    *out_len = (size_t) size;
    return 0;

fail:
    /* free()/fclose() may overwrite errno; report the original failure. */
    saved_errno = errno;
    free(buffer);
    fclose(file);
    errno = saved_errno;
    return -1;
}

/**
 * Write exactly @p len bytes to @p fd, retrying on EINTR and short writes.
 *
 * @return 0 on success; -1 with errno from write(), or EIO if write()
 *         returns 0
 */
int omni_write_full_fd(int fd, const uint8_t *data, size_t len)
{
    while (len > 0) {
        ssize_t written = write(fd, data, len);

        if (written < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        if (written == 0) {
            errno = EIO;
            return -1;
        }
        data += written;
        len -= (size_t) written;
    }
    return 0;
}

/**
 * Shared body of omni_write_file() / omni_append_file(): create the parent
 * directory, open @p path with @p mode, write @p len bytes, close.
 *
 * @return 0 on success; -1 with errno set (EIO on a short fwrite)
 */
static int omni_write_file_internal(const char *path, const uint8_t *data, size_t len,
                                    const char *mode)
{
    FILE *file;

    if (omni_ensure_parent_dir(path) != 0) {
        return -1;
    }
    file = fopen(path, mode);
    if (file == NULL) {
        return -1;
    }
    if (len > 0 && fwrite(data, 1, len, file) != len) {
        fclose(file);
        errno = EIO;
        return -1;
    }
    if (fclose(file) != 0) {
        return -1;
    }
    return 0;
}

/** Append @p len bytes to @p path (mode "ab"), creating parent directories. */
int omni_append_file(const char *path, const uint8_t *data, size_t len)
{
    return omni_write_file_internal(path, data, len, "ab");
}

/** Replace @p path with @p len bytes (mode "wb"), creating parent directories. */
int omni_write_file(const char *path, const uint8_t *data, size_t len)
{
    return omni_write_file_internal(path, data, len, "wb");
}

/**
 * Fill @p out with 4 bytes read from /dev/urandom.
 *
 * A result of 0 is replaced by 1, so the value stored is never zero.
 *
 * @return 0 on success; -1 with errno set (EINVAL for NULL @p out, EIO on
 *         end-of-file, or the open()/read() error)
 */
int omni_random_u32(uint32_t *out)
{
    uint8_t *cursor;
    size_t remaining;
    int fd;

    if (out == NULL) {
        errno = EINVAL;
        return -1;
    }
    fd = open("/dev/urandom", O_RDONLY);
    if (fd < 0) {
        return -1;
    }

    cursor = (uint8_t *) out;
    remaining = sizeof(*out);
    while (remaining > 0) {
        ssize_t n = read(fd, cursor, remaining);

        if (n < 0) {
            if (errno == EINTR) {
                continue;
            }
            close(fd);
            return -1;
        }
        if (n == 0) {
            close(fd);
            errno = EIO;
            return -1;
        }
        cursor += n;
        remaining -= (size_t) n;
    }
    close(fd);

    if (*out == 0) {
        *out = 1;
    }
    return 0;
}

/* ------------------------------------------------------------------------ */
/* Strings                                                                  */
/* ------------------------------------------------------------------------ */

/**
 * Duplicate a C string.
 *
 * @return malloc()ed copy the caller must free(); NULL if @p src is NULL or
 *         allocation fails
 */
char *omni_strdup(const char *src)
{
    size_t len;
    char *dst;

    if (src == NULL) {
        return NULL;
    }
    len = strlen(src);
    dst = malloc(len + 1U);
    if (dst == NULL) {
        return NULL;
    }
    memcpy(dst, src, len + 1U);
    return dst;
}

/**
 * printf-style formatting into a malloc()ed string sized by a first
 * measuring pass.
 *
 * @return the formatted string (caller frees), or NULL if vsnprintf()
 *         reports an error or allocation fails
 */
char *omni_strdup_printf(const char *fmt, ...)
{
    va_list args;
    va_list copy;
    int needed;
    char *buffer;

    va_start(args, fmt);
    va_copy(copy, args);
    needed = vsnprintf(NULL, 0, fmt, copy);
    va_end(copy);
    if (needed < 0) {
        va_end(args);
        return NULL;
    }

    buffer = malloc((size_t) needed + 1U);
    if (buffer == NULL) {
        va_end(args);
        return NULL;
    }
    vsnprintf(buffer, (size_t) needed + 1U, fmt, args);
    va_end(args);
    return buffer;
}

/**
 * Map a byte to the letter of its two-character JSON escape.
 *
 * @return the character that follows the backslash, or '\0' when the byte
 *         has no short escape
 */
static char omni_json_short_escape(uint8_t byte)
{
    switch (byte) {
    case '\\':
        return '\\';
    case '"':
        return '"';
    case '\b':
        return 'b';
    case '\f':
        return 'f';
    case '\n':
        return 'n';
    case '\r':
        return 'r';
    case '\t':
        return 't';
    default:
        return '\0';
    }
}

/**
 * Escape a byte sequence for embedding inside a JSON string literal.
 *
 * Backslash, double quote and \b \f \n \r \t use their two-character
 * escapes; any other byte below 0x20 becomes \u00XX (lower-case hex); every
 * other byte, including bytes >= 0x80, is copied unchanged. No surrounding
 * quotes are added.
 *
 * @return malloc()ed NUL-terminated string (caller frees); NULL on
 *         allocation failure, or with errno = EINVAL when @p src is NULL
 *         and @p len is non-zero
 */
char *omni_json_escape_bytes(const uint8_t *src, size_t len)
{
    size_t i;
    size_t out_len = 0;
    char *out;
    char *cursor;

    if (src == NULL) {
        if (len == 0) {
            return omni_strdup("");
        }
        errno = EINVAL;
        return NULL;
    }

    /* Pass 1: measure. */
    for (i = 0; i < len; ++i) {
        if (omni_json_short_escape(src[i]) != '\0') {
            out_len += 2U;
        } else {
            out_len += src[i] < 0x20 ? 6U : 1U;
        }
    }

    out = malloc(out_len + 1U);
    if (out == NULL) {
        return NULL;
    }

    /* Pass 2: emit. */
    cursor = out;
    for (i = 0; i < len; ++i) {
        char letter = omni_json_short_escape(src[i]);

        if (letter != '\0') {
            *cursor++ = '\\';
            *cursor++ = letter;
        } else if (src[i] < 0x20) {
            /* 6 characters plus the NUL that the next write overwrites. */
            snprintf(cursor, 7, "\\u%04x", (unsigned) src[i]);
            cursor += 6;
        } else {
            *cursor++ = (char) src[i];
        }
    }
    *cursor = '\0';
    return out;
}

/**
 * Escape a NUL-terminated string for JSON; NULL is treated as "".
 *
 * @return malloc()ed string (caller frees) or NULL on allocation failure
 */
char *omni_json_escape(const char *src)
{
    if (src == NULL) {
        return omni_strdup("");
    }
    return omni_json_escape_bytes((const uint8_t *) src, strlen(src));
}

/**
 * Check that @p data is well-formed UTF-8.
 *
 * Rejects truncated sequences, stray continuation bytes, overlong encodings
 * (lead bytes 0xC0/0xC1, 0xE0 followed by < 0xA0, 0xF0 followed by < 0x90),
 * surrogates (0xED followed by >= 0xA0) and code points above U+10FFFF.
 *
 * @return 1 if valid (an empty input is valid), 0 otherwise; NULL @p data
 *         is valid only when @p len is 0
 */
int omni_utf8_valid(const uint8_t *data, size_t len)
{
    size_t i = 0;
    uint8_t c;

    if (data == NULL) {
        return len == 0;
    }

    while (i < len) {
        c = data[i];
        if (c <= 0x7f) {
            i++;
            continue;
        }
        if ((c & 0xe0) == 0xc0) {
            if (i + 1 >= len || (data[i + 1] & 0xc0) != 0x80 || c < 0xc2) {
                return 0;
            }
            i += 2;
            continue;
        }
        if ((c & 0xf0) == 0xe0) {
            if (i + 2 >= len || (data[i + 1] & 0xc0) != 0x80 ||
                (data[i + 2] & 0xc0) != 0x80) {
                return 0;
            }
            if (c == 0xe0 && data[i + 1] < 0xa0) {
                return 0;
            }
            if (c == 0xed && data[i + 1] >= 0xa0) {
                return 0;
            }
            i += 3;
            continue;
        }
        if ((c & 0xf8) == 0xf0) {
            if (i + 3 >= len || (data[i + 1] & 0xc0) != 0x80 ||
                (data[i + 2] & 0xc0) != 0x80 || (data[i + 3] & 0xc0) != 0x80) {
                return 0;
            }
            if (c == 0xf0 && data[i + 1] < 0x90) {
                return 0;
            }
            if (c > 0xf4 || (c == 0xf4 && data[i + 1] >= 0x90)) {
                return 0;
            }
            i += 4;
            continue;
        }
        return 0;
    }
    return 1;
}

/** Strip every trailing '\n' and '\r' from @p line in place; NULL is ignored. */
void omni_trim_newline(char *line)
{
    size_t len;

    if (line == NULL) {
        return;
    }
    len = strlen(line);
    while (len > 0 && (line[len - 1] == '\n' || line[len - 1] == '\r')) {
        line[--len] = '\0';
    }
}

/**
 * Parse a duration such as "250", "250ms" or "3s" into milliseconds.
 *
 * @param raw         text to parse; NULL or empty selects @p default_ms
 * @param default_ms  value stored when @p raw is NULL or empty
 * @param out_ms      receives the duration in milliseconds
 * @return 0 on success; -1 with errno = EINVAL when @p out_ms is NULL, the
 *         number is missing or not positive, the suffix is unknown, or the
 *         result does not fit in an int
 */
int omni_parse_duration_ms(const char *raw, int default_ms, int *out_ms)
{
    char *endptr;
    long value;
    int entry_errno;
    int out_of_range;

    if (out_ms == NULL) {
        errno = EINVAL;
        return -1;
    }
    if (raw == NULL || raw[0] == '\0') {
        *out_ms = default_ms;
        return 0;
    }

    /* strtol() reports overflow only through errno, so clear it for the
     * call and put the caller's value back on the success paths. */
    entry_errno = errno;
    errno = 0;
    value = strtol(raw, &endptr, 10);
    out_of_range = errno == ERANGE;

    if (endptr == raw || value <= 0 || out_of_range) {
        errno = EINVAL;
        return -1;
    }
    if (*endptr == '\0' || strcmp(endptr, "ms") == 0) {
        if (value > INT_MAX) {
            errno = EINVAL;
            return -1;
        }
        *out_ms = (int) value;
        errno = entry_errno;
        return 0;
    }
    if (strcmp(endptr, "s") == 0) {
        /* Check before multiplying: signed overflow is undefined. */
        if (value > INT_MAX / 1000) {
            errno = EINVAL;
            return -1;
        }
        *out_ms = (int) (value * 1000L);
        errno = entry_errno;
        return 0;
    }
    errno = EINVAL;
    return -1;
}

/** Convert milliseconds to nanoseconds (multiplies by 1e6). */
double omni_duration_ms_to_ns(double ms)
{
    return ms * 1000000.0;
}

/**
 * Return the component after the last '/' in @p path.
 *
 * @return pointer into @p path (the whole string if it has no '/'), or ""
 *         when @p path is NULL
 */
const char *omni_path_base_name(const char *path)
{
    const char *slash;

    if (path == NULL) {
        return "";
    }
    slash = strrchr(path, '/');
    return slash == NULL ? path : slash + 1;
}

/* ------------------------------------------------------------------------ */
/* Environment helpers                                                      */
/* ------------------------------------------------------------------------ */

/**
 * Read a strictly positive int from the environment.
 *
 * @return the parsed value, or @p default_value when the variable is unset,
 *         empty, has trailing characters, is not positive, or does not fit
 *         in an int
 */
static int omni_positive_int_env(const char *name, int default_value)
{
    const char *raw = getenv(name);
    long parsed;
    char *endptr = NULL;

    if (raw == NULL || raw[0] == '\0') {
        return default_value;
    }
    errno = 0;
    parsed = strtol(raw, &endptr, 10);
    if (endptr == raw || *endptr != '\0' || parsed <= 0 ||
        errno == ERANGE || parsed > INT_MAX) {
        return default_value;
    }
    return (int) parsed;
}

/**
 * Read a strictly positive size_t from the environment.
 *
 * @return the parsed value, or @p default_value when the variable is unset,
 *         empty, has trailing characters, is zero, contains a '-', or does
 *         not fit in a size_t
 */
static size_t omni_positive_size_env(const char *name, size_t default_value)
{
    const char *raw = getenv(name);
    unsigned long long parsed;
    char *endptr = NULL;

    if (raw == NULL || raw[0] == '\0') {
        return default_value;
    }
    /* strtoull() accepts a leading '-' and wraps the result; a '-' anywhere
     * else would fail the trailing-character check anyway. */
    if (strchr(raw, '-') != NULL) {
        return default_value;
    }
    errno = 0;
    parsed = strtoull(raw, &endptr, 10);
    if (endptr == raw || *endptr != '\0' || parsed == 0ULL ||
        errno == ERANGE || parsed > SIZE_MAX) {
        return default_value;
    }
    return (size_t) parsed;
}

/* ------------------------------------------------------------------------ */
/* File logger                                                              */
/*                                                                          */
/* Functions with the `_locked` suffix are called by                        */
/* omni_file_logger_write_line() while it holds logger->mutex.              */
/* ------------------------------------------------------------------------ */

/**
 * fflush() the log stream and reset the buffered-byte and last-flush
 * bookkeeping.
 *
 * @return 0 on success; -1 with errno = EINVAL if there is no open stream,
 *         or the fflush() error
 */
static int omni_file_logger_flush_locked(omni_file_logger_t *logger, uint64_t now_ms)
{
    if (logger == NULL || logger->file == NULL) {
        errno = EINVAL;
        return -1;
    }
    if (fflush(logger->file) != 0) {
        return -1;
    }
    logger->buffered_bytes = 0U;
    logger->last_flush_monotonic_ms = now_ms;
    return 0;
}

/**
 * Build "<path>.<suffix>" into @p buffer.
 *
 * @return 0 on success; -1 with errno = EINVAL for bad arguments or
 *         ENAMETOOLONG when @p buffer lacks room for @p path plus 16 bytes
 */
static int omni_build_rotated_path(char *buffer, size_t buffer_len, const char *path,
                                   int suffix)
{
    size_t path_len;
    int written;

    if (buffer == NULL || buffer_len == 0U || path == NULL || path[0] == '\0') {
        errno = EINVAL;
        return -1;
    }
    path_len = strlen(path);
    if (path_len + 16U >= buffer_len) {
        errno = ENAMETOOLONG;
        return -1;
    }
    memcpy(buffer, path, path_len);
    written = snprintf(buffer + path_len, buffer_len - path_len, ".%d", suffix);
    if (written < 0 || (size_t) written >= buffer_len - path_len) {
        errno = ENAMETOOLONG;
        return -1;
    }
    return 0;
}

/**
 * Open logger->path in append mode and reset the size/flush bookkeeping.
 * current_bytes is taken from stat() on the path, or 0 if stat() fails.
 *
 * @return 0 on success; -1 with errno = EINVAL if no path is configured, or
 *         the fopen() error (logger->file is left untouched in that case)
 */
static int omni_file_logger_reopen_append_locked(omni_file_logger_t *logger)
{
    struct stat st;
    FILE *file;

    if (logger == NULL || logger->path[0] == '\0') {
        errno = EINVAL;
        return -1;
    }
    file = fopen(logger->path, "ab");
    if (file == NULL) {
        return -1;
    }
    logger->file = file;
    logger->current_bytes = 0U;
    if (stat(logger->path, &st) == 0) {
        logger->current_bytes = (size_t) st.st_size;
    }
    logger->buffered_bytes = 0U;
    logger->last_flush_monotonic_ms = omni_now_monotonic_ms64();
    return 0;
}

/**
 * Try to get the logger writing again after a failed rotation.
 *
 * First attempts a plain reopen. If that fails and @p rotated_current_path
 * is non-empty, renames it back to logger->path and tries once more.
 *
 * @return 0 if a stream is open again; -1 with errno from the first failed
 *         reopen
 */
static int omni_file_logger_recover_after_rotate_locked(omni_file_logger_t *logger,
                                                        const char *rotated_current_path)
{
    int reopen_errno;

    if (omni_file_logger_reopen_append_locked(logger) == 0) {
        return 0;
    }
    reopen_errno = errno;

    if (rotated_current_path != NULL && rotated_current_path[0] != '\0') {
        if (rename(rotated_current_path, logger->path) == 0) {
            if (omni_file_logger_reopen_append_locked(logger) == 0) {
                return 0;
            }
        }
    }
    errno = reopen_errno;
    return -1;
}

/**
 * Rotate the log: drop "<path>.<max_files>", shift "<path>.N" to
 * "<path>.N+1" for N = max_files-1 .. 1, move "<path>" to "<path>.1", then
 * reopen "<path>" for appending.
 *
 * Rotation is skipped (returns 0) when no path is set, max_bytes is 0 or
 * max_files is not positive. Missing files (ENOENT) during the renames are
 * ignored.
 *
 * @return 0 on success or when rotation is disabled; -1 with errno set on
 *         failure. If a stream was open on entry, recovery is attempted
 *         before returning; -1 is still returned when recovery succeeds.
 */
static int omni_file_logger_rotate_locked(omni_file_logger_t *logger)
{
    int index;
    int saved_errno = 0;
    int should_recover = 0;
    char rotated_current_path[PATH_MAX];
    char from_path[PATH_MAX];
    char to_path[PATH_MAX];

    if (logger == NULL || logger->path[0] == '\0' || logger->max_bytes == 0U ||
        logger->max_files <= 0) {
        return 0;
    }
    rotated_current_path[0] = '\0';

    if (logger->file != NULL) {
        if (omni_file_logger_flush_locked(logger, omni_now_monotonic_ms64()) != 0) {
            /* Stream is still open and usable; nothing to recover. */
            return -1;
        }
        should_recover = 1;
        if (fclose(logger->file) != 0) {
            /* The stream is gone even when fclose() fails. */
            logger->file = NULL;
            saved_errno = errno;
            goto recover;
        }
        logger->file = NULL;
    }

    /* Discard the oldest generation. The result is not checked. */
    if (omni_build_rotated_path(from_path, sizeof(from_path), logger->path,
                                logger->max_files) != 0) {
        saved_errno = errno;
        goto recover;
    }
    unlink(from_path);

    /* Shift the remaining generations up by one, oldest first. */
    for (index = logger->max_files - 1; index >= 1; --index) {
        if (omni_build_rotated_path(from_path, sizeof(from_path), logger->path, index) != 0 ||
            omni_build_rotated_path(to_path, sizeof(to_path), logger->path, index + 1) != 0) {
            saved_errno = errno;
            goto recover;
        }
        if (rename(from_path, to_path) != 0 && errno != ENOENT) {
            saved_errno = errno;
            goto recover;
        }
    }

    if (omni_build_rotated_path(to_path, sizeof(to_path), logger->path, 1) != 0) {
        saved_errno = errno;
        goto recover;
    }
    if (rename(logger->path, to_path) != 0 && errno != ENOENT) {
        saved_errno = errno;
        goto recover;
    }
    /* From here on recovery may move "<path>.1" back into place. */
    snprintf(rotated_current_path, sizeof(rotated_current_path), "%s", to_path);

    if (omni_file_logger_reopen_append_locked(logger) != 0) {
        saved_errno = errno;
        goto recover;
    }
    return 0;

recover:
    if (should_recover) {
        int recover_errno = saved_errno != 0 ? saved_errno : errno;

        if (omni_file_logger_recover_after_rotate_locked(logger, rotated_current_path) == 0) {
            errno = recover_errno;
        } else if (saved_errno != 0) {
            errno = saved_errno;
        }
    } else if (saved_errno != 0) {
        errno = saved_errno;
    }
    return -1;
}

/**
 * Initialise a logger around an already-open stream.
 *
 * The result flushes after every line (immediate_flush = 1, flush_bytes = 1)
 * and has no path, so rotation is disabled. A NULL @p logger is ignored.
 */
void omni_file_logger_init(omni_file_logger_t *logger, FILE *file)
{
    if (logger == NULL) {
        return;
    }
    memset(logger, 0, sizeof(*logger));
    logger->file = file;
    pthread_mutex_init(&logger->mutex, NULL);
    logger->flush_bytes = 1U;
    logger->flush_interval_ms = 0;
    logger->immediate_flush = 1;
    logger->last_flush_monotonic_ms = omni_now_monotonic_ms64();
}

/**
 * Initialise a logger that knows its file path, enabling rotation.
 *
 * Tunables come from the environment, falling back to the defaults shown:
 *   BLITZ_JSONL_FLUSH_BYTES        262144
 *   BLITZ_JSONL_FLUSH_INTERVAL_MS  1000
 *   BLITZ_JSONL_ROTATE_BYTES       134217728
 *   BLITZ_JSONL_ROTATE_FILES       8
 *
 * @param logger           logger to initialise; NULL is ignored
 * @param file             already-open stream to write to
 * @param path             path of that stream; NULL/empty disables rotation
 * @param immediate_flush  non-zero flushes after every line
 */
void omni_file_logger_init_path(omni_file_logger_t *logger, FILE *file, const char *path,
                                int immediate_flush)
{
    struct stat st;

    if (logger == NULL) {
        return;
    }
    omni_file_logger_init(logger, file);

    if (path != NULL && path[0] != '\0') {
        int written = snprintf(logger->path, sizeof(logger->path), "%s", path);

        if (written < 0 || (size_t) written >= sizeof(logger->path)) {
            /* A truncated path names a different file; rotating it would
             * redirect the log. Leave the path empty so rotation is off. */
            logger->path[0] = '\0';
        }
        if (stat(path, &st) == 0) {
            logger->current_bytes = (size_t) st.st_size;
        }
    }

    logger->flush_bytes = omni_positive_size_env("BLITZ_JSONL_FLUSH_BYTES", 262144U);
    logger->flush_interval_ms = omni_positive_int_env("BLITZ_JSONL_FLUSH_INTERVAL_MS", 1000);
    logger->max_bytes = omni_positive_size_env("BLITZ_JSONL_ROTATE_BYTES", 134217728U);
    logger->max_files = omni_positive_int_env("BLITZ_JSONL_ROTATE_FILES", 8);
    logger->immediate_flush = immediate_flush != 0;
}

/**
 * Destroy the logger's mutex. The stream is neither flushed nor closed
 * here. A NULL @p logger is ignored.
 */
void omni_file_logger_destroy(omni_file_logger_t *logger)
{
    if (logger == NULL) {
        return;
    }
    pthread_mutex_destroy(&logger->mutex);
}

/**
 * Append @p line plus '\n' to the log, then flush and/or rotate as the
 * logger's thresholds require. The body runs under logger->mutex.
 *
 * A flush happens when immediate_flush is set, when buffered_bytes reaches
 * flush_bytes, or when flush_interval_ms has elapsed since the last flush.
 * Rotation is attempted when current_bytes reaches max_bytes.
 *
 * @return 0 on success; -1 with errno set (EINVAL for NULL arguments or a
 *         logger with no open stream, otherwise the I/O error)
 */
int omni_file_logger_write_line(omni_file_logger_t *logger, const char *line)
{
    int rc = 0;
    int saved_errno = 0;
    size_t line_len;
    uint64_t now_ms;

    if (logger == NULL || line == NULL) {
        errno = EINVAL;
        return -1;
    }
    line_len = strlen(line) + 1U;

    pthread_mutex_lock(&logger->mutex);

    /* logger->file is replaced during rotation, so it may only be examined
     * while the mutex is held. */
    if (logger->file == NULL) {
        pthread_mutex_unlock(&logger->mutex);
        errno = EINVAL;
        return -1;
    }

    /* Sample the clock under the lock so it cannot be older than a
     * last-flush stamp written by another thread; the unsigned subtraction
     * below would otherwise wrap. */
    now_ms = omni_now_monotonic_ms64();

    if (fputs(line, logger->file) == EOF || fputc('\n', logger->file) == EOF) {
        rc = -1;
    } else {
        logger->current_bytes += line_len;
        logger->buffered_bytes += line_len;

        if (logger->immediate_flush || logger->buffered_bytes >= logger->flush_bytes ||
            (logger->flush_interval_ms > 0 &&
             now_ms - logger->last_flush_monotonic_ms >=
                 (uint64_t) logger->flush_interval_ms)) {
            if (omni_file_logger_flush_locked(logger, now_ms) != 0) {
                rc = -1;
            }
        }
        if (rc == 0 && logger->max_bytes > 0U && logger->current_bytes >= logger->max_bytes) {
            if (omni_file_logger_rotate_locked(logger) != 0) {
                rc = -1;
            }
        }
    }

    if (rc != 0) {
        saved_errno = errno;
    }
    pthread_mutex_unlock(&logger->mutex);
    if (rc != 0) {
        errno = saved_errno;
    }
    return rc;
}