Skip to content

Commit

Permalink
add tests about custom format plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed Oct 7, 2016
1 parent 446d2dc commit f63b675
Showing 1 changed file with 110 additions and 0 deletions.
110 changes: 110 additions & 0 deletions test/plugin/test_output_as_buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,116 @@ def waiting(seconds)
assert_equal events, events_from_chunk[i][1]
end
end

test 'plugin using custom format cannot iterate chunk in #write' do
events_from_chunk = []
@i = create_output(:custom)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)]))
@i.register(:prefer_delayed_commit){ false }
@i.register(:format){ |tag, time, record| [tag,time,record].to_json }
@i.register(:format_type_is_msgpack){ false }
@i.register(:write){ |chunk| assert !(chunk.respond_to?(:each)) }
@i.register(:try_write){ |chunk| assert !(chunk.respond_to?(:each)) }
@i.start
@i.after_start

events = [
[event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}],
[event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}],
]

@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))

assert_equal 0, events_from_chunk.size
end

test 'plugin using custom format cannot iterate chunk in #try_write' do
events_from_chunk = []
@i = create_output(:custom)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)]))
@i.register(:prefer_delayed_commit){ true }
@i.register(:format){ |tag, time, record| [tag,time,record].to_json }
@i.register(:format_type_is_msgpack){ false }
@i.register(:write){ |chunk| assert !(chunk.respond_to?(:each)) }
@i.register(:try_write){ |chunk| assert !(chunk.respond_to?(:each)) }
@i.start
@i.after_start

events = [
[event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}],
[event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}],
]

@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))

assert_equal 0, events_from_chunk.size
end

test 'plugin using custom format can iterate chunk in #write if #format returns msgpack' do
events_from_chunk = []
@i = create_output(:custom)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)]))
@i.register(:prefer_delayed_commit){ false }
@i.register(:format){ |tag, time, record| [tag,time,record].to_msgpack }
@i.register(:format_type_is_msgpack){ true }
@i.register(:write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|ta,t,r| e << [ta,t,r]}; events_from_chunk << [:write, e] }
@i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|ta,t,r| e << [ta,t,r]}; events_from_chunk << [:try_write, e] }
@i.start
@i.after_start

events = [
[event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}],
[event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}],
]

@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))
@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))

waiting(5){ sleep 0.1 until events_from_chunk.size == 2 }

assert_equal 2, events_from_chunk.size
2.times.each do |i|
assert_equal :write, events_from_chunk[i][0]
each_pushed = events_from_chunk[i][1]
assert_equal 2, each_pushed.size
assert_equal 'test.tag', each_pushed[0][0]
assert_equal 'test.tag', each_pushed[1][0]
assert_equal events, each_pushed.map{|tag,time,record| [time,record]}
end
end

test 'plugin using custom format can iterate chunk in #try_write if #format returns msgpack' do
events_from_chunk = []
@i = create_output(:custom)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)]))
@i.register(:prefer_delayed_commit){ true }
@i.register(:format){ |tag, time, record| [tag,time,record].to_msgpack }
@i.register(:format_type_is_msgpack){ true }
@i.register(:write){ |chunk| events_from_chunk = []; assert chunk.respond_to?(:each); chunk.each{|ta,t,r| e << [ta,t,r]}; events_from_chunk << [:write, e] }
@i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|ta,t,r| e << [ta,t,r]}; events_from_chunk << [:try_write, e] }
@i.start
@i.after_start

events = [
[event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}],
[event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}],
]

@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))
@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))

waiting(5){ sleep 0.1 until events_from_chunk.size == 2 }

assert_equal 2, events_from_chunk.size
2.times.each do |i|
assert_equal :try_write, events_from_chunk[i][0]
each_pushed = events_from_chunk[i][1]
assert_equal 2, each_pushed.size
assert_equal 'test.tag', each_pushed[0][0]
assert_equal 'test.tag', each_pushed[1][0]
assert_equal events, each_pushed.map{|tag,time,record| [time,record]}
end
end
end

sub_test_case 'buffered output configured with many chunk keys' do
Expand Down

0 comments on commit f63b675

Please sign in to comment.