diff --git a/plugins/in_tail/tail.c b/plugins/in_tail/tail.c index a1fa86b0a0f..6fcad289c40 100644 --- a/plugins/in_tail/tail.c +++ b/plugins/in_tail/tail.c @@ -405,6 +405,13 @@ static struct flb_config_map config_map[] = { "wait period time in seconds to flush queued unfinished split lines." }, +#ifdef FLB_HAVE_REGEX + { + FLB_CONFIG_MAP_STR, "docker_mode_parser", NULL, + 0, FLB_FALSE, 0, + "specify the parser name to fetch log first line for muliline log" + }, +#endif { FLB_CONFIG_MAP_STR, "path_key", NULL, 0, FLB_TRUE, offsetof(struct flb_tail_config, path_key), diff --git a/plugins/in_tail/tail_config.h b/plugins/in_tail/tail_config.h index c479202458a..a5f24bafe7a 100644 --- a/plugins/in_tail/tail_config.h +++ b/plugins/in_tail/tail_config.h @@ -98,6 +98,7 @@ struct flb_tail_config { /* Docker mode */ int docker_mode; /* Docker mode enabled ? */ int docker_mode_flush; /* Docker mode flush/wait */ + struct flb_parser *docker_mode_parser; /* Parser for separate multiline logs */ /* Lists head for files consumed statically (read) and by events (inotify) */ struct mk_list files_static; diff --git a/plugins/in_tail/tail_dockermode.c b/plugins/in_tail/tail_dockermode.c index a24de1a3032..10f5c47b215 100644 --- a/plugins/in_tail/tail_dockermode.c +++ b/plugins/in_tail/tail_dockermode.c @@ -39,6 +39,20 @@ int flb_tail_dmode_create(struct flb_tail_config *ctx, return -1; } +#ifdef FLB_HAVE_REGEX + /* First line Parser */ + tmp = flb_input_get_property("docker_mode_parser", ins); + if (tmp) { + ctx->docker_mode_parser = flb_parser_get(tmp, config); + if (!ctx->docker_mode_parser) { + flb_plg_error(ctx->ins, "parser '%s' is not registered", tmp); + } + } + else { + ctx->docker_mode_parser = NULL; + } +#endif + tmp = flb_input_get_property("docker_mode_flush", ins); if (!tmp) { ctx->docker_mode_flush = FLB_TAIL_DMODE_FLUSH; @@ -224,11 +238,38 @@ int flb_tail_dmode_process_content(time_t now, char* line, size_t line_len, char **repl_line, size_t *repl_line_len, struct flb_tail_file *file, - struct flb_tail_config *ctx) + struct flb_tail_config *ctx, + msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck + ) { char* val = NULL; size_t val_len; int ret; + void *out_buf = NULL; + size_t out_size; + struct flb_time out_time = {0}; + *repl_line = NULL; + *repl_line_len = 0; + flb_sds_t tmp; + flb_sds_t tmp_copy; + +#ifdef FLB_HAVE_REGEX + if (flb_sds_len(file->dmode_lastline) > 0 && file->dmode_complete) { + if (ctx->docker_mode_parser) { + ret = flb_parser_do(ctx->docker_mode_parser, line, line_len, + &out_buf, &out_size, &out_time); + flb_free(out_buf); + + /* + * Buffered log should be flushed out + * as current line meets first-line requirement + */ + if(ret >= 0) { + flb_tail_dmode_flush(mp_sbuf, mp_pck, file, ctx); + } + } + } +#endif ret = modify_json_cond(line, line_len, &val, &val_len, @@ -236,17 +277,41 @@ int flb_tail_dmode_process_content(time_t now, unesc_ends_with_nl, prepend_sds_to_str, file->dmode_buf); if (ret >= 0) { + /* line is a valid json */ flb_sds_len_set(file->dmode_lastline, 0); - if (ret == 0) { - file->dmode_buf = flb_sds_cat(file->dmode_buf, val, val_len); - file->dmode_lastline = flb_sds_copy(file->dmode_lastline, line, line_len); - file->dmode_flush_timeout = now + (ctx->docker_mode_flush - 1); - return ret; + /* concatenate current log line with buffered one */ + tmp = flb_sds_cat(file->dmode_buf, val, val_len); + if (!tmp) { + flb_errno(); + return -1; + } + + tmp_copy = flb_sds_copy(file->dmode_lastline, line, line_len); + if (!tmp_copy) { + flb_errno(); + return -1; } - flb_sds_len_set(file->dmode_buf, 0); - file->dmode_flush_timeout = 0; + file->dmode_buf = tmp; + file->dmode_lastline = tmp_copy; + file->dmode_flush_timeout = now + (ctx->docker_mode_flush - 1); + + if (ret == 0) { + /* Line not ended with newline */ + file->dmode_complete = false; + } + else { + /* Line ended with newline */ + file->dmode_complete = true; +#ifdef FLB_HAVE_REGEX + if (!ctx->docker_mode_parser) { + flb_tail_dmode_flush(mp_sbuf, mp_pck, file, ctx); + } +#else + flb_tail_dmode_flush(mp_sbuf, mp_pck, file, ctx); +#endif + } } return ret; } diff --git a/plugins/in_tail/tail_dockermode.h b/plugins/in_tail/tail_dockermode.h index 76ac18de1ea..8fdedccfdcc 100644 --- a/plugins/in_tail/tail_dockermode.h +++ b/plugins/in_tail/tail_dockermode.h @@ -30,7 +30,8 @@ int flb_tail_dmode_process_content(time_t now, char* line, size_t line_len, char **repl_line, size_t *repl_line_len, struct flb_tail_file *file, - struct flb_tail_config *ctx); + struct flb_tail_config *ctx, + msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck); void flb_tail_dmode_flush(msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck, struct flb_tail_file *file, struct flb_tail_config *ctx); int flb_tail_dmode_pending_flush(struct flb_input_instance *ins, diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index 955ece99348..841ec04aa93 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -281,7 +281,7 @@ static int process_content(struct flb_tail_file *file, off_t *bytes) if (ctx->docker_mode) { ret = flb_tail_dmode_process_content(now, line, line_len, &repl_line, &repl_line_len, - file, ctx); + file, ctx, out_sbuf, out_pck); if (ret >= 0) { if (repl_line == line) { repl_line = NULL; @@ -290,9 +290,8 @@ static int process_content(struct flb_tail_file *file, off_t *bytes) line = repl_line; line_len = repl_line_len; } - if (ret == 0) { - goto go_next; - } + /* Skip normal parsers flow */ + goto go_next; } else { flb_tail_dmode_flush(out_sbuf, out_pck, file, ctx); @@ -682,6 +681,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, file->mult_skipping = FLB_FALSE; msgpack_sbuffer_init(&file->mult_sbuf); file->dmode_flush_timeout = 0; + file->dmode_complete = true; file->dmode_buf = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 65536 : 0); file->dmode_lastline = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 20000 : 0); #ifdef FLB_HAVE_SQLDB diff --git a/plugins/in_tail/tail_file_internal.h b/plugins/in_tail/tail_file_internal.h index 83999d4bcdb..a45872c6795 100644 --- a/plugins/in_tail/tail_file_internal.h +++ b/plugins/in_tail/tail_file_internal.h @@ -69,6 +69,7 @@ struct flb_tail_file { time_t dmode_flush_timeout; /* time when docker mode started */ flb_sds_t dmode_buf; /* buffer for docker mode */ flb_sds_t dmode_lastline; /* last incomplete line */ + bool dmode_complete; /* buffer contains completed log */ /* buffering */ off_t parsed; diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index 119125f8482..78637f682c4 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -28,6 +28,7 @@ if(FLB_OUT_LIB) FLB_RT_TEST(FLB_IN_HEAD "in_head.c") FLB_RT_TEST(FLB_IN_DUMMY "in_dummy.c") FLB_RT_TEST(FLB_IN_RANDOM "in_random.c") + FLB_RT_TEST(FLB_IN_TAIL "in_tail.c") endif() # Filter Plugins diff --git a/tests/runtime/data/tail/log/dockermode.log b/tests/runtime/data/tail/log/dockermode.log new file mode 100644 index 00000000000..a5a05ee467d --- /dev/null +++ b/tests/runtime/data/tail/log/dockermode.log @@ -0,0 +1,3 @@ +{"log":"Single log\n"} +{"log":"Second log\n"} +{"log":"Third log\n"} diff --git a/tests/runtime/data/tail/log/dockermode_multiple_lines.log b/tests/runtime/data/tail/log/dockermode_multiple_lines.log new file mode 100644 index 00000000000..564dabbb326 --- /dev/null +++ b/tests/runtime/data/tail/log/dockermode_multiple_lines.log @@ -0,0 +1,6 @@ +{"log":"2020-03-24 Multiple lines: \n"} +{"log":"first bullet point\n"} +{"log":"second bullet point\n"} +{"log":"third bullet point\n"} +{"log":"fourth bullet point\n"} +{"log":"2020-03-24 Single line\n"} diff --git a/tests/runtime/data/tail/log/dockermode_splitted_line.log b/tests/runtime/data/tail/log/dockermode_splitted_line.log new file mode 100644 index 00000000000..8f80f1b32b7 --- /dev/null +++ b/tests/runtime/data/tail/log/dockermode_splitted_line.log @@ -0,0 +1,6 @@ +{"log":"2020-03-24 Single "} +{"log":"li"} +{"log":"ne\n"} +{"log":"2020-03-24 Another "} +{"log":"single "} +{"log":"line\n"} diff --git a/tests/runtime/data/tail/log/dockermode_splitted_multiple_lines.log b/tests/runtime/data/tail/log/dockermode_splitted_multiple_lines.log new file mode 100644 index 00000000000..2c7d36dd7ad --- /dev/null +++ b/tests/runtime/data/tail/log/dockermode_splitted_multiple_lines.log @@ -0,0 +1,8 @@ +{"log":"2020-03-24 Multiple lines: \n"} +{"log":"first bullet point\n"} +{"log":"second bullet point\n"} +{"log":"third "} +{"log":"bullet "} +{"log":"point\n"} +{"log":"fourth bullet point\n"} +{"log":"2020-03-24 Single line\n"} diff --git a/tests/runtime/data/tail/out/dockermode.out b/tests/runtime/data/tail/out/dockermode.out new file mode 100644 index 00000000000..a5a05ee467d --- /dev/null +++ b/tests/runtime/data/tail/out/dockermode.out @@ -0,0 +1,3 @@ +{"log":"Single log\n"} +{"log":"Second log\n"} +{"log":"Third log\n"} diff --git a/tests/runtime/data/tail/out/dockermode_multiple_lines.out b/tests/runtime/data/tail/out/dockermode_multiple_lines.out new file mode 100644 index 00000000000..27fba1b980c --- /dev/null +++ b/tests/runtime/data/tail/out/dockermode_multiple_lines.out @@ -0,0 +1,2 @@ +{"log":"2020-03-24 Multiple lines: \nfirst bullet point\nsecond bullet point\nthird bullet point\nfourth bullet point\n"} +{"log":"2020-03-24 Single line\n"} diff --git a/tests/runtime/data/tail/out/dockermode_splitted_line.out b/tests/runtime/data/tail/out/dockermode_splitted_line.out new file mode 100644 index 00000000000..14d0aa8a29d --- /dev/null +++ b/tests/runtime/data/tail/out/dockermode_splitted_line.out @@ -0,0 +1,2 @@ +{"log":"2020-03-24 Single line\n"} +{"log":"2020-03-24 Another single line\n"} diff --git a/tests/runtime/data/tail/out/dockermode_splitted_multiple_lines.out b/tests/runtime/data/tail/out/dockermode_splitted_multiple_lines.out new file mode 100644 index 00000000000..27fba1b980c --- /dev/null +++ b/tests/runtime/data/tail/out/dockermode_splitted_multiple_lines.out @@ -0,0 +1,2 @@ +{"log":"2020-03-24 Multiple lines: \nfirst bullet point\nsecond bullet point\nthird bullet point\nfourth bullet point\n"} +{"log":"2020-03-24 Single line\n"} diff --git a/tests/runtime/data/tail/parsers.conf b/tests/runtime/data/tail/parsers.conf new file mode 100644 index 00000000000..28bde475196 --- /dev/null +++ b/tests/runtime/data/tail/parsers.conf @@ -0,0 +1,10 @@ +[PARSER] + Name docker + Format json + Time_Key time + Time_Format %Y-%m-%dT%H:%M:%S.%L%z + +[PARSER] + Name docker_multiline + Format regex + Regex (?^{"log":"\d{4}-\d{2}-\d{2}.*) diff --git a/tests/runtime/in_tail.c b/tests/runtime/in_tail.c new file mode 100644 index 00000000000..9ae0d3f0128 --- /dev/null +++ b/tests/runtime/in_tail.c @@ -0,0 +1,294 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2016 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* +Approach for this tests is basing on filter_kubernetes tests +*/ + +#include +#include +#include +#include +#include +#include "flb_tests_runtime.h" + + + +#define DPATH FLB_TESTS_DATA_PATH "/data/tail" +#define MAX_LINES 32 + +int64_t result_time; +struct tail_test_result { + const char *target; + int nMatched; +}; + +struct tail_file_lines { + char *lines[MAX_LINES]; + int lines_c; +}; + + +static inline int64_t set_result(int64_t v) +{ + int64_t old = __sync_lock_test_and_set(&result_time, v); + return old; +} + + +static int file_to_buf(const char *path, char **out_buf, size_t *out_size) +{ + int ret; + long bytes; + char *buf; + FILE *fp; + struct stat st; + + ret = stat(path, &st); + if (ret == -1) { + return -1; + } + + fp = fopen(path, "r"); + if (!fp) { + return -1; + } + + buf = flb_malloc(st.st_size); + if (!buf) { + flb_errno(); + fclose(fp); + return -1; + } + + bytes = fread(buf, st.st_size, 1, fp); + if (bytes != 1) { + flb_errno(); + flb_free(buf); + fclose(fp); + return -1; + } + + fclose(fp); + *out_buf = buf; + *out_size = st.st_size; + + return 0; +} + +/* Given a target, lookup the .out file and return it content in a tail_file_lines structure */ +static struct tail_file_lines *get_out_file_content(const char *target) +{ + int ret; + char file[PATH_MAX]; + char *p; + char *out_buf; + size_t out_size; + struct tail_file_lines *file_lines = flb_malloc(sizeof (struct tail_file_lines)); + file_lines->lines_c = 0; + + snprintf(file, sizeof(file) - 1, DPATH "/out/%s.out", target); + + ret = file_to_buf(file, &out_buf, &out_size); + TEST_CHECK_(ret == 0, "getting output file content: %s", file); + if (ret != 0) { + file_lines->lines_c = 0; + return file_lines; + } + + file_lines->lines[file_lines->lines_c++] = out_buf; + + for (int i=0; ilines_c < MAX_LINES) { + file_lines->lines[file_lines->lines_c++] = p; + } + } + } + + // printf("Just before return: %s\n", file_lines.lines[0]); + return file_lines; +} + +static int cb_check_result(void *record, size_t size, void *data) +{ + struct tail_test_result *result; + struct tail_file_lines *out; + + result = (struct tail_test_result *) data; + + char *check; + + out = get_out_file_content(result->target); + // printf("What we got from function: %s\n", out.lines[0]); + if (!out->lines_c) { + goto exit; + } + /* + * Our validation is: check that the one of the output lines + * in the output record. + */ + for (int i=0; ilines_c; i++) { + check = strstr(record, out->lines[i]); + if (check != NULL) { + result->nMatched++; + goto exit; + } + } + +exit: + if (size > 0) { + flb_free(record); + } + if (out->lines_c) { + flb_free(out->lines[0]); + flb_free(out); + } + return 0; +} + +void do_test(char *system, const char *target, int tExpected, int nExpected, ...) +{ + int64_t ret; + flb_ctx_t *ctx = NULL; + int in_ffd; + int out_ffd; + va_list va; + char *key; + char *value; + char path[PATH_MAX]; + struct tail_test_result result = {0}; + + result.nMatched = 0; + result.target = target; + + struct flb_lib_out_cb cb; + cb.cb = cb_check_result; + cb.data = &result; + + /* initialize */ + set_result(0); + + ctx = flb_create(); + + ret = flb_service_set(ctx, + "Log_Level", "error", + "Parsers_File", DPATH "/parsers.conf", + NULL); + TEST_CHECK_(ret == 0, "setting service options"); + + in_ffd = flb_input(ctx, (char *) system, NULL); + TEST_CHECK(in_ffd >= 0); + TEST_CHECK(flb_input_set(ctx, in_ffd, "tag", "test", NULL) == 0); + + /* Compose path based on target */ + snprintf(path, sizeof(path) - 1, DPATH "/log/%s.log", target); + TEST_CHECK_(access(path, R_OK) == 0, "accessing log file: %s", path); + + TEST_CHECK(flb_input_set(ctx, in_ffd, + "Path", path, + "Docker_Mode", "On", + "Parser", "docker", + NULL) == 0); + + va_start(va, nExpected); + while ((key = va_arg(va, char *))) { + value = va_arg(va, char *); + TEST_CHECK(value != NULL); + TEST_CHECK(flb_input_set(ctx, in_ffd, key, value, NULL) == 0); + } + va_end(va); + + out_ffd = flb_output(ctx, (char *) "lib", &cb); + TEST_CHECK(out_ffd >= 0); + TEST_CHECK(flb_output_set(ctx, out_ffd, + "match", "test", + "format", "json", + NULL) == 0); + + TEST_CHECK(flb_service_set(ctx, "Flush", "0.5", + "Grace", "1", + NULL) == 0); + + /* Start test */ + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK_(ret == 0, "starting engine"); + + /* Poll for up to 2 seconds or until we got a match */ + for (ret = 0; ret < tExpected && result.nMatched < nExpected; ret++) { + usleep(1000); + } + + TEST_CHECK(result.nMatched == nExpected); + TEST_MSG("result.nMatched: %i\nnExpected: %i", result.nMatched, nExpected); + + ret = flb_stop(ctx); + TEST_CHECK_(ret == 0, "stopping engine"); + + if (ctx) { + flb_destroy(ctx); + } +} + +void flb_test_in_tail_dockermode() +{ + do_test("tail", "dockermode", 20000, 3, + NULL); +} + +void flb_test_in_tail_dockermode_splitted_line() +{ + do_test("tail", "dockermode_splitted_line", 20000, 2, + NULL); +} + +void flb_test_in_tail_dockermode_multiple_lines() +{ + do_test("tail", "dockermode_multiple_lines", 20000, 2, + "Docker_Mode_Parser", "docker_multiline", + NULL); +} + +void flb_test_in_tail_dockermode_splitted_multiple_lines() +{ + do_test("tail", "dockermode_splitted_multiple_lines", 20000, 2, + "Docker_Mode_Parser", "docker_multiline", + NULL); +} + + +/* Test list */ +TEST_LIST = { +#ifdef in_tail + {"in_tail_dockermode", flb_test_in_tail_dockermode}, + {"in_tail_dockermode_splitted_line", flb_test_in_tail_dockermode_splitted_line}, + {"in_tail_dockermode_multiple_lines", flb_test_in_tail_dockermode_multiple_lines}, + {"in_tail_dockermode_splitted_multiple_lines", flb_test_in_tail_dockermode_splitted_multiple_lines}, +#endif + {NULL, NULL} +};