diff --git a/plugins/filter_multiline/ml.c b/plugins/filter_multiline/ml.c index 41b1b8a4d64..ced8ec83739 100644 --- a/plugins/filter_multiline/ml.c +++ b/plugins/filter_multiline/ml.c @@ -176,7 +176,7 @@ static int flush_callback(struct flb_ml_parser *parser, /* Emit record with original tag */ flb_plg_trace(ctx->ins, "emitting from %s to %s", stream->input_name, stream->tag); ret = in_emitter_add_record(stream->tag, flb_sds_len(stream->tag), buf_data, buf_size, - ctx->ins_emitter); + ctx->ins_emitter, ctx->i_ins); return ret; } @@ -526,7 +526,8 @@ static void partial_timer_cb(struct flb_config *config, void *data) ret = in_emitter_add_record(packer->tag, flb_sds_len(packer->tag), packer->log_encoder.output_buffer, packer->log_encoder.output_length, - ctx->ins_emitter); + ctx->ins_emitter, + ctx->i_ins); if (ret < 0) { /* this shouldn't happen in normal execution */ flb_plg_warn(ctx->ins, @@ -741,6 +742,15 @@ static int cb_ml_filter(const void *data, size_t bytes, return FLB_FILTER_NOTOUCH; } + if (ctx->i_ins == NULL){ + ctx->i_ins = i_ins; + } + if (ctx->i_ins != i_ins) { + flb_plg_trace(ctx->ins, "input instance changed from %s to %s", + ctx->i_ins->name, i_ins->name); + ctx->i_ins = i_ins; + } + /* 'partial_message' mode */ if (ctx->partial_mode == FLB_TRUE) { return ml_filter_partial(data, bytes, tag, tag_len, diff --git a/plugins/filter_multiline/ml.h b/plugins/filter_multiline/ml.h index 59bf6c7e826..cae8fb64166 100644 --- a/plugins/filter_multiline/ml.h +++ b/plugins/filter_multiline/ml.h @@ -73,6 +73,7 @@ struct ml_ctx { size_t emitter_mem_buf_limit; /* Emitter buffer limit */ struct flb_input_instance *ins_emitter; /* emitter input plugin instance */ struct flb_config *config; /* Fluent Bit context */ + struct flb_input_instance *i_ins; /* Fluent Bit input instance (last used)*/ #ifdef FLB_HAVE_METRICS struct cmt_counter *cmt_emitted; @@ -82,6 +83,7 @@ struct ml_ctx { /* Register external function to emit records, check 'plugins/in_emitter' */ int in_emitter_add_record(const char *tag, int tag_len, const char *buf_data, size_t buf_size, - struct flb_input_instance *in); + struct flb_input_instance *in, + struct flb_input_instance *i_ins); #endif diff --git a/plugins/filter_rewrite_tag/rewrite_tag.c b/plugins/filter_rewrite_tag/rewrite_tag.c index 01b0f168fe2..c8bfe029350 100644 --- a/plugins/filter_rewrite_tag/rewrite_tag.c +++ b/plugins/filter_rewrite_tag/rewrite_tag.c @@ -355,7 +355,8 @@ static int ingest_inline(struct flb_rewrite_tag *ctx, */ static int process_record(const char *tag, int tag_len, msgpack_object map, const void *buf, size_t buf_size, int *keep, - struct flb_rewrite_tag *ctx, int *matched) + struct flb_rewrite_tag *ctx, int *matched, + struct flb_input_instance *i_ins) { int ret; flb_sds_t out_tag; @@ -404,7 +405,7 @@ static int process_record(const char *tag, int tag_len, msgpack_object map, if (!ret) { /* Emit record with new tag */ ret = in_emitter_add_record(out_tag, flb_sds_len(out_tag), buf, buf_size, - ctx->ins_emitter); + ctx->ins_emitter, i_ins); } else { ret = 0; @@ -489,7 +490,7 @@ static int cb_rewrite_tag_filter(const void *data, size_t bytes, * If a record was emitted, the variable 'keep' will define if the record must * be preserved or not. */ - is_emitted = process_record(tag, tag_len, map, (char *) data + pre, off - pre, &keep, ctx, &is_matched); + is_emitted = process_record(tag, tag_len, map, (char *) data + pre, off - pre, &keep, ctx, &is_matched, i_ins); if (is_emitted == FLB_TRUE) { /* A record with the new tag was emitted */ emitted_num++; diff --git a/plugins/filter_rewrite_tag/rewrite_tag.h b/plugins/filter_rewrite_tag/rewrite_tag.h index 11c0535fde1..d73b49f12eb 100644 --- a/plugins/filter_rewrite_tag/rewrite_tag.h +++ b/plugins/filter_rewrite_tag/rewrite_tag.h @@ -57,7 +57,8 @@ struct flb_rewrite_tag { /* Register external function to emit records, check 'plugins/in_emitter' */ int in_emitter_add_record(const char *tag, int tag_len, const char *buf_data, size_t buf_size, - struct flb_input_instance *in); + struct flb_input_instance *in, + struct flb_input_instance *i_ins); int in_emitter_get_collector_id(struct flb_input_instance *in); diff --git a/plugins/in_emitter/emitter.c b/plugins/in_emitter/emitter.c index 62886d1346c..8092a7954ee 100644 --- a/plugins/in_emitter/emitter.c +++ b/plugins/in_emitter/emitter.c @@ -31,6 +31,9 @@ #define DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY 2000 +/* return values */ +#define FLB_EMITTER_BUSY -2 + struct em_chunk { flb_sds_t tag; struct msgpack_sbuffer mp_sbuf; /* msgpack sbuffer */ @@ -38,11 +41,18 @@ struct em_chunk { struct mk_list _head; }; +struct input_ref { + struct flb_input_instance *i_ins; + struct mk_list _head; +}; + struct flb_emitter { + int coll_fd; /* collector id */ struct mk_list chunks; /* list of all pending chunks */ struct flb_input_instance *ins; /* input instance */ struct flb_ring_buffer *msgs; /* ring buffer for cross-thread messages */ int ring_buffer_size; /* size of the ring buffer */ + struct mk_list i_ins_list; /* instance list of linked/sending inputs */ }; struct em_chunk *em_chunk_create(const char *tag, int tag_len, @@ -85,6 +95,12 @@ int static do_in_emitter_add_record(struct em_chunk *ec, struct flb_emitter *ctx = (struct flb_emitter *) in->context; int ret; + if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { + flb_plg_debug(ctx->ins, "_emitter %s paused. Not processing records.", + ctx->ins->name); + return FLB_EMITTER_BUSY; + } + /* Associate this backlog chunk to this instance into the engine */ ret = flb_input_log_append(in, ec->tag, flb_sds_len(ec->tag), @@ -97,7 +113,6 @@ int static do_in_emitter_add_record(struct em_chunk *ec, em_chunk_destroy(ec); return -1; } - /* Release the echunk */ em_chunk_destroy(ec); return 0; } @@ -108,15 +123,51 @@ int static do_in_emitter_add_record(struct em_chunk *ec, */ int in_emitter_add_record(const char *tag, int tag_len, const char *buf_data, size_t buf_size, - struct flb_input_instance *in) + struct flb_input_instance *in, + struct flb_input_instance *i_ins) { struct em_chunk temporary_chunk; struct mk_list *head; + struct input_ref *i_ref; + bool ref_found; + struct mk_list *tmp; + struct em_chunk *ec; struct flb_emitter *ctx; ctx = (struct flb_emitter *) in->context; ec = NULL; + /* Iterate over list of already known (source) inputs */ + /* If new, add it to the list to be able to pause it later on */ + ref_found = false; + mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { + i_ref = mk_list_entry(head, struct input_ref, _head); + if(i_ref->i_ins == i_ins){ + ref_found = true; + break; + } + } + if (!ref_found) { + i_ref = flb_malloc(sizeof(struct input_ref)); + if (!i_ref) { + flb_errno(); + return FLB_FILTER_NOTOUCH; + } + i_ref->i_ins = i_ins; + mk_list_add(&i_ref->_head, &ctx->i_ins_list); + /* If in_emitter is paused, but new input plugin is not paused, pause it */ + if (flb_input_buf_paused(ctx->ins) == FLB_TRUE && + flb_input_buf_paused(i_ins) == FLB_FALSE) { + flb_input_pause(i_ins); + } + } + + + /* Restricted by mem_buf_limit */ + if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { + flb_plg_debug(ctx->ins, "emitter memory buffer limit reached. Not accepting record."); + return FLB_EMITTER_BUSY; + } /* Use the ring buffer first if it exists */ if (ctx->msgs) { @@ -161,8 +212,7 @@ int in_emitter_add_record(const char *tag, int tag_len, /* Append raw msgpack data */ msgpack_sbuffer_write(&ec->mp_sbuf, buf_data, buf_size); - - return do_in_emitter_add_record(ec, in); + return 0; } /* @@ -191,6 +241,34 @@ static int in_emitter_ingest_ring_buffer(struct flb_input_instance *in, return ret; } +static int cb_queue_chunks(struct flb_input_instance *in, + struct flb_config *config, void *data) +{ + int ret; + struct mk_list *tmp; + struct mk_list *head; + struct em_chunk *echunk; + struct flb_emitter *ctx; + + /* Get context */ + ctx = (struct flb_emitter *) data; + + /* Try to enqueue chunks under our limits */ + mk_list_foreach_safe(head, tmp, &ctx->chunks) { + echunk = mk_list_entry(head, struct em_chunk, _head); + + /* Associate this backlog chunk to this instance into the engine */ + ret = do_in_emitter_add_record(echunk, in); + if (ret == -1) { + flb_error("[in_emitter] error registering chunk with tag: %s", + echunk->tag); + continue; + } + } + + return 0; +} + static int in_emitter_start_ring_buffer(struct flb_input_instance *in, struct flb_emitter *ctx) { if (ctx->ring_buffer_size <= 0) { @@ -232,6 +310,8 @@ static int cb_emitter_init(struct flb_input_instance *in, ctx->ins = in; mk_list_init(&ctx->chunks); + mk_list_init(&ctx->i_ins_list); + ret = flb_input_config_map_set(in, (void *) ctx); if (ret == -1) { @@ -257,6 +337,15 @@ static int cb_emitter_init(struct flb_input_instance *in, return -1; } } + else{ + ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 25000000, config); + if (ret < 0) { + flb_error("[in_emitter] could not create collector"); + flb_free(ctx); + return -1; + } + ctx->coll_fd = ret; + } /* export plugin context */ flb_input_set_context(in, ctx); @@ -264,6 +353,36 @@ static int cb_emitter_init(struct flb_input_instance *in, return 0; } +static void cb_emitter_pause(void *data, struct flb_config *config) +{ + struct flb_emitter *ctx = data; + struct mk_list *tmp; + struct mk_list *head; + struct input_ref *i_ref; + + /* Pause all known senders */ + flb_input_collector_pause(ctx->coll_fd, ctx->ins); + mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { + i_ref = mk_list_entry(head, struct input_ref, _head); + flb_input_pause(i_ref->i_ins); + } +} + +static void cb_emitter_resume(void *data, struct flb_config *config) +{ + struct flb_emitter *ctx = data; + struct mk_list *tmp; + struct mk_list *head; + struct input_ref *i_ref; + + /* Resume all known senders */ + flb_input_collector_resume(ctx->coll_fd, ctx->ins); + mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { + i_ref = mk_list_entry(head, struct input_ref, _head); + flb_input_resume(i_ref->i_ins); + } +} + static int cb_emitter_exit(void *data, struct flb_config *config) { struct mk_list *tmp; @@ -271,9 +390,9 @@ static int cb_emitter_exit(void *data, struct flb_config *config) struct flb_emitter *ctx = data; struct em_chunk *echunk; struct em_chunk ec; + struct input_ref *i_ref; int ret; - mk_list_foreach_safe(head, tmp, &ctx->chunks) { echunk = mk_list_entry(head, struct em_chunk, _head); mk_list_del(&echunk->_head); @@ -289,6 +408,13 @@ static int cb_emitter_exit(void *data, struct flb_config *config) flb_ring_buffer_destroy(ctx->msgs); } + mk_list_foreach_safe(head,tmp, &ctx->i_ins_list) { + i_ref = mk_list_entry(head, struct input_ref, _head); + mk_list_del(&i_ref->_head); + flb_free(i_ref); + } + + flb_free(ctx); return 0; } @@ -312,8 +438,8 @@ struct flb_input_plugin in_emitter_plugin = { .cb_ingest = NULL, .cb_flush_buf = NULL, .config_map = config_map, - .cb_pause = NULL, - .cb_resume = NULL, + .cb_pause = cb_emitter_pause, + .cb_resume = cb_emitter_resume, .cb_exit = cb_emitter_exit, /* This plugin can only be configured and invoked by the Engine only */ diff --git a/src/flb_input.c b/src/flb_input.c index a990a9d2805..7b614ccdb44 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -1729,6 +1729,7 @@ int flb_input_resume(struct flb_input_instance *ins) flb_input_thread_instance_resume(ins); } else { + flb_info("[input] resume %s", flb_input_name(ins)); ins->p->cb_resume(ins->context, ins->config); } } diff --git a/tests/runtime/filter_multiline.c b/tests/runtime/filter_multiline.c index 18253a5b2c7..ed6ffb6b7cb 100644 --- a/tests/runtime/filter_multiline.c +++ b/tests/runtime/filter_multiline.c @@ -2,6 +2,7 @@ #include #include +#include #include "flb_tests_runtime.h" struct filter_test { @@ -120,7 +121,34 @@ static int cb_check_str_list(void *record, size_t size, void *data) return 0; } +void wait_with_timeout(uint32_t timeout_ms, int *output_num, int expected) +{ + struct flb_time start_time; + struct flb_time end_time; + struct flb_time diff_time; + uint64_t elapsed_time_flb = 0; + + flb_time_get(&start_time); + + while (true) { + *output_num = get_output_num(); + + if (*output_num == expected) { + break; + } + + flb_time_msleep(100); + flb_time_get(&end_time); + flb_time_diff(&end_time, &start_time, &diff_time); + elapsed_time_flb = flb_time_to_nanosec(&diff_time) / 1000000; + if (elapsed_time_flb > timeout_ms) { + flb_warn("[timeout] elapsed_time: %ld", elapsed_time_flb); + // Reached timeout. + break; + } + } +} static struct filter_test *filter_test_create(struct flb_lib_out_cb *data) { @@ -682,6 +710,100 @@ static void flb_test_ml_buffered_16_streams() filter_test_destroy(ctx); } +/* This test will test the pausing of in_emitter */ +static void flb_test_ml_buffered_16_streams_pausing() +{ + struct flb_lib_out_cb cb_data; + struct filter_test *ctx; + int i_ffds[16] = {0}; + int ffd_num = sizeof(i_ffds)/sizeof(int); + int ret; + int i; + int j; + int bytes; + int len; + char line_buf[2048] = {0}; + char tag_buf[32] = {0}; + int line_num; + int num; + + char *expected_strs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property\\n at com.example.myproject.Author.getBookIds(xx.java:38)\\n at com.example.myproject.Bootstrap.main(Bootstrap.java:14)\\nCaused by: java.lang.NullPointerException\\n at com.example.myproject.Book.getId(Book.java:22)\\n at com.example.myproject.Author.getBookIds(Author.java:35)\\n ... 1 more"}; + + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + .ignore_min_line_num = 64, + }; + + char *ml_logs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property", + " at com.example.myproject.Author.getBookIds(xx.java:38)", + " at com.example.myproject.Bootstrap.main(Bootstrap.java:14)", + "Caused by: java.lang.NullPointerException", + " at com.example.myproject.Book.getId(Book.java:22)", + " at com.example.myproject.Author.getBookIds(Author.java:35)", + " ... 1 more", + "single line"}; + + cb_data.cb = cb_check_str_list; + cb_data.data = (void *)&expected; + + clear_output_num(); + + line_num = sizeof(ml_logs)/sizeof(char*); + + /* Create test context */ + ctx = filter_test_create((void *) &cb_data); + if (!ctx) { + exit(EXIT_FAILURE); + } + flb_service_set(ctx->flb, + "Flush", "0.100000000", + "Grace", "2", + NULL); + + i_ffds[0] = ctx->i_ffd; + for (i=1; iflb, (char *) "lib", NULL); + TEST_CHECK(i_ffds[i] >= 0); + sprintf(&tag_buf[0], "test%d", i); + flb_input_set(ctx->flb, i_ffds[i], "tag", tag_buf, NULL); + } + + /* Configure filter */ + /* Set mem_buf_limit small, so in_emitter will be paused */ + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "multiline.key_content", "log", + "multiline.parser", "java", + "buffer", "on", + "debug_flush", "on", + "emitter_mem_buf_limit", "1k", + NULL); + TEST_CHECK(ret == 0); + + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + for (i=0; iflb, i_ffds[j], &line_buf[0], len); + TEST_CHECK(bytes == len); + } + } + wait_with_timeout(20000, &num, ffd_num); + + if (!TEST_CHECK(num > 0)) { + TEST_MSG("output error. got %d expect more than 0 records.", num); + /* The internal flb_lib_push cannot be paused, so records may be lost */ + /* However, there should be at least some records */ + } + + filter_test_destroy(ctx); +} + @@ -695,5 +817,7 @@ TEST_LIST = { {"flb_test_multiline_partial_message_concat" , flb_test_multiline_partial_message_concat }, {"flb_test_multiline_partial_message_concat_two_ids" , flb_test_multiline_partial_message_concat_two_ids }, + + {"ml_buffered_16_streams_pausing" , flb_test_ml_buffered_16_streams_pausing }, {NULL, NULL} };