Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fluent-bit/flb_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ struct flb_task_enqueued {

int flb_task_running_count(struct flb_config *config);
int flb_task_running_print(struct flb_config *config);
int flb_task_map_get_task_id(struct flb_config *config);

struct flb_task *flb_task_create(uint64_t ref_id,
const char *buf,
Expand Down
15 changes: 15 additions & 0 deletions src/flb_engine_dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <fluent-bit/flb_engine.h>
#include <fluent-bit/flb_task.h>
#include <fluent-bit/flb_event.h>
#include <chunkio/chunkio.h>


/* It creates a new output thread using a 'Retry' context */
Expand Down Expand Up @@ -270,6 +271,14 @@ int flb_engine_dispatch(uint64_t id, struct flb_input_instance *in,
continue;
}

if (flb_task_map_get_task_id(config) == -1) {
/*
* There isn't a task available, no more chunks can have a task
* assigned.
*/
break;
}

/* There is a match, get the buffer */
buf_data = flb_input_chunk_flush(ic, &buf_size);
if (buf_size == 0) {
Expand Down Expand Up @@ -312,6 +321,12 @@ int flb_engine_dispatch(uint64_t id, struct flb_input_instance *in,
*/
if (t_err == FLB_TRUE) {
flb_input_chunk_release_lock(ic);

/*
* If the Storage type is 'filesystem' we need to put
* the file content down.
*/
flb_input_chunk_down(ic);
}
continue;
}
Expand Down
4 changes: 4 additions & 0 deletions src/flb_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ int flb_task_running_print(struct flb_config *config)
return 0;
}

int flb_task_map_get_task_id(struct flb_config *config) {
return map_get_task_id(config);
}

/* Create an engine task to handle the output plugin flushing work */
struct flb_task *flb_task_create(uint64_t ref_id,
const char *buf,
Expand Down