Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions plugins/in_tail/tail.c
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,13 @@ static struct flb_config_map config_map[] = {
"wait period time in seconds to flush queued unfinished split lines."

},
#ifdef FLB_HAVE_REGEX
{
FLB_CONFIG_MAP_STR, "docker_mode_parser", NULL,
0, FLB_FALSE, 0,
"specify the parser name to fetch log first line for muliline log"
},
#endif
{
FLB_CONFIG_MAP_STR, "path_key", NULL,
0, FLB_TRUE, offsetof(struct flb_tail_config, path_key),
Expand Down
1 change: 1 addition & 0 deletions plugins/in_tail/tail_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ struct flb_tail_config {
/* Docker mode */
int docker_mode; /* Docker mode enabled ? */
int docker_mode_flush; /* Docker mode flush/wait */
struct flb_parser *docker_mode_parser; /* Parser for separate multiline logs */

/* Lists head for files consumed statically (read) and by events (inotify) */
struct mk_list files_static;
Expand Down
81 changes: 73 additions & 8 deletions plugins/in_tail/tail_dockermode.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ int flb_tail_dmode_create(struct flb_tail_config *ctx,
return -1;
}

#ifdef FLB_HAVE_REGEX
/* First line Parser */
tmp = flb_input_get_property("docker_mode_parser", ins);
if (tmp) {
ctx->docker_mode_parser = flb_parser_get(tmp, config);
if (!ctx->docker_mode_parser) {
flb_plg_error(ctx->ins, "parser '%s' is not registered", tmp);
}
}
else {
ctx->docker_mode_parser = NULL;
}
#endif

tmp = flb_input_get_property("docker_mode_flush", ins);
if (!tmp) {
ctx->docker_mode_flush = FLB_TAIL_DMODE_FLUSH;
Expand Down Expand Up @@ -224,29 +238,80 @@ int flb_tail_dmode_process_content(time_t now,
char* line, size_t line_len,
char **repl_line, size_t *repl_line_len,
struct flb_tail_file *file,
struct flb_tail_config *ctx)
struct flb_tail_config *ctx,
msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck
)
{
char* val = NULL;
size_t val_len;
int ret;
void *out_buf = NULL;
size_t out_size;
struct flb_time out_time = {0};
*repl_line = NULL;
*repl_line_len = 0;
flb_sds_t tmp;
flb_sds_t tmp_copy;

#ifdef FLB_HAVE_REGEX
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this macro conditional should cover the if statement, otherwise the if does nothing.

if (flb_sds_len(file->dmode_lastline) > 0 && file->dmode_complete) {
if (ctx->docker_mode_parser) {
ret = flb_parser_do(ctx->docker_mode_parser, line, line_len,
&out_buf, &out_size, &out_time);
flb_free(out_buf);

/*
* Buffered log should be flushed out
* as current line meets first-line requirement
*/
if(ret >= 0) {
flb_tail_dmode_flush(mp_sbuf, mp_pck, file, ctx);
}
}
}
#endif

ret = modify_json_cond(line, line_len,
&val, &val_len,
repl_line, repl_line_len,
unesc_ends_with_nl,
prepend_sds_to_str, file->dmode_buf);
if (ret >= 0) {
/* line is a valid json */
flb_sds_len_set(file->dmode_lastline, 0);

if (ret == 0) {
file->dmode_buf = flb_sds_cat(file->dmode_buf, val, val_len);
file->dmode_lastline = flb_sds_copy(file->dmode_lastline, line, line_len);
file->dmode_flush_timeout = now + (ctx->docker_mode_flush - 1);
return ret;
/* concatenate current log line with buffered one */
tmp = flb_sds_cat(file->dmode_buf, val, val_len);
if (!tmp) {
flb_errno();
return -1;
}

tmp_copy = flb_sds_copy(file->dmode_lastline, line, line_len);
if (!tmp_copy) {
flb_errno();
return -1;
}

flb_sds_len_set(file->dmode_buf, 0);
file->dmode_flush_timeout = 0;
file->dmode_buf = tmp;
file->dmode_lastline = tmp_copy;
file->dmode_flush_timeout = now + (ctx->docker_mode_flush - 1);

if (ret == 0) {
/* Line not ended with newline */
file->dmode_complete = false;
}
else {
/* Line ended with newline */
file->dmode_complete = true;
#ifdef FLB_HAVE_REGEX
if (!ctx->docker_mode_parser) {
flb_tail_dmode_flush(mp_sbuf, mp_pck, file, ctx);
}
#else
flb_tail_dmode_flush(mp_sbuf, mp_pck, file, ctx);
#endif
}
}
return ret;
}
Expand Down
3 changes: 2 additions & 1 deletion plugins/in_tail/tail_dockermode.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ int flb_tail_dmode_process_content(time_t now,
char* line, size_t line_len,
char **repl_line, size_t *repl_line_len,
struct flb_tail_file *file,
struct flb_tail_config *ctx);
struct flb_tail_config *ctx,
msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck);
void flb_tail_dmode_flush(msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck,
struct flb_tail_file *file, struct flb_tail_config *ctx);
int flb_tail_dmode_pending_flush(struct flb_input_instance *ins,
Expand Down
8 changes: 4 additions & 4 deletions plugins/in_tail/tail_file.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ static int process_content(struct flb_tail_file *file, off_t *bytes)
if (ctx->docker_mode) {
ret = flb_tail_dmode_process_content(now, line, line_len,
&repl_line, &repl_line_len,
file, ctx);
file, ctx, out_sbuf, out_pck);
if (ret >= 0) {
if (repl_line == line) {
repl_line = NULL;
Expand All @@ -290,9 +290,8 @@ static int process_content(struct flb_tail_file *file, off_t *bytes)
line = repl_line;
line_len = repl_line_len;
}
if (ret == 0) {
goto go_next;
}
/* Skip normal parsers flow */
goto go_next;
}
else {
flb_tail_dmode_flush(out_sbuf, out_pck, file, ctx);
Expand Down Expand Up @@ -682,6 +681,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
file->mult_skipping = FLB_FALSE;
msgpack_sbuffer_init(&file->mult_sbuf);
file->dmode_flush_timeout = 0;
file->dmode_complete = true;
file->dmode_buf = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 65536 : 0);
file->dmode_lastline = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 20000 : 0);
#ifdef FLB_HAVE_SQLDB
Expand Down
1 change: 1 addition & 0 deletions plugins/in_tail/tail_file_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ struct flb_tail_file {
time_t dmode_flush_timeout; /* time when docker mode started */
flb_sds_t dmode_buf; /* buffer for docker mode */
flb_sds_t dmode_lastline; /* last incomplete line */
bool dmode_complete; /* buffer contains completed log */

/* buffering */
off_t parsed;
Expand Down
1 change: 1 addition & 0 deletions tests/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ if(FLB_OUT_LIB)
FLB_RT_TEST(FLB_IN_HEAD "in_head.c")
FLB_RT_TEST(FLB_IN_DUMMY "in_dummy.c")
FLB_RT_TEST(FLB_IN_RANDOM "in_random.c")
FLB_RT_TEST(FLB_IN_TAIL "in_tail.c")
endif()

# Filter Plugins
Expand Down
3 changes: 3 additions & 0 deletions tests/runtime/data/tail/log/dockermode.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"log":"Single log\n"}
{"log":"Second log\n"}
{"log":"Third log\n"}
6 changes: 6 additions & 0 deletions tests/runtime/data/tail/log/dockermode_multiple_lines.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"log":"2020-03-24 Multiple lines: \n"}
{"log":"first bullet point\n"}
{"log":"second bullet point\n"}
{"log":"third bullet point\n"}
{"log":"fourth bullet point\n"}
{"log":"2020-03-24 Single line\n"}
6 changes: 6 additions & 0 deletions tests/runtime/data/tail/log/dockermode_splitted_line.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"log":"2020-03-24 Single "}
{"log":"li"}
{"log":"ne\n"}
{"log":"2020-03-24 Another "}
{"log":"single "}
{"log":"line\n"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"log":"2020-03-24 Multiple lines: \n"}
{"log":"first bullet point\n"}
{"log":"second bullet point\n"}
{"log":"third "}
{"log":"bullet "}
{"log":"point\n"}
{"log":"fourth bullet point\n"}
{"log":"2020-03-24 Single line\n"}
3 changes: 3 additions & 0 deletions tests/runtime/data/tail/out/dockermode.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"log":"Single log\n"}
{"log":"Second log\n"}
{"log":"Third log\n"}
2 changes: 2 additions & 0 deletions tests/runtime/data/tail/out/dockermode_multiple_lines.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"log":"2020-03-24 Multiple lines: \nfirst bullet point\nsecond bullet point\nthird bullet point\nfourth bullet point\n"}
{"log":"2020-03-24 Single line\n"}
2 changes: 2 additions & 0 deletions tests/runtime/data/tail/out/dockermode_splitted_line.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"log":"2020-03-24 Single line\n"}
{"log":"2020-03-24 Another single line\n"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"log":"2020-03-24 Multiple lines: \nfirst bullet point\nsecond bullet point\nthird bullet point\nfourth bullet point\n"}
{"log":"2020-03-24 Single line\n"}
10 changes: 10 additions & 0 deletions tests/runtime/data/tail/parsers.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[PARSER]
Name docker
Format json
Time_Key time
Time_Format %Y-%m-%dT%H:%M:%S.%L%z

[PARSER]
Name docker_multiline
Format regex
Regex (?<log>^{"log":"\d{4}-\d{2}-\d{2}.*)
Loading