Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 77 additions & 5 deletions plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx)
flb_sds_len(ctx->o->access_token) + 2);
if (!output) {
flb_plg_error(ctx->ins, "error creating token buffer");
pthread_mutex_unlock(&ctx->token_mutex);
return NULL;
}
flb_sds_snprintf(&output, flb_sds_alloc(output), "%s %s", ctx->o->token_type,
Expand Down Expand Up @@ -409,6 +410,7 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con
struct azure_kusto_file *chunk;
struct mk_list *tmp;
struct mk_list *head;
struct mk_list *f_tmp;
struct mk_list *f_head;
struct flb_fstore_file *fsf;
struct flb_fstore_stream *fs_stream;
Expand All @@ -421,13 +423,16 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con
int is_compressed = FLB_FALSE;
flb_sds_t tag_sds;

/* Lock to protect list iteration from concurrent modifications */
pthread_mutex_lock(&ctx->files_mutex);

mk_list_foreach_safe(head, tmp, &ctx->fs->streams) {
fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head);
if (fs_stream == ctx->stream_upload) {
continue;
}

mk_list_foreach_safe(f_head, tmp, &fs_stream->files) {
mk_list_foreach_safe(f_head, f_tmp, &fs_stream->files) {
fsf = mk_list_entry(f_head, struct flb_fstore_file, _head);
chunk = fsf->data;

Expand All @@ -450,12 +455,19 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con
continue;
}

/* Mark chunk as locked before releasing mutex for I/O */
azure_kusto_store_file_lock(chunk);
pthread_mutex_unlock(&ctx->files_mutex);
Comment on lines +458 to +460

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Prevent use-after-free in ingest_all_chunks iteration

mk_list_foreach_safe caches f_tmp before the body runs, but the code immediately releases files_mutex (lines 458-460) to perform I/O and only re-acquires it later. While the mutex is unlocked, other paths that also lock files_mutex (flush/exit) can delete the next list node (which isn’t locked), leaving the cached f_tmp dangling; when the loop advances it dereferences freed memory, reintroducing the list-iteration crash this change is trying to fix.

Useful? React with 👍 / 👎.


ret = construct_request_buffer(ctx, NULL, chunk,
&buffer, &buffer_size);
if (ret < 0) {
flb_plg_error(ctx->ins,
"ingest_all_old_buffer_files :: Could not construct request buffer for %s",
chunk->file_path);
pthread_mutex_lock(&ctx->files_mutex);
azure_kusto_store_file_unlock(chunk);
pthread_mutex_unlock(&ctx->files_mutex);
return -1;
}

Expand All @@ -472,6 +484,9 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con
"ingest_all_old_buffer_files :: cannot gzip payload");
flb_sds_destroy(payload);
flb_sds_destroy(tag_sds);
pthread_mutex_lock(&ctx->files_mutex);
azure_kusto_store_file_unlock(chunk);
pthread_mutex_unlock(&ctx->files_mutex);
return -1;
}
else {
Expand All @@ -487,17 +502,27 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con
ret = azure_kusto_load_ingestion_resources(ctx, config);
if (ret != 0) {
flb_plg_error(ctx->ins, "ingest_all_old_buffer_files :: cannot load ingestion resources");
flb_sds_destroy(payload);
flb_sds_destroy(tag_sds);
if (is_compressed) {
flb_free(final_payload);
}
pthread_mutex_lock(&ctx->files_mutex);
azure_kusto_store_file_unlock(chunk);
pthread_mutex_unlock(&ctx->files_mutex);
return -1;
}

/* Call azure_kusto_queued_ingestion to ingest the payload */
ret = azure_kusto_queued_ingestion(ctx, tag_sds, flb_sds_len(tag_sds), final_payload, final_payload_size, chunk);
if (ret != 0) {
flb_plg_error(ctx->ins, "ingest_all_old_buffer_files :: Failed to ingest data to Azure Kusto");
pthread_mutex_lock(&ctx->files_mutex);
if (chunk){
azure_kusto_store_file_unlock(chunk);
chunk->failures += 1;
}
pthread_mutex_unlock(&ctx->files_mutex);
flb_sds_destroy(tag_sds);
flb_sds_destroy(payload);
if (is_compressed) {
Expand All @@ -512,11 +537,25 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con
flb_free(final_payload);
}

/* data was sent successfully- delete the local buffer */
azure_kusto_store_file_cleanup(ctx, chunk);
/* data was sent successfully- cleanup the local buffer */
if (ctx->buffer_file_delete_early == FLB_TRUE) {
/* File was already unlocked and deleted in azure_kusto_queued_ingestion */
flb_plg_debug(ctx->ins, "ingest_all_chunks: buffer file already deleted (early delete mode)");
}
else {
/* Unlock and delete the local buffer */
azure_kusto_store_file_unlock(chunk);
azure_kusto_store_file_cleanup(ctx, chunk);
}

/* Re-acquire lock before next iteration */
pthread_mutex_lock(&ctx->files_mutex);
}
}

/* Release lock after iteration completes */
pthread_mutex_unlock(&ctx->files_mutex);

return 0;
}

Expand Down Expand Up @@ -573,6 +612,9 @@ static void cb_azure_kusto_ingest(struct flb_config *config, void *data)
flb_plg_debug(ctx->ins, "Running upload timer callback (scheduler_kusto_ingest)..");
now = time(NULL);

/* Lock to protect list iteration from concurrent modifications */
pthread_mutex_lock(&ctx->files_mutex);

/* Iterate over all files in the active stream */
mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) {
fsf = mk_list_entry(head, struct flb_fstore_file, _head);
Expand All @@ -591,6 +633,12 @@ static void cb_azure_kusto_ingest(struct flb_config *config, void *data)
continue;
}

/* Mark file as locked before releasing mutex to prevent concurrent access */
azure_kusto_store_file_lock(file);

/* Release lock before I/O operations */
pthread_mutex_unlock(&ctx->files_mutex);

retry_count = 0;
backoff_time = 2; /* Initial backoff time in seconds */

Expand Down Expand Up @@ -703,6 +751,8 @@ static void cb_azure_kusto_ingest(struct flb_config *config, void *data)
}

/* Delete the file after successful ingestion */
/* Unlock file before delete so the delete helper can proceed */
azure_kusto_store_file_unlock(file);
ret = azure_kusto_store_file_delete(ctx, file);
if (ret == 0) {
flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: deleted successfully ingested file");
Expand Down Expand Up @@ -730,14 +780,23 @@ static void cb_azure_kusto_ingest(struct flb_config *config, void *data)
/* If the maximum number of retries is reached, log an error and move to the next file */
if (retry_count >= ctx->scheduler_max_retries) {
flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: Max retries reached for file %s", file->fsf->name);
/* Unlock file before delete/inactive so the helper can proceed */
azure_kusto_store_file_unlock(file);
if (ctx->delete_on_max_upload_error){
azure_kusto_store_file_delete(ctx, file);
}
else {
azure_kusto_store_file_inactive(ctx, file);
}
}

/* Re-acquire lock before next iteration */
pthread_mutex_lock(&ctx->files_mutex);
}

/* Release lock after iteration completes */
pthread_mutex_unlock(&ctx->files_mutex);

/* Log the end of the upload timer callback */
flb_plg_debug(ctx->ins, "Exited upload timer callback (cb_azure_kusto_ingest)..");
}
Expand Down Expand Up @@ -911,6 +970,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
}

ctx->timer_created = FLB_FALSE;
ctx->upload_timer = NULL;
ctx->timer_ms = (int) (ctx->upload_timeout / 6) * 1000;
flb_plg_info(ctx->ins, "Using upload size %lu bytes", ctx->file_size);
}
Expand All @@ -928,6 +988,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
pthread_mutex_init(&ctx->token_mutex, NULL);
pthread_mutex_init(&ctx->resources_mutex, NULL);
pthread_mutex_init(&ctx->blob_mutex, NULL);
pthread_mutex_init(&ctx->files_mutex, NULL);

/*
* Create upstream context for Kusto Ingestion endpoint
Expand Down Expand Up @@ -1196,7 +1257,7 @@ static void flush_init(void *out_context, struct flb_config *config)
sched = flb_sched_ctx_get();

ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
ctx->timer_ms, cb_azure_kusto_ingest, ctx, NULL);
ctx->timer_ms, cb_azure_kusto_ingest, ctx, &ctx->upload_timer);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to create upload timer");
FLB_OUTPUT_RETURN(FLB_RETRY);
Expand Down Expand Up @@ -1275,6 +1336,8 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
if (upload_file != NULL && upload_file->failures >= ctx->scheduler_max_retries) {
flb_plg_warn(ctx->ins, "File with tag %s failed to send %d times, will not "
"retry", event_chunk->tag, ctx->scheduler_max_retries);
/* Unlock file before delete/inactive since those skip locked files */
azure_kusto_store_file_unlock(upload_file);
if (ctx->delete_on_max_upload_error){
azure_kusto_store_file_delete(ctx, upload_file);
}
Expand Down Expand Up @@ -1318,16 +1381,18 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
if (ret == 0){
if (ctx->buffering_enabled == FLB_TRUE && ctx->buffer_file_delete_early == FLB_TRUE){
flb_plg_debug(ctx->ins, "buffer file already deleted after blob creation");
/* File was already unlocked and deleted in ingest_to_kusto, do not access upload_file */
ret = FLB_OK;
goto cleanup;
}
else{
/* Unlock file before delete since delete skips locked files */
azure_kusto_store_file_unlock(upload_file);
ret = azure_kusto_store_file_delete(ctx, upload_file);
if (ret != 0){
/* File couldn't be deleted */
ret = FLB_RETRY;
if (upload_file){
azure_kusto_store_file_unlock(upload_file);
upload_file->failures += 1;
}
goto error;
Expand Down Expand Up @@ -1488,6 +1553,12 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config)
return -1;
}

/* Cancel upload timer first to prevent race with concurrent ingest callback */
if (ctx->upload_timer != NULL) {
flb_sched_timer_cb_destroy(ctx->upload_timer);
ctx->upload_timer = NULL;
}


if (ctx->buffering_enabled == FLB_TRUE){
if (azure_kusto_store_has_data(ctx) == FLB_TRUE) {
Expand All @@ -1508,6 +1579,7 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config)
pthread_mutex_destroy(&ctx->resources_mutex);
pthread_mutex_destroy(&ctx->token_mutex);
pthread_mutex_destroy(&ctx->blob_mutex);
pthread_mutex_destroy(&ctx->files_mutex);

flb_azure_kusto_conf_destroy(ctx);

Expand Down
4 changes: 4 additions & 0 deletions plugins/out_azure_kusto/azure_kusto.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ struct flb_azure_kusto {

int timer_created;
int timer_ms;
struct flb_sched_timer *upload_timer; /* timer handle for cancellation on exit */

/* mutex for acquiring oauth tokens */
pthread_mutex_t token_mutex;
Expand All @@ -141,6 +142,9 @@ struct flb_azure_kusto {

pthread_mutex_t buffer_mutex;

/* mutex protecting stream_active->files list from concurrent timer/delete races */
pthread_mutex_t files_mutex;

int buffering_enabled;

size_t file_size;
Expand Down
17 changes: 11 additions & 6 deletions plugins/out_azure_kusto/azure_kusto_ingest.c
Original file line number Diff line number Diff line change
Expand Up @@ -632,18 +632,23 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
blob_uri = azure_kusto_create_blob(ctx, blob_id, payload, payload_size);

if (blob_uri) {
if (ctx->buffering_enabled == FLB_TRUE && upload_file != NULL && ctx->buffer_file_delete_early == FLB_TRUE) {
flb_plg_debug(ctx->ins, "buffering enabled, ingest to blob successfully done and now deleting the buffer file %s", blob_id);
if (azure_kusto_store_file_delete(ctx, upload_file) != 0) {
flb_plg_error(ctx->ins, "blob creation successful but error deleting buffer file %s", blob_id);
}
}
ret = azure_kusto_enqueue_ingestion(ctx, blob_uri, payload_size);

if (ret != 0) {
flb_plg_error(ctx->ins, "failed to enqueue ingestion blob to queue");
ret = -1;
}
else {
/* Only delete file after successful queue - preserves data for retry on queue failure */
if (ctx->buffering_enabled == FLB_TRUE && upload_file != NULL && ctx->buffer_file_delete_early == FLB_TRUE) {
flb_plg_debug(ctx->ins, "queue succeeded, deleting buffer file %s", blob_id);
/* Unlock file before delete since delete skips locked files */
azure_kusto_store_file_unlock(upload_file);
if (azure_kusto_store_file_delete(ctx, upload_file) != 0) {
flb_plg_error(ctx->ins, "queue successful but error deleting buffer file %s", blob_id);
}
}
}

flb_sds_destroy(blob_uri);
}
Expand Down
Loading