-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
filter_multiline: implement Docker partial_message support #5037
Conversation
@edsiper there won't be another 1.8 version right? So I just need to submit a PR to master for this instead.. |
I think the plan is still some maintenance releases @PettitWesley, I've submitted a few PRs for 1.8 for example. |
multiline core feature already supports partial docker messages, what is different on this PR ? |
@edsiper The logic is very different between your existing code for parsing split docker logs in json log files: https://github.com/fluent/fluent-bit/blob/master/src/multiline/flb_ml_parser_docker.c And my code which is meant to be used with the fluentd docker log driver. The code I have added here is similar to the fluentd-plugin-concat and the underlying logic is much different. See this comment on my design and the sections of my design: #4309 (comment) https://github.com/fluent-plugins-nursery/fluent-plugin-concat |
fb9362d
to
c5f3c57
Compare
Signed-off-by: Wesley Pettit <[email protected]>
Signed-off-by: Wesley Pettit <[email protected]>
c5f3c57
to
2e81ef4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Added some comments for clairification
|
||
sched = flb_sched_ctx_get(); | ||
|
||
ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have this be the (flush_ms / x) where (flush_ms / x) is less than grace? And the cb timer altered to only run if in shutdown phase or every x runs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The flush timer can't help on shutdown. I need another solution for that (which will come in a separate PR and I will do a separate write up in an issue on that for Eduardo, ping me if you want a verbal explanation before I get to it).
Dividing by X is potentially a good idea since then we are more likely to flush closer to when a log has exceeded flush_ms. I'll change this, I'm gonna make X be 2.
plugins/filter_multiline/ml_concat.h
Outdated
msgpack_object_kv *get_key(msgpack_object *map, char *check_for_key); | ||
int is_partial(msgpack_object *map); | ||
int is_partial_last(msgpack_object *map); | ||
int get_partial_id(msgpack_object *map, | ||
char **partial_id_str, | ||
size_t *partial_id_size); | ||
struct split_message_packer *get_packer(struct mk_list *packers, const char *tag, | ||
char *input_name, | ||
char *partial_id_str, size_t partial_id_size); | ||
struct split_message_packer *create_packer(const char *tag, char *input_name, | ||
char *partial_id_str, size_t partial_id_size, | ||
msgpack_object *map, char *multiline_key_content, | ||
struct flb_time *tm); | ||
int split_message_packer_write(struct split_message_packer *packer, | ||
msgpack_object *map, char *multiline_key_content); | ||
void split_message_packer_complete(struct split_message_packer *packer); | ||
void split_message_packer_destroy(struct split_message_packer *packer); | ||
void append_complete_record(char *data, size_t bytes, msgpack_packer *tmp_pck); | ||
unsigned long long current_timestamp(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you may want to prefix these function names with flb_ml_concat_
or similar to avoid polluting the global namespace.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call. I keep forgetting about this. I wish C functions weren't in a global namespace...
for(i=0; i < map->via.map.size; i++) { | ||
if ((kv+i) == split_kv) { | ||
continue; | ||
} | ||
|
||
key = (kv+i)->key; | ||
if (key.type == MSGPACK_OBJECT_BIN) { | ||
key_str = (char *) key.via.bin.ptr; | ||
key_str_size = key.via.bin.size; | ||
check_key = FLB_TRUE; | ||
} | ||
if (key.type == MSGPACK_OBJECT_STR) { | ||
key_str = (char *) key.via.str.ptr; | ||
key_str_size = key.via.str.size; | ||
check_key = FLB_TRUE; | ||
} | ||
|
||
len = FLB_MULTILINE_PARTIAL_PREFIX_LEN; | ||
if (key_str_size < len) { | ||
len = key_str_size; | ||
} | ||
|
||
if (check_key == FLB_TRUE) { | ||
if (strncmp(FLB_MULTILINE_PARTIAL_PREFIX, key_str, len) == 0) { | ||
/* don't pack the partial keys */ | ||
continue; | ||
} | ||
} | ||
|
||
map_size++; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Repeated code 3 times. Would be nice to somehow extract for maintenance purposes. Maybe difficult.
plugins/filter_multiline/ml_concat.c
Outdated
val = kv->val; | ||
if (val.type == MSGPACK_OBJECT_BIN) { | ||
val_str = (char *) val.via.bin.ptr; | ||
val_str_size = val.via.bin.size; | ||
} | ||
if (val.type == MSGPACK_OBJECT_STR) { | ||
val_str = (char *) val.via.str.ptr; | ||
val_str_size = val.via.str.size; | ||
} | ||
|
||
flb_sds_cat_safe(&packer->buf, val_str, val_str_size); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe recommend discarding if not of type binary or string. This should, I guess, never happen because we trust the contents of the multiline value to be a string/binary, but what if it unexpectedly happens to be something else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call, that's safer.
if (packer->mp_sbuf.data) { | ||
msgpack_sbuffer_destroy(&packer->mp_sbuf); | ||
} | ||
|
||
flb_free(packer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does packer->mp_pck need to be freed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAICT it does not. The buffer is the actual memory, the packer seems to be like a helper. The existing code for multiline has a packer and a buffer and it only frees the buffer. Also I tested this with valgrind and it didn't show a memory leak.
/* record passed from filter as-is */ | ||
msgpack_pack_array(&tmp_pck, 2); | ||
flb_time_append_to_msgpack(&tm, &tmp_pck, 0); | ||
msgpack_pack_object(&tmp_pck, *obj); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if for consistency, these non_partial logs should be re-emitted. That way if the multiline filter is not the first filter, all the logs will have prior filters applied twice, rather than only the multiline logs.
Also to consider is efficiency, which advocates for the current solution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nevermind. See that emit only occurs after timeout, so it shouldn't regularly happen. Current solution seems fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about this too myself... but returning directly from the filter is more efficient... also I think most of hte filters can be safely passed through twice... this is also something I can fix if it turns out it does have weird side effects and someone complains.
There is an issue right now open that under high throughput the emitter fills up quickly... and remember the use case here is explicitly for huge logs, so I htink I want to stick with only using the emitter when needed.
plugins/filter_multiline/ml.c
Outdated
} | ||
mk_list_add(&packer->_head, &ctx->split_message_packers); | ||
} | ||
ret = split_message_packer_write(packer, obj, "log"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May prefer some #define for "log" string.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and also on line 609
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh boy. This is actually a bug. Thanks. The key is not guaranteed to be "log"... it is user configured.
*out_buf = tmp_sbuf.data; | ||
*out_bytes = tmp_sbuf.size; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens to the old out_buf, and out_bytes? This gets cleaned up properly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
may also need to call free(tmp_sbuf)
The data is still used, so msgpack_sbuffer_free might not be able to be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the way this works is that you return the memory to the caller, and then the caller frees. If you look in the code in I think flb_filter.c it frees the returned buf. All of the filters work this way.
split_message_packer_complete(packer); | ||
/* re-emit record with original tag */ | ||
flb_plg_trace(ctx->ins, "emitting from %s to %s", packer->input_name, packer->tag); | ||
ret = in_emitter_add_record(packer->tag, flb_sds_len(packer->tag), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not clear how cycles are avoided.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check the beginning of cb_ml_filter:
if (i_ins == ctx->ins_emitter) {
flb_plg_trace(ctx->ins, "not processing records from the emitter");
return FLB_FILTER_NOTOUCH;
}
} else { | ||
flb_plg_error(ins, "'Mode' must be '%s' or '%s'", | ||
FLB_MULTILINE_MODE_PARTIAL_MESSAGE, | ||
FLB_MULTILINE_MODE_PARSER); | ||
return -1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is flb_free(ctx) needed here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its stylistically better yes. Strictly speaking, if you fail plugin init, then Fluent Bit shuts down, and so freeing memory isn't needed. Lots of the plugins don't free on init failure.
Signed-off-by: Wesley Pettit <[email protected]>
91478eb
to
ea04e96
Compare
@edsiper @koleini @fujimotos can we get this reviewed so that we can merge this? We get customers waiting for this fix. |
@lubingfeng Thanks for checking. Let me take a second look. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The minor changes look good, and comments towards most of the other issues make sense!
I personally feel that the multiline function prefix should be something like flb_ml_concat_..., rather than just ml to follow the prefix convention set forth in flb_ml_parser.h/c, flb_ml_rule.h/c and other fluent bit files, and also to avoid naming collision with other libraries via the flb_ prefix. This is up to you and Eduardo though.
Not sure if you care about this style. But if you do, line 278 has an invisible extra indent in ml.c: https://github.com/fluent/fluent-bit/pull/5037/files#diff-d882420079c3f1dedc230e33a45fb81bf06a4deae9e2df11bd8ecaf509c73b0dR278
Lines 359 to 363 in ml_concat.c have some extra indents:
https://github.com/fluent/fluent-bit/pull/5037/files#diff-f5839e19406e810f65cbda8799c752423d8fb3c5ff50de0e220864d56d7b7c87R359-R363
Approving.
Approved with some optional change requests @lubingfeng |
We will release this in 1.9 instead, and I will open a PR there once #4671 is merged |
Signed-off-by: Wesley Pettit [email protected]
Enter
[N/A]
in the box, if an item is not applicable to your change.Testing
Before we can approve your change; please submit the following in a comment:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.