Skip to content

Commit

Permalink
update to new shutdown semantics
Browse files Browse the repository at this point in the history
  • Loading branch information
talevy committed Sep 18, 2015
1 parent 8fa404f commit b76e1fe
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 16 deletions.
25 changes: 13 additions & 12 deletions lib/logstash/inputs/kafka.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require 'logstash/namespace'
require 'logstash/inputs/base'
require 'jruby-kafka'
require 'stud/interval'

# This input will read events from a Kafka topic. It uses the high level consumer API provided
# by Kafka to read messages from the broker. It also maintains the state of what has been
Expand Down Expand Up @@ -133,31 +134,32 @@ def run(logstash_queue)
@logger.info('Running kafka', :group_id => @group_id, :topic_id => @topic_id, :zk_connect => @zk_connect)
begin
@consumer_group.run(@consumer_threads,@kafka_client_queue)
begin
while true

while !stop?
if !@kafka_client_queue.empty?
event = @kafka_client_queue.pop
queue_event(event, logstash_queue)
end
rescue LogStash::ShutdownSignal
@logger.info('Kafka got shutdown signal')
@consumer_group.shutdown
end

until @kafka_client_queue.empty?
queue_event(@kafka_client_queue.pop,logstash_queue)
end

@logger.info('Done running kafka input')
rescue => e
@logger.warn('kafka client threw exception, restarting',
:exception => e)
if @consumer_group.running?
@consumer_group.shutdown
end
sleep(Float(@consumer_restart_sleep_ms) * 1 / 1000)
retry
Stud.stoppable_sleep(Float(@consumer_restart_sleep_ms) * 1 / 1000) { stop? }
retry if !stop?
end
finished
end # def run

public
def stop
@consumer_group.shutdown if @consumer_group.running?
end

private
def create_consumer_group(options)
Kafka::Group.new(options)
Expand All @@ -182,5 +184,4 @@ def queue_event(message_and_metadata, output_queue)
:backtrace => e.backtrace)
end # begin
end # def queue_event

end #class LogStash::Inputs::Kafka
1 change: 1 addition & 0 deletions logstash-input-kafka.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Gem::Specification.new do |s|
s.add_runtime_dependency "logstash-core", '>= 1.4.0', '< 2.0.0'
s.add_runtime_dependency 'logstash-codec-json'
s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'stud', '>= 0.0.22', '< 0.1.0'

s.add_runtime_dependency 'jruby-kafka', ['>= 1.2.0', '< 2.0.0']

Expand Down
49 changes: 45 additions & 4 deletions spec/inputs/kafka_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ class LogStash::Inputs::TestKafka < LogStash::Inputs::Kafka
private
def queue_event(msg, output_queue)
super(msg, output_queue)
# need to raise exception here to stop the infinite loop
raise LogStash::ShutdownSignal
do_stop
end
end

Expand All @@ -30,7 +29,26 @@ def run(a_num_threads, a_queue)
end
end

describe 'inputs/kafka' do
class LogStash::Inputs::TestInfiniteKafka < LogStash::Inputs::Kafka
private
def queue_event(msg, output_queue)
super(msg, output_queue)
end
end

class TestInfiniteKafkaGroup < Kafka::Group
def run(a_num_threads, a_queue)
blah = TestMessageAndMetadata.new(@topic, 0, nil, 'Kafka message')
Thread.new do
while true
a_queue << blah
sleep 0.2
end
end
end
end

describe LogStash::Inputs::Kafka do
let (:kafka_config) {{'topic_id' => 'test'}}
let (:empty_config) {{}}
let (:bad_kafka_config) {{'topic_id' => 'test', 'white_list' => 'other_topic'}}
Expand All @@ -57,6 +75,30 @@ def run(a_num_threads, a_queue)
expect {input.register}.to raise_error
end

context "interrupted plugin" do
let(:plugin) { LogStash::Inputs::TestInfiniteKafka.new(kafka_config) }
let!(:queue) { SizedQueue.new(20) }
let!(:consumer_thread) { Thread.new(queue) { |queue| loop { queue.pop } } }
subject { Thread.new(plugin, queue) { |plugin, queue| plugin.run(queue) } }

before do
expect(plugin).to receive(:create_consumer_group) do |options|
TestInfiniteKafkaGroup.new(options)
end
plugin.register
end

after do
Thread.kill(consumer_thread)
end

it "should shutdown when stopped is called" do
expect(subject).to be_alive
plugin.do_stop
wait(3).for { subject }.to_not be_alive
end
end

it 'should populate kafka config with default values' do
kafka = LogStash::Inputs::TestKafka.new(kafka_config)
insist {kafka.zk_connect} == 'localhost:2181'
Expand Down Expand Up @@ -98,5 +140,4 @@ def run(a_num_threads, a_queue)
insist { e['kafka']['partition'] } == 0
insist { e['kafka']['key'] } == nil
end

end

0 comments on commit b76e1fe

Please sign in to comment.