Merge pull request #367 from tkornai/in-rdkafka-group-history
Add librdkafka based consumer
repeatedly authored Sep 14, 2020
2 parents c72d46b + b65c75d commit 7f4f516
Showing 2 changed files with 317 additions and 0 deletions.
33 changes: 33 additions & 0 deletions README.md
@@ -141,6 +141,39 @@ See also [ruby-kafka README](https://github.com/zendesk/ruby-kafka#consuming-mes

The consumed topic name is used as the event tag, so when the target topic name is `app_event`, the tag is `app_event`. If you want to modify the tag, use the `add_prefix` or `add_suffix` parameter. With `add_prefix kafka`, the tag becomes `kafka.app_event`.

### Input plugin (@type 'rdkafka_group', supports kafka consumer groups, uses rdkafka-ruby)

:warning: **The in_rdkafka_group consumer has not yet been tested under heavy production load. Use it at your own risk!**

With the introduction of the rdkafka-ruby based input plugin we hope to support Kafka brokers above version 2.1, where we saw [compatibility issues](https://github.com/fluent/fluent-plugin-kafka/issues/315) when using the ruby-kafka based `@type kafka_group` input. The rdkafka-ruby gem wraps the highly performant and production-ready librdkafka C library.

<source>
@type rdkafka_group
topics <listening topics(separate with comma',')>
format <input text type (text|json|ltsv|msgpack)> :default => json
message_key <key (Optional, for text format only, default is message)>
kafka_message_key <key (Optional, if specified, set kafka's message key to this key)>
add_headers <If true, add kafka's message headers to record>
add_prefix <tag prefix (Optional)>
add_suffix <tag suffix (Optional)>
retry_emit_limit <Wait retry_emit_limit x 1s when BufferQueueLimitError happens. The default is nil, which means waiting until BufferQueueLimitError is resolved>
use_record_time (Deprecated. Use 'time_source record' instead.) <If true, replace event time with contents of 'time' field of fetched record>
time_source <source for message timestamp (now|kafka|record)> :default => now
time_format <string (Optional when use_record_time is used)>

# kafka consumer options
max_wait_time_ms 500
max_batch_size 10000
kafka_configs {
"bootstrap.servers": "brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>",
"group.id": "<consumer group name>"
}
</source>

See also [rdkafka-ruby](https://github.com/appsignal/rdkafka-ruby) and [librdkafka](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for more detailed documentation about Kafka consumer options.

The consumed topic name is used as the event tag, so when the target topic name is `app_event`, the tag is `app_event`. If you want to modify the tag, use the `add_prefix` or `add_suffix` parameter. With `add_prefix kafka`, the tag becomes `kafka.app_event`.
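
For illustration, here is a minimal concrete configuration sketch. The broker hosts, group id, and topic names below are placeholder values for this example only; any other librdkafka consumer property (such as `auto.offset.reset` here) can be passed through `kafka_configs`.

<source>
  @type rdkafka_group
  topics app_event,web_access
  format json
  add_prefix kafka
  kafka_configs {
    "bootstrap.servers": "kafka1.example.com:9092,kafka2.example.com:9092",
    "group.id": "fluentd-rdkafka-group",
    "auto.offset.reset": "earliest"
  }
</source>

With `add_prefix kafka`, events consumed from the `app_event` topic are emitted with tag `kafka.app_event`.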

### Output plugin

This `kafka2` plugin is for fluentd v1 or later. This plugin uses `ruby-kafka` producer for writing data.
284 changes: 284 additions & 0 deletions lib/fluent/plugin/in_rdkafka_group.rb
@@ -0,0 +1,284 @@
require 'fluent/plugin/input'
require 'fluent/time'
require 'fluent/plugin/kafka_plugin_util'

require 'rdkafka'

class Fluent::Plugin::RdKafkaGroupInput < Fluent::Plugin::Input
Fluent::Plugin.register_input('rdkafka_group', self)

helpers :thread

config_param :topics, :string,
:desc => "Listening topics(separate with comma',')."

config_param :format, :string, :default => 'json',
:desc => "Supported format: (json|text|ltsv|msgpack)"
config_param :message_key, :string, :default => 'message',
:desc => "For 'text' format only."
config_param :add_headers, :bool, :default => false,
:desc => "Add kafka's message headers to event record"
config_param :add_prefix, :string, :default => nil,
:desc => "Tag prefix (Optional)"
config_param :add_suffix, :string, :default => nil,
:desc => "Tag suffix (Optional)"
config_param :use_record_time, :bool, :default => false,
:desc => "Replace message timestamp with contents of 'time' field.",
:deprecated => "Use 'time_source record' instead."
config_param :time_source, :enum, :list => [:now, :kafka, :record], :default => :now,
:desc => "Source for message timestamp."
config_param :record_time_key, :string, :default => 'time',
:desc => "Time field when time_source is 'record'"
config_param :time_format, :string, :default => nil,
:desc => "Time format to be used to parse 'time' field."
config_param :kafka_message_key, :string, :default => nil,
:desc => "Set kafka's message key to this field"

config_param :retry_emit_limit, :integer, :default => nil,
:desc => "How long to stop event consuming when BufferQueueLimitError happens. Wait retry_emit_limit x 1s. The default is waiting until BufferQueueLimitError is resolved"
config_param :retry_wait_seconds, :integer, :default => 30
config_param :disable_retry_limit, :bool, :default => false,
:desc => "If set true, it disables retry_limit and make Fluentd retry indefinitely (default: false)"
config_param :retry_limit, :integer, :default => 10,
:desc => "The maximum number of retries for connecting kafka (default: 10)"

config_param :max_wait_time_ms, :integer, :default => 250,
:desc => "How long to block polls in milliseconds until the server sends us data."
config_param :max_batch_size, :integer, :default => 10000,
:desc => "Maximum number of log lines emitted in a single batch."

config_param :kafka_configs, :hash, :default => {},
:desc => "Kafka configuration properties as desribed in https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md"

include Fluent::KafkaPluginUtil::SSLSettings
include Fluent::KafkaPluginUtil::SaslSettings

class ForShutdown < StandardError
end

BufferError = Fluent::Plugin::Buffer::BufferOverflowError

def initialize
super

@time_parser = nil
@retry_count = 1
end

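# Splits a comma-separated config value into an array of trimmed entries; raises a ConfigError if the result is empty.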
def _config_to_array(config)
config_array = config.split(',').map {|k| k.strip }
if config_array.empty?
raise Fluent::ConfigError, "rdkafka_group: '#{config}' is a required parameter"
end
config_array
end

def multi_workers_ready?
true
end

private :_config_to_array

def configure(conf)
super

log.warn "The in_rdkafka_group consumer was not yet tested under heavy production load. Use it at your own risk!"

log.info "Will watch for topics #{@topics} at brokers " \
"#{@kafka_configs["bootstrap.servers"]} and '#{@kafka_configs["group.id"]}' group"

@topics = _config_to_array(@topics)

@parser_proc = setup_parser

@time_source = :record if @use_record_time

if @time_source == :record and @time_format
@time_parser = Fluent::TimeParser.new(@time_format)
end
end

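# Returns a Proc that parses a consumed message's payload into a record hash according to the configured format (json, ltsv, msgpack or text).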
def setup_parser
case @format
when 'json'
begin
require 'oj'
Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
Proc.new { |msg| Oj.load(msg.payload) }
rescue LoadError
require 'yajl'
Proc.new { |msg| Yajl::Parser.parse(msg.payload) }
end
when 'ltsv'
require 'ltsv'
Proc.new { |msg| LTSV.parse(msg.payload, {:symbolize_keys => false}).first }
when 'msgpack'
require 'msgpack'
Proc.new { |msg| MessagePack.unpack(msg.payload) }
when 'text'
Proc.new { |msg| {@message_key => msg.payload} }
end
end

def start
super

@consumer = setup_consumer

thread_create(:in_rdkafka_group, &method(:run))
end

def shutdown
# In a multithreaded program this nil assignment should be guarded by a mutex,
# but contention here is very low, so we do not use a mutex for now.
# If a problem shows up, we will add a guard around the consumer.
consumer = @consumer
@consumer = nil
consumer.close

super
end

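# Builds an rdkafka consumer from the kafka_configs hash and subscribes it to the configured topics.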
def setup_consumer
consumer = Rdkafka::Config.new(@kafka_configs).consumer
consumer.subscribe(*@topics)
consumer
end

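# Closes the current consumer (if any), waits retry_wait_seconds and creates a fresh one;
# on failure it keeps retrying until retry_limit is reached unless disable_retry_limit is set.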
def reconnect_consumer
log.warn "Stopping Consumer"
consumer = @consumer
@consumer = nil
if consumer
consumer.close
end
log.warn "Could not connect to broker. retry_time:#{@retry_count}. Next retry will be in #{@retry_wait_seconds} seconds"
@retry_count = @retry_count + 1
sleep @retry_wait_seconds
@consumer = setup_consumer
log.warn "Re-starting consumer #{Time.now.to_s}"
@retry_count = 0
rescue => e
log.error "unexpected error during re-starting consumer object access", :error => e.to_s
log.error_backtrace
if @retry_count <= @retry_limit or @disable_retry_limit
reconnect_consumer
end
end

class Batch
attr_reader :topic
attr_reader :messages

def initialize(topic)
@topic = topic
@messages = []
end
end

# Executes the passed codeblock on a batch of messages.
# It is guaranteed that every message in a given batch belongs to the same topic, because the tagging logic in :run expects that property.
# The maximum number of messages in a batch is capped by the :max_batch_size configuration value. This ensures that consuming from a single
# topic for a long time (e.g. with `auto.offset.reset` set to `earliest`) does not lead to memory exhaustion. Also, calling consumer.poll
# advances the consumer offset, so in case the process crashes we might lose at most :max_batch_size messages.
def each_batch(&block)
batch = nil
message = nil
while @consumer
message = @consumer.poll(@max_wait_time_ms)
if message
if not batch
batch = Batch.new(message.topic)
elsif batch.topic != message.topic || batch.messages.size >= @max_batch_size
yield batch
batch = Batch.new(message.topic)
end
batch.messages << message
else
yield batch if batch
batch = nil
end
end
yield batch if batch
end

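# Main consume loop: turns each batch into a MultiEventStream, derives the event tag from the topic name
# (honoring add_prefix/add_suffix) and emits it; reconnects the consumer on unexpected errors.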
def run
while @consumer
begin
each_batch { |batch|
log.debug "A new batch for topic #{batch.topic} with #{batch.messages.size} messages"
es = Fluent::MultiEventStream.new
tag = batch.topic
tag = @add_prefix + "." + tag if @add_prefix
tag = tag + "." + @add_suffix if @add_suffix

batch.messages.each { |msg|
begin
record = @parser_proc.call(msg)
case @time_source
when :kafka
record_time = Fluent::EventTime.from_time(msg.timestamp)
when :now
record_time = Fluent::Engine.now
when :record
if @time_format
record_time = @time_parser.parse(record[@record_time_key].to_s)
else
record_time = record[@record_time_key]
end
else
log.fatal "BUG: invalid time_source: #{@time_source}"
end
if @kafka_message_key
record[@kafka_message_key] = msg.key
end
if @add_headers
msg.headers.each_pair { |k, v|
record[k] = v
}
end
es.add(record_time, record)
rescue => e
log.warn "parser error in #{msg.topic}/#{msg.partition}", :error => e.to_s, :value => msg.payload, :offset => msg.offset
log.debug_backtrace
end
}

unless es.empty?
emit_events(tag, es)
end
}
rescue ForShutdown
rescue => e
log.error "unexpected error during consuming events from kafka. Re-fetch events.", :error => e.to_s
log.error_backtrace
reconnect_consumer
end
end
rescue => e
log.error "unexpected error during consumer object access", :error => e.to_s
log.error_backtrace
end

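# Emits the event stream to the router; when the output buffer overflows it sleeps and retries,
# giving up after retry_emit_limit attempts (or never, when retry_emit_limit is nil).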
def emit_events(tag, es)
retries = 0
begin
router.emit_stream(tag, es)
rescue BufferError
raise ForShutdown if @consumer.nil?

if @retry_emit_limit.nil?
sleep 1
retry
end

if retries < @retry_emit_limit
retries += 1
sleep 1
retry
else
raise RuntimeError, "Exceeds retry_emit_limit"
end
end
end
end
