diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 3fab2576d62..eab0c983701 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -627,6 +627,7 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task, struct cmt *metrics_context; struct ctrace *trace_context; size_t chunk_offset; + struct cmt *cmt_out_context = NULL; /* Custom output coroutine info */ out_flush = (struct flb_output_flush *) flb_calloc(1, sizeof(struct flb_output_flush)); @@ -715,13 +716,25 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task, flb_sds_len(evc->tag), (char *) metrics_context, 0, - NULL, + (void **)&cmt_out_context, NULL); if (ret == 0) { - ret = cmt_encode_msgpack_create(metrics_context, - &serialized_context_buffer, - &serialized_context_size); + if (cmt_out_context != NULL) { + ret = cmt_encode_msgpack_create(cmt_out_context, + &serialized_context_buffer, + &serialized_context_size); + + if (cmt_out_context != metrics_context) { + cmt_destroy(cmt_out_context); + } + + } + else { + ret = cmt_encode_msgpack_create(metrics_context, + &serialized_context_buffer, + &serialized_context_size); + } cmt_destroy(metrics_context); diff --git a/tests/runtime/processor_metrics_selector.c b/tests/runtime/processor_metrics_selector.c index f25e9751a58..78f64e83b14 100644 --- a/tests/runtime/processor_metrics_selector.c +++ b/tests/runtime/processor_metrics_selector.c @@ -549,6 +549,70 @@ void flb_test_selector_substring_exclude(void) flb_destroy(ctx); } +void flb_test_selector_can_modify_output(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct cfl_variant var = { + .type = CFL_VARIANT_STRING, + .data.as_string = "/kubernetes/", + }; + struct cfl_variant action = { + .type = CFL_VARIANT_STRING, + .data.as_string = "include", + }; + + ctx = flb_create(); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "2", + NULL); + + proc = flb_processor_create(ctx->config, "unit_test", NULL, 0); + TEST_CHECK(proc != NULL); + + pu = flb_processor_unit_create(proc, FLB_PROCESSOR_METRICS, "metrics_selector"); + TEST_CHECK(pu != NULL); + ret = flb_processor_unit_set_property(pu, "metric_name", &var); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "action", &action); + TEST_CHECK(ret == 0); + + /* Input */ + in_ffd = flb_input(ctx, (char *) "event_type", NULL); + TEST_CHECK(in_ffd >= 0); + ret = flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "type", "metrics", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "interval_sec", "1", NULL); + TEST_CHECK(ret == 0); + + out_ffd = flb_output(ctx, (char *) "stdout", NULL); + TEST_CHECK(out_ffd >= 0); + ret = flb_output_set(ctx, out_ffd, "match", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set(ctx, out_ffd, "format", "msgpack", NULL); + TEST_CHECK(ret == 0); + + /* set up processor */ + ret = flb_output_set_processor(ctx, out_ffd, proc); + TEST_CHECK(ret == 0); + + clear_output_num(); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(1500); /* waiting flush */ + + flb_stop(ctx); + flb_destroy(ctx); +} #endif /* Test list */ @@ -560,6 +624,7 @@ TEST_LIST = { {"prefix_exclude", flb_test_selector_prefix_exclude}, {"substring_include", flb_test_selector_substring_include}, {"substring_exclude", flb_test_selector_substring_exclude}, + {"can_modify_output", flb_test_selector_can_modify_output}, #endif {NULL, NULL} };