diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index 972ae41706d..25f11295fd6 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -167,6 +167,13 @@ static int in_kafka_collect(struct flb_input_instance *ins, break; } + if (rkm->err) { + flb_plg_warn(ins, "consumer error: %s\n", + rd_kafka_message_errstr(rkm)); + rd_kafka_message_destroy(rkm); + continue; + } + flb_plg_debug(ins, "kafka message received"); ret = process_message(ctx, rkm); @@ -175,12 +182,20 @@ static int in_kafka_collect(struct flb_input_instance *ins, /* TO-DO: commit the record based on `ret` */ rd_kafka_commit(ctx->kafka.rk, NULL, 0); + + /* Break from the loop when reaching the limit of polling if available */ + if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED && + ctx->log_encoder->output_length > ctx->polling_threshold + 512) { + break; + } } if (ret == FLB_EVENT_ENCODER_SUCCESS) { - flb_input_log_append(ins, NULL, 0, - ctx->log_encoder->output_buffer, - ctx->log_encoder->output_length); + if (ctx->log_encoder->output_length > 0) { + flb_input_log_append(ins, NULL, 0, + ctx->log_encoder->output_buffer, + ctx->log_encoder->output_length); + } ret = 0; } else { @@ -203,8 +218,10 @@ static int in_kafka_init(struct flb_input_instance *ins, rd_kafka_conf_t *kafka_conf = NULL; rd_kafka_topic_partition_list_t *kafka_topics = NULL; rd_kafka_resp_err_t err; + rd_kafka_conf_res_t res; char errstr[512]; (void) data; + char conf_val[16]; /* Allocate space for the configuration context */ ctx = flb_malloc(sizeof(struct flb_in_kafka_config)); @@ -226,6 +243,31 @@ static int in_kafka_init(struct flb_input_instance *ins, goto init_error; } + if (ctx->buffer_max_size > 0) { + ctx->polling_threshold = ctx->buffer_max_size; + + snprintf(conf_val, sizeof(conf_val), "%zu", ctx->polling_threshold - 512); + res = rd_kafka_conf_set(kafka_conf, "fetch.max.bytes", conf_val, + errstr, sizeof(errstr)); + if (res != RD_KAFKA_CONF_OK) { + flb_plg_error(ins, "Failed to set up fetch.max.bytes: %s, val = %s", + rd_kafka_err2str(err), conf_val); + goto init_error; + } + + snprintf(conf_val, sizeof(conf_val), "%zu", ctx->polling_threshold); + res = rd_kafka_conf_set(kafka_conf, "receive.message.max.bytes", conf_val, + errstr, sizeof(errstr)); + if (res != RD_KAFKA_CONF_OK) { + flb_plg_error(ins, "Failed to set up receive.message.max.bytes: %s, val = %s", + rd_kafka_err2str(err), conf_val); + goto init_error; + } + } + else { + ctx->polling_threshold = FLB_IN_KAFKA_UNLIMITED; + } + ctx->kafka.rk = rd_kafka_new(RD_KAFKA_CONSUMER, kafka_conf, errstr, sizeof(errstr)); @@ -281,6 +323,8 @@ static int in_kafka_init(struct flb_input_instance *ins, goto init_error; } + ctx->coll_fd = ret; + ctx->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT); if (ctx->log_encoder == NULL) { @@ -306,6 +350,20 @@ static int in_kafka_init(struct flb_input_instance *ins, return -1; } +static void in_kafka_pause(void *data, struct flb_config *config) +{ + struct flb_in_kafka_config *ctx = data; + + flb_input_collector_pause(ctx->coll_fd, ctx->ins); +} + +static void in_kafka_resume(void *data, struct flb_config *config) +{ + struct flb_in_kafka_config *ctx = data; + + flb_input_collector_resume(ctx->coll_fd, ctx->ins); +} + /* Cleanup serial input */ static int in_kafka_exit(void *in_context, struct flb_config *config) { @@ -365,6 +423,11 @@ static struct flb_config_map config_map[] = { 0, FLB_FALSE, 0, "Set the librdkafka options" }, + { + FLB_CONFIG_MAP_SIZE, "buffer_max_size", FLB_IN_KAFKA_BUFFER_MAX_SIZE, + 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size), + "Set the maximum size of chunk" + }, /* EOF */ {0} }; @@ -377,6 +440,8 @@ struct flb_input_plugin in_kafka_plugin = { .cb_pre_run = NULL, .cb_collect = in_kafka_collect, .cb_flush_buf = NULL, + .cb_pause = in_kafka_pause, + .cb_resume = in_kafka_resume, .cb_exit = in_kafka_exit, .config_map = config_map }; diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h index 2992efff152..b56d9c66893 100644 --- a/plugins/in_kafka/in_kafka.h +++ b/plugins/in_kafka/in_kafka.h @@ -30,6 +30,8 @@ #define FLB_IN_KAFKA_DEFAULT_POLL_MS "500" #define FLB_IN_KAFKA_DEFAULT_FORMAT "none" +#define FLB_IN_KAFKA_UNLIMITED (size_t)-1 +#define FLB_IN_KAFKA_BUFFER_MAX_SIZE "4M" enum { FLB_IN_KAFKA_FORMAT_NONE, @@ -43,6 +45,9 @@ struct flb_in_kafka_config { int poll_ms; int format; char *format_str; + int coll_fd; + size_t buffer_max_size; /* Maximum size of chunk allocation */ + size_t polling_threshold; }; #endif