From e9e2389a860bfbe345aac10a55e2d92bfa08b4c4 Mon Sep 17 00:00:00 2001 From: Yashwanth Anantharaju Date: Wed, 26 Nov 2025 06:12:59 -0500 Subject: [PATCH 1/2] out_azure_kusto: fix SIGSEGV in nested mk_list_foreach_safe loops The ingest_all_chunks() function had nested mk_list_foreach_safe loops that both used the same 'tmp' variable as the iterator. The macro stores the 'next' pointer in its second argument for safe iteration during list modification. When the inner loop overwrote 'tmp', it corrupted the outer loop's iteration state, causing undefined behavior and a SIGSEGV crash when processing buffered backlog data on startup. Fix: Add a dedicated 'f_tmp' variable for the inner loop to prevent iterator corruption. Also adds a regression test (buffering_backlog) that exercises the buffering/backlog restart code path to guard against future regressions. Signed-off-by: Yash Ananth Signed-off-by: Yashwanth Anantharaju --- plugins/out_azure_kusto/azure_kusto.c | 3 +- tests/runtime/out_azure_kusto.c | 108 ++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 1 deletion(-) diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index 041460992c5..cf5b784f4a1 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -409,6 +409,7 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con struct azure_kusto_file *chunk; struct mk_list *tmp; struct mk_list *head; + struct mk_list *f_tmp; struct mk_list *f_head; struct flb_fstore_file *fsf; struct flb_fstore_stream *fs_stream; @@ -427,7 +428,7 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con continue; } - mk_list_foreach_safe(f_head, tmp, &fs_stream->files) { + mk_list_foreach_safe(f_head, f_tmp, &fs_stream->files) { fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); chunk = fsf->data; diff --git a/tests/runtime/out_azure_kusto.c b/tests/runtime/out_azure_kusto.c index 6bf8499ba13..32a6ca26b82 100644 --- a/tests/runtime/out_azure_kusto.c +++ b/tests/runtime/out_azure_kusto.c @@ -20,8 +20,30 @@ #include #include "flb_tests_runtime.h" +#include +#include +#include +#include +#include /* Test data */ + +static int flb_kusto_unlink_cb(const char *fpath, const struct stat *sb, int typeflag, struct FTW *ftwbuf) +{ + return remove(fpath); +} + +static void flb_kusto_rm_rf(const char *path) +{ + struct stat st; + + if (stat(path, &st) != 0) { + return; + } + + nftw(path, flb_kusto_unlink_cb, 64, FTW_DEPTH | FTW_PHYS); +} + #include "data/common/json_invalid.h" /* JSON_INVALID */ /* Test functions */ @@ -30,6 +52,7 @@ void flb_test_azure_kusto_managed_identity_system(void); void flb_test_azure_kusto_managed_identity_user(void); void flb_test_azure_kusto_service_principal(void); void flb_test_azure_kusto_workload_identity(void); +void flb_test_azure_kusto_buffering_backlog(void); /* Test list */ TEST_LIST = { @@ -38,6 +61,7 @@ TEST_LIST = { {"managed_identity_user", flb_test_azure_kusto_managed_identity_user}, {"service_principal", flb_test_azure_kusto_service_principal}, {"workload_identity", flb_test_azure_kusto_workload_identity}, + {"buffering_backlog", flb_test_azure_kusto_buffering_backlog}, {NULL, NULL} }; @@ -210,4 +234,88 @@ void flb_test_azure_kusto_workload_identity(void) flb_stop(ctx); flb_destroy(ctx); +} + +/* Regression: exercise buffering-enabled backlog processing on restart to validate nested mk_list_foreach_safe fix */ +void flb_test_azure_kusto_buffering_backlog(void) +{ + int i; + int ret; + int bytes; + char sample[] = "{\"k\":\"v\"}"; + size_t sample_size = sizeof(sample) - 1; + char buffer_dir[PATH_MAX]; + pid_t pid; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + pid = getpid(); + snprintf(buffer_dir, sizeof(buffer_dir), "/tmp/flb-kusto-test-%d", (int) pid); + + /* Ensure a clean buffer directory before starting */ + flb_kusto_rm_rf(buffer_dir); + mkdir(buffer_dir, 0700); + + /* First run: enable buffering and write data to disk */ + ctx = flb_create(); + flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "error", NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + flb_output_set(ctx, out_ffd, "auth_type", "managed_identity", NULL); + flb_output_set(ctx, out_ffd, "client_id", "system", NULL); + flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.windows.net", NULL); + flb_output_set(ctx, out_ffd, "database_name", "telemetrydb", NULL); + flb_output_set(ctx, out_ffd, "table_name", "logs", NULL); + flb_output_set(ctx, out_ffd, "buffering_enabled", "true", NULL); + flb_output_set(ctx, out_ffd, "buffer_dir", buffer_dir, NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + for (i = 0; i < 5; i++) { + bytes = flb_lib_push(ctx, in_ffd, sample, sample_size); + TEST_CHECK(bytes == (int) sample_size); + } + + sleep(1); /* allow flush to write buffered chunks */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Second run: restart to process backlog from buffer_dir */ + ctx = flb_create(); + flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "error", NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + flb_output_set(ctx, out_ffd, "auth_type", "managed_identity", NULL); + flb_output_set(ctx, out_ffd, "client_id", "system", NULL); + flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.windows.net", NULL); + flb_output_set(ctx, out_ffd, "database_name", "telemetrydb", NULL); + flb_output_set(ctx, out_ffd, "table_name", "logs", NULL); + flb_output_set(ctx, out_ffd, "buffering_enabled", "true", NULL); + flb_output_set(ctx, out_ffd, "buffer_dir", buffer_dir, NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + sleep(1); /* ingest_all_chunks runs on startup for buffered backlog */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Cleanup buffer directory after test */ + flb_kusto_rm_rf(buffer_dir); } \ No newline at end of file From 361baa1ba28e118055b7c74b6893de533dea2f78 Mon Sep 17 00:00:00 2001 From: Yashwanth Anantharaju Date: Fri, 19 Dec 2025 10:20:53 -0500 Subject: [PATCH 2/2] out_azure_kusto: fix timer/flush race condition causing SIGSEGV MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch fixes a race condition between the scheduler timer callback (cb_azure_kusto_ingest) and flush/delete paths that causes SIGSEGV crashes when the timer accesses freed file list memory. Problem: The scheduler timer iterates ctx->stream_active->files via mk_list_foreach_safe while concurrent flush/exit paths delete files from the same list without synchronization. This causes use-after-free when the timer callback accesses memory that has been freed by task destruction. Stack trace from production crash: #0 memcpy() at memmove-vec-unaligned-erms.S:222 #1 cio_file_content_copy() at lib/chunkio/src/cio_file.c:547 #2 flb_fstore_file_content_copy() at src/flb_fstore.c:289 #3 construct_request_buffer() at plugins/out_azure_kusto/azure_kusto.c #4 cb_azure_kusto_ingest() at plugins/out_azure_kusto/azure_kusto.c #5 flb_sched_event_handler() at src/flb_scheduler.c:624 Solution: 1. Add pthread_mutex_t files_mutex to protect file list operations 2. Implement lock→mark file locked→unlock→I/O→re-lock pattern in timer callback to minimize lock hold time during network I/O 3. Add skip-if-locked guards in delete/inactive/cleanup functions to prevent double-free and deadlock 4. Cancel upload timer in cb_azure_kusto_exit before cleanup to prevent races during shutdown 5. Move early-delete to after successful queue enqueue to prevent UAF on queue failure 6. Clear fsf->data = NULL in store_exit to prevent dangling pointers The fix has been validated in production environments where the SIGSEGV crashes were occurring with multiple workers and buffering enabled. Signed-off-by: Yash Ananthakrishnan Signed-off-by: Yashwanth Anantharaju --- plugins/out_azure_kusto/azure_kusto.c | 79 +++++++++++++++- plugins/out_azure_kusto/azure_kusto.h | 4 + plugins/out_azure_kusto/azure_kusto_ingest.c | 17 ++-- plugins/out_azure_kusto/azure_kusto_store.c | 54 +++++++++++ tests/runtime/out_azure_kusto.c | 98 +++++++++++++++++++- 5 files changed, 241 insertions(+), 11 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index cf5b784f4a1..c32536317c1 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -143,6 +143,7 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx) flb_sds_len(ctx->o->access_token) + 2); if (!output) { flb_plg_error(ctx->ins, "error creating token buffer"); + pthread_mutex_unlock(&ctx->token_mutex); return NULL; } flb_sds_snprintf(&output, flb_sds_alloc(output), "%s %s", ctx->o->token_type, @@ -422,6 +423,9 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con int is_compressed = FLB_FALSE; flb_sds_t tag_sds; + /* Lock to protect list iteration from concurrent modifications */ + pthread_mutex_lock(&ctx->files_mutex); + mk_list_foreach_safe(head, tmp, &ctx->fs->streams) { fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); if (fs_stream == ctx->stream_upload) { @@ -451,12 +455,19 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con continue; } + /* Mark chunk as locked before releasing mutex for I/O */ + azure_kusto_store_file_lock(chunk); + pthread_mutex_unlock(&ctx->files_mutex); + ret = construct_request_buffer(ctx, NULL, chunk, &buffer, &buffer_size); if (ret < 0) { flb_plg_error(ctx->ins, "ingest_all_old_buffer_files :: Could not construct request buffer for %s", chunk->file_path); + pthread_mutex_lock(&ctx->files_mutex); + azure_kusto_store_file_unlock(chunk); + pthread_mutex_unlock(&ctx->files_mutex); return -1; } @@ -473,6 +484,9 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con "ingest_all_old_buffer_files :: cannot gzip payload"); flb_sds_destroy(payload); flb_sds_destroy(tag_sds); + pthread_mutex_lock(&ctx->files_mutex); + azure_kusto_store_file_unlock(chunk); + pthread_mutex_unlock(&ctx->files_mutex); return -1; } else { @@ -488,6 +502,14 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con ret = azure_kusto_load_ingestion_resources(ctx, config); if (ret != 0) { flb_plg_error(ctx->ins, "ingest_all_old_buffer_files :: cannot load ingestion resources"); + flb_sds_destroy(payload); + flb_sds_destroy(tag_sds); + if (is_compressed) { + flb_free(final_payload); + } + pthread_mutex_lock(&ctx->files_mutex); + azure_kusto_store_file_unlock(chunk); + pthread_mutex_unlock(&ctx->files_mutex); return -1; } @@ -495,10 +517,12 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con ret = azure_kusto_queued_ingestion(ctx, tag_sds, flb_sds_len(tag_sds), final_payload, final_payload_size, chunk); if (ret != 0) { flb_plg_error(ctx->ins, "ingest_all_old_buffer_files :: Failed to ingest data to Azure Kusto"); + pthread_mutex_lock(&ctx->files_mutex); if (chunk){ azure_kusto_store_file_unlock(chunk); chunk->failures += 1; } + pthread_mutex_unlock(&ctx->files_mutex); flb_sds_destroy(tag_sds); flb_sds_destroy(payload); if (is_compressed) { @@ -513,11 +537,25 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con flb_free(final_payload); } - /* data was sent successfully- delete the local buffer */ - azure_kusto_store_file_cleanup(ctx, chunk); + /* data was sent successfully- cleanup the local buffer */ + if (ctx->buffer_file_delete_early == FLB_TRUE) { + /* File was already unlocked and deleted in azure_kusto_queued_ingestion */ + flb_plg_debug(ctx->ins, "ingest_all_chunks: buffer file already deleted (early delete mode)"); + } + else { + /* Unlock and delete the local buffer */ + azure_kusto_store_file_unlock(chunk); + azure_kusto_store_file_cleanup(ctx, chunk); + } + + /* Re-acquire lock before next iteration */ + pthread_mutex_lock(&ctx->files_mutex); } } + /* Release lock after iteration completes */ + pthread_mutex_unlock(&ctx->files_mutex); + return 0; } @@ -574,6 +612,9 @@ static void cb_azure_kusto_ingest(struct flb_config *config, void *data) flb_plg_debug(ctx->ins, "Running upload timer callback (scheduler_kusto_ingest).."); now = time(NULL); + /* Lock to protect list iteration from concurrent modifications */ + pthread_mutex_lock(&ctx->files_mutex); + /* Iterate over all files in the active stream */ mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) { fsf = mk_list_entry(head, struct flb_fstore_file, _head); @@ -592,6 +633,12 @@ static void cb_azure_kusto_ingest(struct flb_config *config, void *data) continue; } + /* Mark file as locked before releasing mutex to prevent concurrent access */ + azure_kusto_store_file_lock(file); + + /* Release lock before I/O operations */ + pthread_mutex_unlock(&ctx->files_mutex); + retry_count = 0; backoff_time = 2; /* Initial backoff time in seconds */ @@ -704,6 +751,8 @@ static void cb_azure_kusto_ingest(struct flb_config *config, void *data) } /* Delete the file after successful ingestion */ + /* Unlock file before delete so the delete helper can proceed */ + azure_kusto_store_file_unlock(file); ret = azure_kusto_store_file_delete(ctx, file); if (ret == 0) { flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: deleted successfully ingested file"); @@ -731,6 +780,8 @@ static void cb_azure_kusto_ingest(struct flb_config *config, void *data) /* If the maximum number of retries is reached, log an error and move to the next file */ if (retry_count >= ctx->scheduler_max_retries) { flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: Max retries reached for file %s", file->fsf->name); + /* Unlock file before delete/inactive so the helper can proceed */ + azure_kusto_store_file_unlock(file); if (ctx->delete_on_max_upload_error){ azure_kusto_store_file_delete(ctx, file); } @@ -738,7 +789,14 @@ static void cb_azure_kusto_ingest(struct flb_config *config, void *data) azure_kusto_store_file_inactive(ctx, file); } } + + /* Re-acquire lock before next iteration */ + pthread_mutex_lock(&ctx->files_mutex); } + + /* Release lock after iteration completes */ + pthread_mutex_unlock(&ctx->files_mutex); + /* Log the end of the upload timer callback */ flb_plg_debug(ctx->ins, "Exited upload timer callback (cb_azure_kusto_ingest).."); } @@ -912,6 +970,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi } ctx->timer_created = FLB_FALSE; + ctx->upload_timer = NULL; ctx->timer_ms = (int) (ctx->upload_timeout / 6) * 1000; flb_plg_info(ctx->ins, "Using upload size %lu bytes", ctx->file_size); } @@ -929,6 +988,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi pthread_mutex_init(&ctx->token_mutex, NULL); pthread_mutex_init(&ctx->resources_mutex, NULL); pthread_mutex_init(&ctx->blob_mutex, NULL); + pthread_mutex_init(&ctx->files_mutex, NULL); /* * Create upstream context for Kusto Ingestion endpoint @@ -1197,7 +1257,7 @@ static void flush_init(void *out_context, struct flb_config *config) sched = flb_sched_ctx_get(); ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, - ctx->timer_ms, cb_azure_kusto_ingest, ctx, NULL); + ctx->timer_ms, cb_azure_kusto_ingest, ctx, &ctx->upload_timer); if (ret == -1) { flb_plg_error(ctx->ins, "Failed to create upload timer"); FLB_OUTPUT_RETURN(FLB_RETRY); @@ -1276,6 +1336,8 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, if (upload_file != NULL && upload_file->failures >= ctx->scheduler_max_retries) { flb_plg_warn(ctx->ins, "File with tag %s failed to send %d times, will not " "retry", event_chunk->tag, ctx->scheduler_max_retries); + /* Unlock file before delete/inactive since those skip locked files */ + azure_kusto_store_file_unlock(upload_file); if (ctx->delete_on_max_upload_error){ azure_kusto_store_file_delete(ctx, upload_file); } @@ -1319,16 +1381,18 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, if (ret == 0){ if (ctx->buffering_enabled == FLB_TRUE && ctx->buffer_file_delete_early == FLB_TRUE){ flb_plg_debug(ctx->ins, "buffer file already deleted after blob creation"); + /* File was already unlocked and deleted in ingest_to_kusto, do not access upload_file */ ret = FLB_OK; goto cleanup; } else{ + /* Unlock file before delete since delete skips locked files */ + azure_kusto_store_file_unlock(upload_file); ret = azure_kusto_store_file_delete(ctx, upload_file); if (ret != 0){ /* File couldn't be deleted */ ret = FLB_RETRY; if (upload_file){ - azure_kusto_store_file_unlock(upload_file); upload_file->failures += 1; } goto error; @@ -1489,6 +1553,12 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config) return -1; } + /* Cancel upload timer first to prevent race with concurrent ingest callback */ + if (ctx->upload_timer != NULL) { + flb_sched_timer_cb_destroy(ctx->upload_timer); + ctx->upload_timer = NULL; + } + if (ctx->buffering_enabled == FLB_TRUE){ if (azure_kusto_store_has_data(ctx) == FLB_TRUE) { @@ -1509,6 +1579,7 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config) pthread_mutex_destroy(&ctx->resources_mutex); pthread_mutex_destroy(&ctx->token_mutex); pthread_mutex_destroy(&ctx->blob_mutex); + pthread_mutex_destroy(&ctx->files_mutex); flb_azure_kusto_conf_destroy(ctx); diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index 362b1379533..0a555a9a2c9 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -127,6 +127,7 @@ struct flb_azure_kusto { int timer_created; int timer_ms; + struct flb_sched_timer *upload_timer; /* timer handle for cancellation on exit */ /* mutex for acquiring oauth tokens */ pthread_mutex_t token_mutex; @@ -141,6 +142,9 @@ struct flb_azure_kusto { pthread_mutex_t buffer_mutex; + /* mutex protecting stream_active->files list from concurrent timer/delete races */ + pthread_mutex_t files_mutex; + int buffering_enabled; size_t file_size; diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index 38d2aa076e5..2dfc2771758 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -632,18 +632,23 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, blob_uri = azure_kusto_create_blob(ctx, blob_id, payload, payload_size); if (blob_uri) { - if (ctx->buffering_enabled == FLB_TRUE && upload_file != NULL && ctx->buffer_file_delete_early == FLB_TRUE) { - flb_plg_debug(ctx->ins, "buffering enabled, ingest to blob successfully done and now deleting the buffer file %s", blob_id); - if (azure_kusto_store_file_delete(ctx, upload_file) != 0) { - flb_plg_error(ctx->ins, "blob creation successful but error deleting buffer file %s", blob_id); - } - } ret = azure_kusto_enqueue_ingestion(ctx, blob_uri, payload_size); if (ret != 0) { flb_plg_error(ctx->ins, "failed to enqueue ingestion blob to queue"); ret = -1; } + else { + /* Only delete file after successful queue - preserves data for retry on queue failure */ + if (ctx->buffering_enabled == FLB_TRUE && upload_file != NULL && ctx->buffer_file_delete_early == FLB_TRUE) { + flb_plg_debug(ctx->ins, "queue succeeded, deleting buffer file %s", blob_id); + /* Unlock file before delete since delete skips locked files */ + azure_kusto_store_file_unlock(upload_file); + if (azure_kusto_store_file_delete(ctx, upload_file) != 0) { + flb_plg_error(ctx->ins, "queue successful but error deleting buffer file %s", blob_id); + } + } + } flb_sds_destroy(blob_uri); } diff --git a/plugins/out_azure_kusto/azure_kusto_store.c b/plugins/out_azure_kusto/azure_kusto_store.c index a1059edd651..5d27cceff25 100644 --- a/plugins/out_azure_kusto/azure_kusto_store.c +++ b/plugins/out_azure_kusto/azure_kusto_store.c @@ -140,6 +140,9 @@ struct azure_kusto_file *azure_kusto_store_file_get(struct flb_azure_kusto *ctx, * Based in the current ctx->stream_name, locate a candidate file to * store the incoming data using as a lookup pattern the content Tag. */ + /* Lock to protect list iteration from concurrent modifications */ + pthread_mutex_lock(&ctx->files_mutex); + mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) { fsf = mk_list_entry(head, struct flb_fstore_file, _head); @@ -147,6 +150,7 @@ struct azure_kusto_file *azure_kusto_store_file_get(struct flb_azure_kusto *ctx, if (fsf->data == NULL) { flb_plg_warn(ctx->ins, "BAD: found flb_fstore_file with NULL data reference, tag=%s, file=%s, will try to delete", tag, fsf->name); flb_fstore_file_delete(ctx->fs, fsf); + continue; } if (fsf->meta_size != tag_len) { @@ -171,6 +175,8 @@ struct azure_kusto_file *azure_kusto_store_file_get(struct flb_azure_kusto *ctx, } } + pthread_mutex_unlock(&ctx->files_mutex); + if (!found) { return NULL; } @@ -239,9 +245,13 @@ int azure_kusto_store_buffer_put(struct flb_azure_kusto *ctx, struct azure_kusto flb_plg_debug(ctx->ins, "[azure_kusto] new buffer file: %s", name); + /* Lock to protect list modification and fsf->data assignment */ + pthread_mutex_lock(&ctx->files_mutex); + /* Create the file */ fsf = flb_fstore_file_create(ctx->fs, ctx->stream_active, name, bytes); if (!fsf) { + pthread_mutex_unlock(&ctx->files_mutex); flb_plg_error(ctx->ins, "could not create the file '%s' in the store", name); flb_sds_destroy(name); @@ -253,6 +263,7 @@ int azure_kusto_store_buffer_put(struct flb_azure_kusto *ctx, struct azure_kusto if (ret == -1) { flb_plg_warn(ctx->ins, "Deleting buffer file because metadata could not be written"); flb_fstore_file_delete(ctx->fs, fsf); + pthread_mutex_unlock(&ctx->files_mutex); return -1; } @@ -262,6 +273,7 @@ int azure_kusto_store_buffer_put(struct flb_azure_kusto *ctx, struct azure_kusto flb_errno(); flb_plg_warn(ctx->ins, "Deleting buffer file because azure_kusto context creation failed"); flb_fstore_file_delete(ctx->fs, fsf); + pthread_mutex_unlock(&ctx->files_mutex); return -1; } azure_kusto_file->fsf = fsf; @@ -270,6 +282,9 @@ int azure_kusto_store_buffer_put(struct flb_azure_kusto *ctx, struct azure_kusto /* Use fstore opaque 'data' reference to keep our context */ fsf->data = azure_kusto_file; + + pthread_mutex_unlock(&ctx->files_mutex); + flb_sds_destroy(name); } @@ -510,6 +525,9 @@ int azure_kusto_store_exit(struct flb_azure_kusto *ctx) return 0; } + /* Lock to protect list access during cleanup */ + pthread_mutex_lock(&ctx->files_mutex); + /* release local context on non-multi upload files */ mk_list_foreach(head, &ctx->fs->streams) { fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); @@ -522,10 +540,13 @@ int azure_kusto_store_exit(struct flb_azure_kusto *ctx) if (fsf->data != NULL) { azure_kusto_file = fsf->data; flb_free(azure_kusto_file); + fsf->data = NULL; /* Clear pointer to prevent use-after-free */ } } } + pthread_mutex_unlock(&ctx->files_mutex); + if (ctx->fs) { flb_fstore_destroy(ctx->fs); } @@ -639,11 +660,22 @@ int azure_kusto_store_file_inactive(struct flb_azure_kusto *ctx, struct azure_ku int ret; struct flb_fstore_file *fsf; + /* Lock to protect list modification from concurrent timer access */ + pthread_mutex_lock(&ctx->files_mutex); + + /* Skip if file is locked (being processed by timer callback) */ + if (azure_kusto_file->locked == FLB_TRUE) { + pthread_mutex_unlock(&ctx->files_mutex); + return -1; + } + fsf = azure_kusto_file->fsf; flb_free(azure_kusto_file); ret = flb_fstore_file_inactive(ctx->fs, fsf); + pthread_mutex_unlock(&ctx->files_mutex); + return ret; } @@ -663,12 +695,23 @@ int azure_kusto_store_file_cleanup(struct flb_azure_kusto *ctx, struct azure_kus { struct flb_fstore_file *fsf; + /* Lock to protect list modification from concurrent timer access */ + pthread_mutex_lock(&ctx->files_mutex); + + /* Skip if file is locked (being processed by timer callback) */ + if (azure_kusto_file->locked == FLB_TRUE) { + pthread_mutex_unlock(&ctx->files_mutex); + return -1; + } + fsf = azure_kusto_file->fsf; /* permanent deletion */ flb_fstore_file_delete(ctx->fs, fsf); flb_free(azure_kusto_file); + pthread_mutex_unlock(&ctx->files_mutex); + return 0; } @@ -690,6 +733,15 @@ int azure_kusto_store_file_delete(struct flb_azure_kusto *ctx, struct azure_kust { struct flb_fstore_file *fsf; + /* Lock to protect list modification from concurrent timer access */ + pthread_mutex_lock(&ctx->files_mutex); + + /* Skip if file is locked (being processed by timer callback) */ + if (azure_kusto_file->locked == FLB_TRUE) { + pthread_mutex_unlock(&ctx->files_mutex); + return -1; + } + fsf = azure_kusto_file->fsf; ctx->current_buffer_size -= azure_kusto_file->size; @@ -697,6 +749,8 @@ int azure_kusto_store_file_delete(struct flb_azure_kusto *ctx, struct azure_kust flb_fstore_file_delete(ctx->fs, fsf); flb_free(azure_kusto_file); + pthread_mutex_unlock(&ctx->files_mutex); + return 0; } diff --git a/tests/runtime/out_azure_kusto.c b/tests/runtime/out_azure_kusto.c index 32a6ca26b82..5aeed79c158 100644 --- a/tests/runtime/out_azure_kusto.c +++ b/tests/runtime/out_azure_kusto.c @@ -53,6 +53,9 @@ void flb_test_azure_kusto_managed_identity_user(void); void flb_test_azure_kusto_service_principal(void); void flb_test_azure_kusto_workload_identity(void); void flb_test_azure_kusto_buffering_backlog(void); +#ifndef _WIN32 +void flb_test_azure_kusto_timer_flush_race(void); +#endif /* Test list */ TEST_LIST = { @@ -62,6 +65,9 @@ TEST_LIST = { {"service_principal", flb_test_azure_kusto_service_principal}, {"workload_identity", flb_test_azure_kusto_workload_identity}, {"buffering_backlog", flb_test_azure_kusto_buffering_backlog}, +#ifndef _WIN32 + {"timer_flush_race", flb_test_azure_kusto_timer_flush_race}, +#endif {NULL, NULL} }; @@ -318,4 +324,94 @@ void flb_test_azure_kusto_buffering_backlog(void) /* Cleanup buffer directory after test */ flb_kusto_rm_rf(buffer_dir); -} \ No newline at end of file +} + +#ifndef _WIN32 +/* + * TDD RED PHASE: Timer/Flush race condition test + * + * This test exercises the race where cb_azure_kusto_ingest (timer callback) + * iterates ctx->stream_active->files while concurrent flush/exit paths + * delete files without synchronization, leading to UAF/SIGSEGV. + * + * Expected behavior: + * - CURRENT (unfixed): Should crash/fail under ASan due to UAF + * - AFTER FIX: Should pass cleanly when mutex protects file list access + */ +void flb_test_azure_kusto_timer_flush_race(void) +{ + int i; + int ret; + int bytes; + char sample[] = "{\"race\":\"test\"}"; + size_t sample_size = sizeof(sample) - 1; + char buffer_dir[PATH_MAX]; + pid_t pid; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + pid = getpid(); + snprintf(buffer_dir, sizeof(buffer_dir), "/tmp/flb-kusto-race-test-%d", (int) pid); + + /* Ensure clean buffer directory */ + flb_kusto_rm_rf(buffer_dir); + mkdir(buffer_dir, 0700); + + /* Create context with aggressive timing to trigger race */ + ctx = flb_create(); + flb_service_set(ctx, "Flush", "0.5", "Grace", "1", "Log_Level", "error", NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "race.test", NULL); + + out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "race.test", NULL); + flb_output_set(ctx, out_ffd, "auth_type", "managed_identity", NULL); + flb_output_set(ctx, out_ffd, "client_id", "system", NULL); + flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.windows.net", NULL); + flb_output_set(ctx, out_ffd, "database_name", "testdb", NULL); + flb_output_set(ctx, out_ffd, "table_name", "logs", NULL); + + /* Enable buffering with small timeout to trigger concurrent timer/flush */ + flb_output_set(ctx, out_ffd, "buffering_enabled", "true", NULL); + flb_output_set(ctx, out_ffd, "buffer_dir", buffer_dir, NULL); + flb_output_set(ctx, out_ffd, "upload_timeout", "1s", NULL); /* 1 second timeout triggers timer */ + flb_output_set(ctx, out_ffd, "upload_file_size", "1M", NULL); /* Minimum 1MB file size */ + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push data to create buffered chunks */ + for (i = 0; i < 10; i++) { + bytes = flb_lib_push(ctx, in_ffd, sample, sample_size); + TEST_CHECK(bytes == (int) sample_size); + } + + /* + * Sleep enough to let: + * 1. Flush write chunks to disk + * 2. Timer callback (cb_azure_kusto_ingest) start iterating files + * 3. Concurrent flush/exit delete files while timer is running + * + * This timing creates the race window where timer accesses freed memory. + */ + sleep(2); + + /* + * Stop quickly to trigger exit-path race: + * cb_azure_kusto_exit will delete files while timer callback + * may still be iterating the list. + * + * Expected on CURRENT code: UAF crash under ASan + * Expected AFTER fix: Clean shutdown + */ + flb_stop(ctx); + flb_destroy(ctx); + + /* Cleanup */ + flb_kusto_rm_rf(buffer_dir); +} +#endif /* _WIN32 */ \ No newline at end of file