From 50c90858b695b68f45d5b17a5c9b023e468c1434 Mon Sep 17 00:00:00 2001 From: Takahiro Yamashita Date: Sun, 5 Mar 2023 16:11:18 +0900 Subject: [PATCH 1/3] config: support sp.convert_from_str_to_num Signed-off-by: Takahiro Yamashita --- include/fluent-bit/flb_config.h | 2 ++ src/flb_config.c | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index f46faeda30c..bf9e5ffff03 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -235,6 +235,7 @@ struct flb_config { #ifdef FLB_HAVE_STREAM_PROCESSOR char *stream_processor_file; /* SP configuration file */ void *stream_processor_ctx; /* SP context */ + int stream_processor_str_conv; /* SP enable converting from string to number */ /* * Temporal list to hold tasks defined before the SP context is created @@ -307,6 +308,7 @@ enum conf_type { #define FLB_CONF_STR_PARSERS_FILE "Parsers_File" #define FLB_CONF_STR_PLUGINS_FILE "Plugins_File" #define FLB_CONF_STR_STREAMS_FILE "Streams_File" +#define FLB_CONF_STR_STREAMS_STR_CONV "sp.convert_from_str_to_num" #define FLB_CONF_STR_CONV_NAN "json.convert_nan_to_null" /* FLB_HAVE_HTTP_SERVER */ diff --git a/src/flb_config.c b/src/flb_config.c index 378856aa16a..3eba31169e2 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -164,6 +164,9 @@ struct flb_service_config service_configs[] = { {FLB_CONF_STR_STREAMS_FILE, FLB_CONF_TYPE_STR, offsetof(struct flb_config, stream_processor_file)}, + {FLB_CONF_STR_STREAMS_STR_CONV, + FLB_CONF_TYPE_BOOL, + offsetof(struct flb_config, stream_processor_str_conv)}, #endif #ifdef FLB_HAVE_CHUNK_TRACE @@ -273,6 +276,7 @@ struct flb_config *flb_config_init() #ifdef FLB_HAVE_STREAM_PROCESSOR flb_slist_create(&config->stream_processor_tasks); + config->stream_processor_str_conv = FLB_TRUE; #endif /* Set default coroutines stack size */ From e209745f5e4c78ed66c049792025f30f3a6e3bf2 Mon Sep 17 00:00:00 2001 From: Takahiro Yamashita Date: Sun, 12 Mar 2023 08:49:23 +0900 Subject: [PATCH 2/3] sp: prevent converting from string to num if config is set Signed-off-by: Takahiro Yamashita --- include/fluent-bit/stream_processor/flb_sp.h | 2 +- src/stream_processor/flb_sp.c | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/include/fluent-bit/stream_processor/flb_sp.h b/include/fluent-bit/stream_processor/flb_sp.h index eacc929fe35..455909ab1d3 100644 --- a/include/fluent-bit/stream_processor/flb_sp.h +++ b/include/fluent-bit/stream_processor/flb_sp.h @@ -171,7 +171,7 @@ int sp_process_data(const char *tag, int tag_len, int sp_process_data_aggr(const char *buf_data, size_t buf_size, const char *tag, int tag_len, struct flb_sp_task *task, - struct flb_sp *sp); + struct flb_sp *sp, int convert_str_to_num); void package_results(const char *tag, int tag_len, char **out_buf, size_t *out_size, struct flb_sp_task *task); diff --git a/src/stream_processor/flb_sp.c b/src/stream_processor/flb_sp.c index 21fa852e97b..bd04d9e26b8 100644 --- a/src/stream_processor/flb_sp.c +++ b/src/stream_processor/flb_sp.c @@ -339,7 +339,8 @@ static int string_to_number(const char *str, int len, int64_t *i, double *d) * * This function aims to take care of strings representing a value too. */ -static int object_to_number(msgpack_object obj, int64_t *i, double *d) +static int object_to_number(msgpack_object obj, int64_t *i, double *d, + int convert_str_to_num) { int ret; int64_t i_out; @@ -356,7 +357,7 @@ static int object_to_number(msgpack_object obj, int64_t *i, double *d) *d = obj.via.f64; return FLB_STR_FLOAT; } - else if (obj.type == MSGPACK_OBJECT_STR) { + else if (obj.type == MSGPACK_OBJECT_STR && convert_str_to_num == FLB_TRUE) { /* A numeric representation of a string should not exceed 19 chars */ if (obj.via.str.size > 19) { return -1; @@ -1220,7 +1221,8 @@ void package_results(const char *tag, int tag_len, } static struct aggregate_node * sp_process_aggregate_data(struct flb_sp_task *task, - msgpack_object map) + msgpack_object map, + int convert_str_to_num) { int i; int ret; @@ -1279,7 +1281,7 @@ static struct aggregate_node * sp_process_aggregate_data(struct flb_sp_task *tas values_found++; /* Convert string to number if that is possible */ - ret = object_to_number(sval->o, &ival, &dval); + ret = object_to_number(sval->o, &ival, &dval, convert_str_to_num); if (ret == -1) { if (sval->o.type == MSGPACK_OBJECT_STR) { gb_nums[key_id].type = FLB_SP_STRING; @@ -1376,7 +1378,8 @@ static struct aggregate_node * sp_process_aggregate_data(struct flb_sp_task *tas int sp_process_data_aggr(const char *buf_data, size_t buf_size, const char *tag, int tag_len, struct flb_sp_task *task, - struct flb_sp *sp) + struct flb_sp *sp, + int convert_str_to_num) { int i; int ok; @@ -1435,7 +1438,7 @@ int sp_process_data_aggr(const char *buf_data, size_t buf_size, } } - aggr_node = sp_process_aggregate_data(task, map); + aggr_node = sp_process_aggregate_data(task, map, convert_str_to_num); if (!aggr_node) { continue; @@ -1491,7 +1494,7 @@ int sp_process_data_aggr(const char *buf_data, size_t buf_size, ival = 0; dval = 0.0; if (ckey->aggr_func != FLB_SP_NOP) { - ret = object_to_number(sval->o, &ival, &dval); + ret = object_to_number(sval->o, &ival, &dval, convert_str_to_num); if (ret == -1) { /* Value cannot be represented as a number */ key_id++; @@ -1981,7 +1984,7 @@ int flb_sp_do(struct flb_sp *sp, struct flb_input_instance *in, if (task->aggregate_keys == FLB_TRUE) { ret = sp_process_data_aggr(buf_data, buf_size, tag, tag_len, - task, sp); + task, sp, in->config->stream_processor_str_conv); if (ret == -1) { flb_error("[sp] error processing records for '%s'", From 5f9d20b142b7d95a93b195d22183c84208024e72 Mon Sep 17 00:00:00 2001 From: Takahiro Yamashita Date: Sun, 19 Mar 2023 08:52:28 +0900 Subject: [PATCH 3/3] tests: internal: sp: add test case for conv_from_str_to_num Signed-off-by: Takahiro Yamashita --- tests/internal/stream_processor.c | 139 +++++++++++++++++++++++++++++- 1 file changed, 136 insertions(+), 3 deletions(-) diff --git a/tests/internal/stream_processor.c b/tests/internal/stream_processor.c index 1d77edf9cce..e8635f38cde 100644 --- a/tests/internal/stream_processor.c +++ b/tests/internal/stream_processor.c @@ -23,10 +23,12 @@ #include #include #include +#include #include #include #include #include +#include #include "flb_tests_internal.h" #include "include/sp_invalid_queries.h" @@ -107,7 +109,7 @@ int flb_sp_do_test(struct flb_sp *sp, struct flb_sp_task *task, if (task->aggregate_keys == FLB_TRUE) { ret = sp_process_data_aggr(data_buf->buffer, data_buf->size, tag, tag_len, - task, sp); + task, sp, FLB_TRUE); if (ret == -1) { flb_error("[sp] error error processing records for '%s'", task->name); @@ -119,8 +121,7 @@ int flb_sp_do_test(struct flb_sp *sp, struct flb_sp_task *task, task->name); return -1; } - - if (task->window.type == FLB_SP_WINDOW_DEFAULT) { + if (task->window.type == FLB_SP_WINDOW_DEFAULT || task->window.type == FLB_SP_WINDOW_TUMBLING) { package_results(tag, tag_len, &out_buf->buffer, &out_buf->size, task); } @@ -744,11 +745,143 @@ static void test_snapshot() #endif } +static void test_conv_from_str_to_num() +{ + struct flb_config *config = NULL; + struct flb_sp *sp = NULL; + struct flb_sp_task *task = NULL; + struct sp_buffer out_buf; + struct sp_buffer data_buf; + msgpack_sbuffer sbuf; + msgpack_packer pck; + msgpack_unpacked result; + size_t off = 0; + char json[4096] = {0}; + int ret; + +#ifdef _WIN32 + WSADATA wsa_data; + + WSAStartup(0x0201, &wsa_data); +#endif + out_buf.buffer = NULL; + + config = flb_config_init(); + config->evl = mk_event_loop_create(256); + + ret = flb_storage_create(config); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("flb_storage_create failed"); + flb_config_exit(config); + return; + } + + sp = flb_sp_create(config); + if (!TEST_CHECK(sp != NULL)) { + TEST_MSG("[sp test] cannot create stream processor context"); + goto test_conv_from_str_to_num_end; + } + + task = flb_sp_task_create(sp, "tail.0", "CREATE STREAM test WITH (tag=\'test\') AS SELECT word, num, COUNT(*) FROM STREAM:tail.0 WINDOW TUMBLING (1 SECOND) GROUP BY word, num;"); + if (!TEST_CHECK(task != NULL)) { + TEST_MSG("[sp test] wrong check 'conv', fix it!"); + goto test_conv_from_str_to_num_end; + } + + /* Create input data */ + msgpack_sbuffer_init(&sbuf); + msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write); + msgpack_pack_array(&pck, 2); + flb_pack_time_now(&pck); + msgpack_pack_map(&pck, 2); + + msgpack_pack_str(&pck, 4); + msgpack_pack_str_body(&pck, "word", 4); + msgpack_pack_str(&pck, 4); + msgpack_pack_str_body(&pck, "hoge", 4); + + msgpack_pack_str(&pck, 3); + msgpack_pack_str_body(&pck, "num", 3); + msgpack_pack_str(&pck, 6); + msgpack_pack_str_body(&pck, "123456", 6); + + data_buf.buffer = sbuf.data; + data_buf.size = sbuf.size; + + out_buf.buffer = NULL; + out_buf.size = 0; + + /* Exec stream processor */ + ret = flb_sp_do_test(sp, task, "tail.0", strlen("tail.0"), &data_buf, &out_buf); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("flb_sp_do_test failed"); + msgpack_sbuffer_destroy(&sbuf); + goto test_conv_from_str_to_num_end; + } + + if (!TEST_CHECK(out_buf.size > 0)) { + TEST_MSG("out_buf size is 0"); + msgpack_sbuffer_destroy(&sbuf); + goto test_conv_from_str_to_num_end; + } + + + /* Check output buffer. It should contain a number 123456 not a string "123456" */ + + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, out_buf.buffer, out_buf.size, &off); + if (!TEST_CHECK(ret == MSGPACK_UNPACK_SUCCESS)) { + TEST_MSG("failed to unpack ret=%d", ret); + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&sbuf); + goto test_conv_from_str_to_num_end; + } + + ret = flb_msgpack_to_json(&json[0], sizeof(json), &result.data); + if (!TEST_CHECK(ret > 0)) { + TEST_MSG("flb_msgpack_to_json failed"); + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&sbuf); + goto test_conv_from_str_to_num_end; + } + + if (!TEST_CHECK(strstr(json,"123456") != NULL)) { + TEST_MSG("number not found"); + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&sbuf); + goto test_conv_from_str_to_num_end; + } + if (!TEST_CHECK(strstr(json,"\"123456\"") == NULL)) { + TEST_MSG("output should be number type"); + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&sbuf); + goto test_conv_from_str_to_num_end; + } + + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&sbuf); + + test_conv_from_str_to_num_end: + if (out_buf.buffer != NULL) { + flb_free(out_buf.buffer); + } + +#ifdef _WIN32 + WSACleanup(); +#endif + if (sp != NULL) { + flb_sp_destroy(sp); + } + flb_storage_destroy(config); + flb_config_exit(config); +} + TEST_LIST = { { "invalid_queries", invalid_queries}, { "select_keys", test_select_keys}, { "select_subkeys", test_select_subkeys}, { "window", test_window}, { "snapshot", test_snapshot}, + { "conv_from_str_to_num", test_conv_from_str_to_num}, { NULL } };