Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/multiline/flb_ml.c
Original file line number Diff line number Diff line change
Expand Up @@ -606,12 +606,14 @@ static int ml_append_try_parser(struct flb_ml_parser_ins *parser,
tm, buf, size, map,
&out_buf, &out_size, &release,
&out_time);
/*
* Do not return -1 here. If the sub-parser fails, we should
* still attempt to process the raw text with multiline rules.
* The 'ret' variable is not used beyond this point, so we can
* safely ignore a failure here and let the multiline rules decide.
*/
if (ret < 0) {
/*
* The underlying parser could not consume the line. Propagate the
* failure so the caller can try the next multiline parser in the
* chain (if any) instead of buffering the raw text here.
*/
return -1;
}
break;
case FLB_ML_TYPE_MAP:
ret = ml_append_try_parser_type_map(parser, stream_id, &type,
Expand Down
93 changes: 90 additions & 3 deletions tests/internal/multiline.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,40 @@ struct record_check container_mix_input[] = {
{"{\"log\": \"dd-err\\n\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01234z\"}"},
};

/*
* The docker parser should emit each container fragment as soon as the log
* stream provides a newline. CRI lines handled by the chained parser are
* expected to flush immediately even if the docker stream still has buffered
* fragments waiting for a later newline (e.g. "bb" + "cc" + "dd-out\n").
*/
struct record_check container_mix_output[] = {
{"a1\n"},
{"a2\n"},
{"ddee\n"},
{"bbccdd-out\n"},
{"dd-err\n"},
{"single full"},
{"1a. some multiline log"},
{"1b. some multiline log"},
{"bbccdd-out\n"},
{"dd-err\n"},
};

/*
* Regression guard: when docker is the first parser in the chain and a CRI
* record arrives, the docker parser must decline the line so the CRI parser
* can consume it instead of buffering the payload until the flush timer
* expires. The strings below mimic container runtime output without trailing
* newlines as seen in the reported issue.
*/
struct record_check docker_cri_chain_input[] = {
{"2025-09-22T19:07:06.115398289Z stdout F first message"},
{"2025-09-22T19:07:06.116725604Z stdout F second message"},
{"2025-09-22T19:07:08.582112316Z stdout F third message"},
};

struct record_check docker_cri_chain_output[] = {
{"first message"},
{"second message"},
{"third message"},
};

/* Java stacktrace detection */
Expand Down Expand Up @@ -603,6 +628,68 @@ static void test_container_mix()
flb_config_exit(config);
}

static void test_parser_docker_cri_chain()
{
int i;
int len;
int ret;
int entries;
int expected;
uint64_t stream_id;
struct record_check *r;
struct flb_config *config;
struct flb_time tm;
struct flb_ml *ml;
struct flb_ml_parser_ins *mlp_i;
struct expected_result res = {0};

/* Expected results context */
res.key = "log";
res.out_records = docker_cri_chain_output;

/* Initialize environment */
config = flb_config_init();

/* Create docker multiline mode */
ml = flb_ml_create(config, "docker-cri-chain");
TEST_CHECK(ml != NULL);

/* Generate an instance of multiline docker parser */
mlp_i = flb_ml_parser_instance_create(ml, "docker");
TEST_CHECK(mlp_i != NULL);

/* Load instances of the parsers for current 'ml' context */
mlp_i = flb_ml_parser_instance_create(ml, "cri");
TEST_CHECK(mlp_i != NULL);

ret = flb_ml_stream_create(ml, "docker-cri-chain", -1, flush_callback,
(void *) &res, &stream_id);
TEST_CHECK(ret == 0);

entries = sizeof(docker_cri_chain_input) / sizeof(struct record_check);
for (i = 0; i < entries; i++) {
r = &docker_cri_chain_input[i];
len = strlen(r->buf);

flb_time_get(&tm);

/* Package as msgpack */
flb_ml_append_text(ml, stream_id, &tm, r->buf, len);
}

/* Flush any pending data to ensure no buffered records remain */
flb_ml_flush_pending_now(ml);

expected = sizeof(docker_cri_chain_output) / sizeof(struct record_check);
TEST_CHECK(res.current_record == expected);

if (ml) {
flb_ml_destroy(ml);
}

flb_config_exit(config);
}

static void test_parser_java()
{
int i;
Expand Down Expand Up @@ -1468,7 +1555,6 @@ static void test_buffer_limit_truncation()
struct flb_ml *ml;
struct flb_ml_parser *mlp;
struct flb_ml_parser_ins *mlp_i;
struct flb_parser *p;
struct flb_time tm;

/*
Expand Down Expand Up @@ -1561,6 +1647,7 @@ TEST_LIST = {
/* Normal features tests */
{ "parser_docker", test_parser_docker},
{ "parser_cri", test_parser_cri},
{ "parser_docker_cri_chain", test_parser_docker_cri_chain},
{ "parser_java", test_parser_java},
{ "parser_python", test_parser_python},
{ "parser_ruby", test_parser_ruby},
Expand Down
Loading