in_kafka: boost throughput #9800

Open: wants to merge 4 commits into base: master
Changes from 1 commit
in_kafka: make pull timeout configurable
Signed-off-by: CoreidCC <sws-github@coreid.cc>
coreidcc committed Jan 14, 2025
commit cd5375f4241a18935b9b008f4f7ab66b9c584cc5
24 changes: 16 additions & 8 deletions plugins/in_kafka/in_kafka.c
@@ -182,8 +182,9 @@ static int in_kafka_collect(struct flb_input_instance *ins,


 if(!ctx->enable_auto_commit) {
-    /* TO-DO: commit the record based on `ret` */
-    rd_kafka_commit(ctx->kafka.rk, NULL, 0);
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        rd_kafka_commit(ctx->kafka.rk, NULL, 0);
+    }
 }

/* Break from the loop when reaching the limit of polling if available */
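
The hunk above only issues the manual commit when the record was encoded successfully and auto-commit is disabled. A minimal sketch of that decision as a standalone predicate follows. The helper name `should_commit` and the constant `ENCODER_SUCCESS` are hypothetical stand-ins for illustration (the plugin itself compares against `FLB_EVENT_ENCODER_SUCCESS`); this is not code from the PR.

```c
#include <stdbool.h>

/* Stand-in for FLB_EVENT_ENCODER_SUCCESS; the value 0 is an assumption. */
#define ENCODER_SUCCESS 0

/*
 * Hypothetical helper: returns true when a manual offset commit should be
 * issued for the record that was just processed.
 */
static bool should_commit(bool enable_auto_commit, int encoder_ret)
{
    if (enable_auto_commit) {
        /* The client library commits in the background; nothing to do. */
        return false;
    }
    /* Only acknowledge records that were encoded without error. */
    return encoder_ret == ENCODER_SUCCESS;
}
```

Keeping the condition in one predicate makes the two cases (auto-commit on, encoding failed) easy to test without a broker connection.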
@@ -225,6 +226,7 @@ static int in_kafka_init(struct flb_input_instance *ins,
char errstr[512];
(void) data;
char conf_val[16];
+size_t dsize;

/* Allocate space for the configuration context */
ctx = flb_malloc(sizeof(struct flb_in_kafka_config));
@@ -252,13 +254,20 @@ static int in_kafka_init(struct flb_input_instance *ins,
      *    -> minimize the delay we might create
      * b) run in our own thread:
      *    -> optimize for throughput and rely on 'fetch.wait.max.ms'
-     *       which is set to 500 by default default. lets set it to
-     *       twice that so that increasing fetch.wait.max.ms still
-     *       has an effect.
+     *       which is set to 500 by default. we align our
+     *       timeout with what is set for 'fetch.wait.max.ms'
      */
     ctx->poll_timeount_ms = 1;
-    if(ins->is_threaded) {
-        ctx->poll_timeount_ms = 1000;
+    if (ins->is_threaded) {
+        ctx->poll_timeount_ms = 550; // ensure kafka triggers timeout
+
+        // align our timeout with what was configured for fetch.wait.max.ms
+        dsize = sizeof(conf_val);
+        res = rd_kafka_conf_get(kafka_conf, "fetch.wait.max.ms", conf_val, &dsize);
+        if (res == RD_KAFKA_CONF_OK && dsize <= sizeof(conf_val)) {
+            // add 50ms so kafka triggers timeout
+            ctx->poll_timeount_ms = atoi(conf_val) + 50;
+        }
     }

if (ctx->buffer_max_size > 0) {
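
The timeout selection in the hunk above can be sketched as a small pure function: 1 ms when the input runs on the main event loop, otherwise the configured `fetch.wait.max.ms` plus a 50 ms margin, falling back to 550 ms when the value is missing or unparsable. The helper name `derive_poll_timeout_ms` and the macro names are hypothetical, and the sketch swaps the PR's `atoi` for `strtol` so that malformed input can be detected; it is an illustration under those assumptions, not the PR's code.

```c
#include <errno.h>
#include <limits.h>
#include <stdlib.h>

#define POLL_TIMEOUT_INLINE_MS   1    /* non-threaded: return to the event loop quickly */
#define POLL_TIMEOUT_FALLBACK_MS 550  /* 500 ms default fetch.wait.max.ms + 50 ms */
#define POLL_TIMEOUT_MARGIN_MS   50   /* margin so the broker-side wait expires first */

/*
 * Hypothetical helper: derive the consumer poll timeout from the textual
 * value of 'fetch.wait.max.ms' (as a config lookup would return it).
 */
static int derive_poll_timeout_ms(int is_threaded, const char *fetch_wait_max_ms)
{
    char *end = NULL;
    long val;

    if (!is_threaded) {
        return POLL_TIMEOUT_INLINE_MS;
    }
    if (fetch_wait_max_ms == NULL || *fetch_wait_max_ms == '\0') {
        return POLL_TIMEOUT_FALLBACK_MS;
    }

    errno = 0;
    val = strtol(fetch_wait_max_ms, &end, 10);

    /* Reject trailing garbage, negative values and anything that would overflow. */
    if (errno != 0 || *end != '\0' || val < 0 ||
        val > INT_MAX - POLL_TIMEOUT_MARGIN_MS) {
        return POLL_TIMEOUT_FALLBACK_MS;
    }
    return (int) val + POLL_TIMEOUT_MARGIN_MS;
}
```

For example, `derive_poll_timeout_ms(1, "1000")` yields 1050, so raising `fetch.wait.max.ms` in the consumer configuration still lengthens the poll, which is the effect the comment in the hunk describes.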
@@ -451,7 +460,6 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, enable_auto_commit),
"Rely on kafka auto-commit and commit messages in batches"
},
/* EOF */
{0}
};

1 change: 1 addition & 0 deletions plugins/in_kafka/in_kafka.h
@@ -33,6 +33,7 @@
#define FLB_IN_KAFKA_UNLIMITED (size_t)-1
#define FLB_IN_KAFKA_BUFFER_MAX_SIZE "4M"
#define FLB_IN_KAFKA_ENABLE_AUTO_COMMIT "false"
+#define FLB_IN_KAFKA_POLL_TIMEOUT_MS "550" // same as kafka fetch.wait.max.ms + 10%

enum {
FLB_IN_KAFKA_FORMAT_NONE,