Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ struct flb_config {
#ifdef FLB_HAVE_STREAM_PROCESSOR
char *stream_processor_file; /* SP configuration file */
void *stream_processor_ctx; /* SP context */
int stream_processor_str_conv; /* SP enable converting from string to number */

/*
* Temporal list to hold tasks defined before the SP context is created
Expand Down Expand Up @@ -307,6 +308,7 @@ enum conf_type {
#define FLB_CONF_STR_PARSERS_FILE "Parsers_File"
#define FLB_CONF_STR_PLUGINS_FILE "Plugins_File"
#define FLB_CONF_STR_STREAMS_FILE "Streams_File"
#define FLB_CONF_STR_STREAMS_STR_CONV "sp.convert_from_str_to_num"
#define FLB_CONF_STR_CONV_NAN "json.convert_nan_to_null"

/* FLB_HAVE_HTTP_SERVER */
Expand Down
2 changes: 1 addition & 1 deletion include/fluent-bit/stream_processor/flb_sp.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ int sp_process_data(const char *tag, int tag_len,
int sp_process_data_aggr(const char *buf_data, size_t buf_size,
const char *tag, int tag_len,
struct flb_sp_task *task,
struct flb_sp *sp);
struct flb_sp *sp, int convert_str_to_num);
void package_results(const char *tag, int tag_len,
char **out_buf, size_t *out_size,
struct flb_sp_task *task);
Expand Down
4 changes: 4 additions & 0 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ struct flb_service_config service_configs[] = {
{FLB_CONF_STR_STREAMS_FILE,
FLB_CONF_TYPE_STR,
offsetof(struct flb_config, stream_processor_file)},
{FLB_CONF_STR_STREAMS_STR_CONV,
FLB_CONF_TYPE_BOOL,
offsetof(struct flb_config, stream_processor_str_conv)},
#endif

#ifdef FLB_HAVE_CHUNK_TRACE
Expand Down Expand Up @@ -273,6 +276,7 @@ struct flb_config *flb_config_init()

#ifdef FLB_HAVE_STREAM_PROCESSOR
flb_slist_create(&config->stream_processor_tasks);
config->stream_processor_str_conv = FLB_TRUE;
#endif

/* Set default coroutines stack size */
Expand Down
19 changes: 11 additions & 8 deletions src/stream_processor/flb_sp.c
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,8 @@ static int string_to_number(const char *str, int len, int64_t *i, double *d)
*
* This function aims to take care of strings representing a value too.
*/
static int object_to_number(msgpack_object obj, int64_t *i, double *d)
static int object_to_number(msgpack_object obj, int64_t *i, double *d,
int convert_str_to_num)
{
int ret;
int64_t i_out;
Expand All @@ -356,7 +357,7 @@ static int object_to_number(msgpack_object obj, int64_t *i, double *d)
*d = obj.via.f64;
return FLB_STR_FLOAT;
}
else if (obj.type == MSGPACK_OBJECT_STR) {
else if (obj.type == MSGPACK_OBJECT_STR && convert_str_to_num == FLB_TRUE) {
/* A numeric representation of a string should not exceed 19 chars */
if (obj.via.str.size > 19) {
return -1;
Expand Down Expand Up @@ -1220,7 +1221,8 @@ void package_results(const char *tag, int tag_len,
}

static struct aggregate_node * sp_process_aggregate_data(struct flb_sp_task *task,
msgpack_object map)
msgpack_object map,
int convert_str_to_num)
{
int i;
int ret;
Expand Down Expand Up @@ -1279,7 +1281,7 @@ static struct aggregate_node * sp_process_aggregate_data(struct flb_sp_task *tas
values_found++;

/* Convert string to number if that is possible */
ret = object_to_number(sval->o, &ival, &dval);
ret = object_to_number(sval->o, &ival, &dval, convert_str_to_num);
if (ret == -1) {
if (sval->o.type == MSGPACK_OBJECT_STR) {
gb_nums[key_id].type = FLB_SP_STRING;
Expand Down Expand Up @@ -1376,7 +1378,8 @@ static struct aggregate_node * sp_process_aggregate_data(struct flb_sp_task *tas
int sp_process_data_aggr(const char *buf_data, size_t buf_size,
const char *tag, int tag_len,
struct flb_sp_task *task,
struct flb_sp *sp)
struct flb_sp *sp,
int convert_str_to_num)
{
int i;
int ok;
Expand Down Expand Up @@ -1435,7 +1438,7 @@ int sp_process_data_aggr(const char *buf_data, size_t buf_size,
}
}

aggr_node = sp_process_aggregate_data(task, map);
aggr_node = sp_process_aggregate_data(task, map, convert_str_to_num);
if (!aggr_node)
{
continue;
Expand Down Expand Up @@ -1491,7 +1494,7 @@ int sp_process_data_aggr(const char *buf_data, size_t buf_size,
ival = 0;
dval = 0.0;
if (ckey->aggr_func != FLB_SP_NOP) {
ret = object_to_number(sval->o, &ival, &dval);
ret = object_to_number(sval->o, &ival, &dval, convert_str_to_num);
if (ret == -1) {
/* Value cannot be represented as a number */
key_id++;
Expand Down Expand Up @@ -1981,7 +1984,7 @@ int flb_sp_do(struct flb_sp *sp, struct flb_input_instance *in,
if (task->aggregate_keys == FLB_TRUE) {
ret = sp_process_data_aggr(buf_data, buf_size,
tag, tag_len,
task, sp);
task, sp, in->config->stream_processor_str_conv);

if (ret == -1) {
flb_error("[sp] error processing records for '%s'",
Expand Down
139 changes: 136 additions & 3 deletions tests/internal/stream_processor.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_error.h>
#include <fluent-bit/flb_router.h>
#include <fluent-bit/flb_storage.h>
#include <fluent-bit/stream_processor/flb_sp.h>
#include <fluent-bit/stream_processor/flb_sp_parser.h>
#include <fluent-bit/stream_processor/flb_sp_stream.h>
#include <fluent-bit/stream_processor/flb_sp_window.h>
#include <msgpack.h>

#include "flb_tests_internal.h"
#include "include/sp_invalid_queries.h"
Expand Down Expand Up @@ -107,7 +109,7 @@ int flb_sp_do_test(struct flb_sp *sp, struct flb_sp_task *task,
if (task->aggregate_keys == FLB_TRUE) {
ret = sp_process_data_aggr(data_buf->buffer, data_buf->size,
tag, tag_len,
task, sp);
task, sp, FLB_TRUE);
if (ret == -1) {
flb_error("[sp] error error processing records for '%s'",
task->name);
Expand All @@ -119,8 +121,7 @@ int flb_sp_do_test(struct flb_sp *sp, struct flb_sp_task *task,
task->name);
return -1;
}

if (task->window.type == FLB_SP_WINDOW_DEFAULT) {
if (task->window.type == FLB_SP_WINDOW_DEFAULT || task->window.type == FLB_SP_WINDOW_TUMBLING) {
package_results(tag, tag_len, &out_buf->buffer, &out_buf->size, task);
}

Expand Down Expand Up @@ -744,11 +745,143 @@ static void test_snapshot()
#endif
}

static void test_conv_from_str_to_num()
{
struct flb_config *config = NULL;
struct flb_sp *sp = NULL;
struct flb_sp_task *task = NULL;
struct sp_buffer out_buf;
struct sp_buffer data_buf;
msgpack_sbuffer sbuf;
msgpack_packer pck;
msgpack_unpacked result;
size_t off = 0;
char json[4096] = {0};
int ret;

#ifdef _WIN32
WSADATA wsa_data;

WSAStartup(0x0201, &wsa_data);
#endif
out_buf.buffer = NULL;

config = flb_config_init();
config->evl = mk_event_loop_create(256);

ret = flb_storage_create(config);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("flb_storage_create failed");
flb_config_exit(config);
return;
}

sp = flb_sp_create(config);
if (!TEST_CHECK(sp != NULL)) {
TEST_MSG("[sp test] cannot create stream processor context");
goto test_conv_from_str_to_num_end;
}

task = flb_sp_task_create(sp, "tail.0", "CREATE STREAM test WITH (tag=\'test\') AS SELECT word, num, COUNT(*) FROM STREAM:tail.0 WINDOW TUMBLING (1 SECOND) GROUP BY word, num;");
if (!TEST_CHECK(task != NULL)) {
TEST_MSG("[sp test] wrong check 'conv', fix it!");
goto test_conv_from_str_to_num_end;
}

/* Create input data */
msgpack_sbuffer_init(&sbuf);
msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);
msgpack_pack_array(&pck, 2);
flb_pack_time_now(&pck);
msgpack_pack_map(&pck, 2);

msgpack_pack_str(&pck, 4);
msgpack_pack_str_body(&pck, "word", 4);
msgpack_pack_str(&pck, 4);
msgpack_pack_str_body(&pck, "hoge", 4);

msgpack_pack_str(&pck, 3);
msgpack_pack_str_body(&pck, "num", 3);
msgpack_pack_str(&pck, 6);
msgpack_pack_str_body(&pck, "123456", 6);

data_buf.buffer = sbuf.data;
data_buf.size = sbuf.size;

out_buf.buffer = NULL;
out_buf.size = 0;

/* Exec stream processor */
ret = flb_sp_do_test(sp, task, "tail.0", strlen("tail.0"), &data_buf, &out_buf);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("flb_sp_do_test failed");
msgpack_sbuffer_destroy(&sbuf);
goto test_conv_from_str_to_num_end;
}

if (!TEST_CHECK(out_buf.size > 0)) {
TEST_MSG("out_buf size is 0");
msgpack_sbuffer_destroy(&sbuf);
goto test_conv_from_str_to_num_end;
}


/* Check output buffer. It should contain a number 123456 not a string "123456" */

msgpack_unpacked_init(&result);
ret = msgpack_unpack_next(&result, out_buf.buffer, out_buf.size, &off);
if (!TEST_CHECK(ret == MSGPACK_UNPACK_SUCCESS)) {
TEST_MSG("failed to unpack ret=%d", ret);
msgpack_unpacked_destroy(&result);
msgpack_sbuffer_destroy(&sbuf);
goto test_conv_from_str_to_num_end;
}

ret = flb_msgpack_to_json(&json[0], sizeof(json), &result.data);
if (!TEST_CHECK(ret > 0)) {
TEST_MSG("flb_msgpack_to_json failed");
msgpack_unpacked_destroy(&result);
msgpack_sbuffer_destroy(&sbuf);
goto test_conv_from_str_to_num_end;
}

if (!TEST_CHECK(strstr(json,"123456") != NULL)) {
TEST_MSG("number not found");
msgpack_unpacked_destroy(&result);
msgpack_sbuffer_destroy(&sbuf);
goto test_conv_from_str_to_num_end;
}
if (!TEST_CHECK(strstr(json,"\"123456\"") == NULL)) {
TEST_MSG("output should be number type");
msgpack_unpacked_destroy(&result);
msgpack_sbuffer_destroy(&sbuf);
goto test_conv_from_str_to_num_end;
}

msgpack_unpacked_destroy(&result);
msgpack_sbuffer_destroy(&sbuf);

test_conv_from_str_to_num_end:
if (out_buf.buffer != NULL) {
flb_free(out_buf.buffer);
}

#ifdef _WIN32
WSACleanup();
#endif
if (sp != NULL) {
flb_sp_destroy(sp);
}
flb_storage_destroy(config);
flb_config_exit(config);
}

TEST_LIST = {
{ "invalid_queries", invalid_queries},
{ "select_keys", test_select_keys},
{ "select_subkeys", test_select_subkeys},
{ "window", test_window},
{ "snapshot", test_snapshot},
{ "conv_from_str_to_num", test_conv_from_str_to_num},
{ NULL }
};