Skip to content
This repository was archived by the owner on Dec 7, 2020. It is now read-only.

Commit 577facd

Browse files
committed
updated to match logstash changes
1 parent 1274176 commit 577facd

File tree

4 files changed

+42
-24
lines changed

4 files changed

+42
-24
lines changed

lib/logstash/inputs/kafka.rb

+18-7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
2424
# Specifies the ZooKeeper connection string in the form hostname:port where host and port are
2525
# the host and port of a ZooKeeper server. You can also specify multiple hosts in the form
2626
# hostname1:port1,hostname2:port2,hostname3:port3.
27+
#
28+
# The server may also have a ZooKeeper chroot path as part of its ZooKeeper connection string
29+
# which puts its data under some path in the global ZooKeeper namespace. If so the consumer
30+
# should use the same chroot path in its connection string. For example to give a chroot path of
31+
# /chroot/path you would give the connection string as
32+
# hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.
2733
config :zk_connect, :validate => :string, :default => 'localhost:2181'
2834
# A string that uniquely identifies the group of consumer processes to which this consumer
2935
# belongs. By setting the same group id multiple processes indicate that they are all part of
@@ -34,6 +40,11 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
3440
# Specify whether to jump to beginning of the queue when there is no initial offset in
3541
# ZooKeeper, or if an offset is out of range. If this is false, messages are consumed
3642
# from the latest offset
43+
#
44+
# If reset_beginning is true, the consumer will check ZooKeeper to see if any other group members
45+
# are present and active. If not, the consumer deletes any offset information in ZooKeeper
46+
# and starts at the smallest offset. If other group members are present reset_beginning will not
47+
# work and the consumer threads will rejoin the consumer group.
3748
config :reset_beginning, :validate => :boolean, :default => false
3849
# Number of threads to read from the partitions. Ideally you should have as many threads as the
3950
# number of partitions for a perfect balance. More threads than partitions means that some
@@ -69,7 +80,7 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
6980

7081
public
7182
def register
72-
jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/kafka*/libs/*.jar")
83+
jarpath = File.join(File.dirname(__FILE__), '../../../vendor/jar/kafka*/libs/*.jar')
7384
Dir[jarpath].each do |jar|
7485
require jar
7586
end
@@ -86,7 +97,7 @@ def register
8697
:consumer_id => @consumer_id,
8798
:fetch_message_max_bytes => @fetch_message_max_bytes
8899
}
89-
if @reset_beginning == true
100+
if @reset_beginning
90101
options[:reset_beginning] = 'from-beginning'
91102
end # if :reset_beginning
92103
@kafka_client_queue = SizedQueue.new(@queue_size)
@@ -107,7 +118,7 @@ def run(logstash_queue)
107118
end
108119
rescue LogStash::ShutdownSignal
109120
@logger.info('Kafka got shutdown signal')
110-
@consumer_group.shutdown()
121+
@consumer_group.shutdown
111122
end
112123
until @kafka_client_queue.empty?
113124
queue_event("#{@kafka_client_queue.pop}",logstash_queue)
@@ -117,7 +128,7 @@ def run(logstash_queue)
117128
@logger.warn('kafka client threw exception, restarting',
118129
:exception => e)
119130
if @consumer_group.running?
120-
@consumer_group.shutdown()
131+
@consumer_group.shutdown
121132
end
122133
sleep(Float(@consumer_restart_sleep_ms) * 1 / 1000)
123134
retry
@@ -131,13 +142,13 @@ def queue_event(msg, output_queue)
131142
@codec.decode(msg) do |event|
132143
decorate(event)
133144
if @decorate_events
134-
event['kafka'] = {'msg_size' => msg.bytesize, 'topic' => @topic_id, 'consumer_group' => @group_id}
145+
event['kafka'] = {:msg_size => msg.bytesize, :topic => @topic_id, :consumer_group => @group_id}
135146
end
136147
output_queue << event
137148
end # @codec.decode
138149
rescue => e # parse or event creation error
139-
@logger.error("Failed to create event", :message => msg, :exception => e,
140-
:backtrace => e.backtrace);
150+
@logger.error('Failed to create event', :message => msg, :exception => e,
151+
:backtrace => e.backtrace)
141152
end # begin
142153
end # def queue_event
143154

lib/logstash/outputs/kafka.rb

+4-4
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,11 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
100100
config :send_buffer_bytes, :validate => :number, :default => 100 * 1024
101101
# The client id is a user-specified string sent in each request to help trace calls. It should
102102
# logically identify the application making the request.
103-
config :client_id, :validate => :string, :default => ""
103+
config :client_id, :validate => :string, :default => ''
104104

105105
public
106106
def register
107-
jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/kafka*/libs/*.jar")
107+
jarpath = File.join(File.dirname(__FILE__), '../../../vendor/jar/kafka*/libs/*.jar')
108108
Dir[jarpath].each do |jar|
109109
require jar
110110
end
@@ -130,13 +130,13 @@ def register
130130
:client_id => @client_id
131131
}
132132
@producer = Kafka::Producer.new(options)
133-
@producer.connect()
133+
@producer.connect
134134

135135
@logger.info('Registering kafka producer', :topic_id => @topic_id, :broker_list => @broker_list)
136136

137137
@codec.on_event do |event|
138138
begin
139-
@producer.sendMsg(@topic_id,nil,event)
139+
@producer.send_msg(@topic_id,nil,event)
140140
rescue LogStash::ShutdownSignal
141141
@logger.info('Kafka producer got shutdown signal')
142142
rescue => e

spec/inputs/kafka.rb

+10-5
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,20 @@
77
require 'logstash/errors'
88

99
describe LogStash::Inputs::Kafka do
10+
extend LogStash::RSpec
11+
12+
let (:kafka_config) {{"topic_id" => "test"}}
1013

1114
it 'should populate kafka config with default values' do
12-
kafka = LogStash::Inputs::Kafka.new
15+
kafka = LogStash::Inputs::Kafka.new(kafka_config)
1316
insist {kafka.zk_connect} == "localhost:2181"
1417
insist {kafka.topic_id} == "test"
1518
insist {kafka.group_id} == "logstash"
1619
insist {kafka.reset_beginning} == false
1720
end
1821

19-
it "should load register and load kafka jars without errors" do
20-
kafka = LogStash::Inputs::Kafka.new
22+
it "should register and load kafka jars without errors" do
23+
kafka = LogStash::Inputs::Kafka.new(kafka_config)
2124
kafka.register
2225
end
2326

@@ -33,7 +36,7 @@ def queue_event(msg, output_queue)
3336
end
3437
end
3538

36-
kafka = LogStash::Inputs::TestKafka.new
39+
kafka = LogStash::Inputs::TestKafka.new(kafka_config)
3740
kafka.register
3841

3942
class Kafka::Group
@@ -47,6 +50,8 @@ def run(a_numThreads, a_queue)
4750
kafka.run logstash_queue
4851
e = logstash_queue.pop
4952
insist { e["message"] } == "Kafka message"
50-
insist { e["kafka"] } == {"msg_size"=>13, "topic"=>"test", "consumer_group"=>"logstash"}
53+
# no metadata by default
54+
insist { e["kafka"] } == nil
5155
end
56+
5257
end

spec/outputs/kafka.rb

+10-8
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
require 'rspec'
44
require 'insist'
55
require 'logstash/namespace'
6-
require 'time'
6+
require "logstash/timestamp"
77
require 'logstash/outputs/kafka'
88

99
describe LogStash::Outputs::Kafka do
1010

11+
let (:kafka_config) {{"topic_id" => "test"}}
12+
1113
it 'should populate kafka config with default values' do
12-
kafka = LogStash::Outputs::Kafka.new
14+
kafka = LogStash::Outputs::Kafka.new(kafka_config)
1315
insist {kafka.broker_list} == "localhost:9092"
1416
insist {kafka.topic_id} == "test"
1517
insist {kafka.compression_codec} == "none"
@@ -18,18 +20,18 @@
1820
insist {kafka.producer_type} == "sync"
1921
end
2022

21-
it "should load register and load kafka jars without errors" do
22-
kafka = LogStash::Outputs::Kafka.new
23+
it "should register and load kafka jars without errors" do
24+
kafka = LogStash::Outputs::Kafka.new(kafka_config)
2325
kafka.register
2426
end
2527

2628
it "should send logstash event to kafka broker" do
27-
timestamp = Time.now
29+
timestamp = LogStash::Timestamp.now
2830
expect_any_instance_of(Kafka::Producer)
29-
.to receive(:sendMsg)
30-
.with("test", nil, "{\"message\":\"hello world\",\"host\":\"test\",\"@timestamp\":\"#{timestamp.iso8601(3)}\",\"@version\":\"1\"}")
31+
.to receive(:send_msg)
32+
.with("test", nil, "{\"message\":\"hello world\",\"host\":\"test\",\"@timestamp\":\"#{timestamp}\",\"@version\":\"1\"}")
3133
e = LogStash::Event.new({"message" => "hello world", "host" => "test", "@timestamp" => timestamp})
32-
kafka = LogStash::Outputs::Kafka.new
34+
kafka = LogStash::Outputs::Kafka.new(kafka_config)
3335
kafka.register
3436
kafka.receive(e)
3537
end

0 commit comments

Comments
 (0)