diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb index 2f6b3bcb09..dc6b9f40bd 100644 --- a/lib/fluent/plugin/buffer/chunk.rb +++ b/lib/fluent/plugin/buffer/chunk.rb @@ -29,7 +29,6 @@ class Buffer # fluent/plugin/buffer is already loaded class Chunk include MonitorMixin include UniqueId::Mixin - include ChunkMessagePackEventStreamer # Chunks has 2 part: # * metadata: contains metadata which should be restored after resume (if possible) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index a33a344ae1..6d3b9e1246 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -119,6 +119,12 @@ def format(tag, time, record) raise NotImplementedError, "BUG: output plugins MUST implement this method" end + def formatted_to_msgpack_binary + # To indicate custom format method (#format) returns msgpack binary or not. + # If #format returns msgpack binary, override this method to return true. + false + end + def prefer_buffered_processing # override this method to return false only when all of these are true: # * plugin has both implementation for buffered and non-buffered methods @@ -176,6 +182,7 @@ def initialize @buffering = true end @custom_format = implement?(:custom_format) + @enable_msgpack_streamer = false # decided later @buffer = nil @secondary = nil @@ -340,6 +347,7 @@ def start end @custom_format = implement?(:custom_format) + @enable_msgpack_streamer = @custom_format ? formatted_to_msgpack_binary : true @delayed_commit = if implement?(:buffered) && implement?(:delayed_commit) prefer_delayed_commit else @@ -955,7 +963,7 @@ def try_flush using_secondary = true end - unless @custom_format + if @enable_msgpack_streamer chunk.extend ChunkMessagePackEventStreamer end diff --git a/test/plugin/test_buffer_chunk.rb b/test/plugin/test_buffer_chunk.rb index ceb39d891d..5a1bfeb9da 100644 --- a/test/plugin/test_buffer_chunk.rb +++ b/test/plugin/test_buffer_chunk.rb @@ -31,7 +31,6 @@ class BufferChunkTest < Test::Unit::TestCase assert chunk.respond_to?(:read) assert chunk.respond_to?(:open) assert chunk.respond_to?(:write_to) - assert chunk.respond_to?(:msgpack_each) assert_raise(NotImplementedError){ chunk.append([]) } assert_raise(NotImplementedError){ chunk.concat(nil, 0) } assert_raise(NotImplementedError){ chunk.commit } @@ -43,7 +42,19 @@ class BufferChunkTest < Test::Unit::TestCase assert_raise(NotImplementedError){ chunk.read } assert_raise(NotImplementedError){ chunk.open(){} } assert_raise(NotImplementedError){ chunk.write_to(nil) } - assert_raise(NotImplementedError){ chunk.msgpack_each(){|v| v} } + assert !chunk.respond_to?(:msgpack_each) + end + + test 'has method #each and #msgpack_each only when extended by ChunkMessagePackEventStreamer' do + meta = Object.new + chunk = Fluent::Plugin::Buffer::Chunk.new(meta) + + assert !chunk.respond_to?(:each) + assert !chunk.respond_to?(:msgpack_each) + + chunk.extend Fluent::ChunkMessagePackEventStreamer + assert chunk.respond_to?(:each) + assert chunk.respond_to?(:msgpack_each) end test 'some methods raise ArgumentError with an option of `compressed: :gzip` and without extending Compressble`' do @@ -162,9 +173,10 @@ def open(**kwargs) assert "my data\nyour data\n", io.to_s end - test 'can feed objects into blocks with unpacking msgpack' do + test 'can feed objects into blocks with unpacking msgpack if ChunkMessagePackEventStreamer is included' do require 'msgpack' c = TestChunk.new(Object.new) + c.extend Fluent::ChunkMessagePackEventStreamer c.data << MessagePack.pack(['my data', 1]) c.data << MessagePack.pack(['your data', 2]) ary = [] diff --git a/test/plugin/test_output_as_buffered.rb b/test/plugin/test_output_as_buffered.rb index 7c7a9c271c..53b7e613b3 100644 --- a/test/plugin/test_output_as_buffered.rb +++ b/test/plugin/test_output_as_buffered.rb @@ -56,6 +56,47 @@ def shutdown super end end + class DummyStandardBufferedOutput < DummyBareOutput + def initialize + super + @prefer_delayed_commit = nil + @write = nil + @try_write = nil + end + def prefer_delayed_commit + @prefer_delayed_commit ? @prefer_delayed_commit.call : false + end + def write(chunk) + @write ? @write.call(chunk) : nil + end + def try_write(chunk) + @try_write ? @try_write.call(chunk) : nil + end + end + class DummyCustomFormatBufferedOutput < DummyBareOutput + def initialize + super + @format_type_is_msgpack = nil + @prefer_delayed_commit = nil + @write = nil + @try_write = nil + end + def format(tag, time, record) + @format ? @format.call(tag, time, record) : [tag, time, record].to_json + end + def formatted_to_msgpack_binary + @format_type_is_msgpack ? @format_type_is_msgpack.call : false + end + def prefer_delayed_commit + @prefer_delayed_commit ? @prefer_delayed_commit.call : false + end + def write(chunk) + @write ? @write.call(chunk) : nil + end + def try_write(chunk) + @try_write ? @try_write.call(chunk) : nil + end + end class DummyFullFeatureOutput < DummyBareOutput def initialize super @@ -94,6 +135,8 @@ def create_output(type=:full) when :sync then FluentPluginOutputAsBufferedTest::DummySyncOutput.new when :buffered then FluentPluginOutputAsBufferedTest::DummyAsyncOutput.new when :delayed then FluentPluginOutputAsBufferedTest::DummyDelayedOutput.new + when :standard then FluentPluginOutputAsBufferedTest::DummyStandardBufferedOutput.new + when :custom then FluentPluginOutputAsBufferedTest::DummyCustomFormatBufferedOutput.new when :full then FluentPluginOutputAsBufferedTest::DummyFullFeatureOutput.new else raise ArgumentError, "unknown type: #{type}" @@ -125,6 +168,186 @@ def waiting(seconds) Timecop.return end + sub_test_case 'chunk feature in #write for output plugins' do + setup do + @stored_global_logger = $log + $log = Fluent::Test::TestLogger.new + @hash = { + 'flush_mode' => 'immediate', + 'flush_thread_interval' => '0.01', + 'flush_thread_burst_interval' => '0.01', + } + end + + teardown do + $log = @stored_global_logger + end + + test 'plugin using standard format can iterate chunk for time, record in #write' do + events_from_chunk = [] + @i = create_output(:standard) + @i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)])) + @i.register(:prefer_delayed_commit){ false } + @i.register(:write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:write, e] } + @i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:try_write, e] } + @i.start + @i.after_start + + events = [ + [event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}], + [event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}], + ] + + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + + waiting(5){ sleep 0.1 until events_from_chunk.size == 2 } + + assert_equal 2, events_from_chunk.size + 2.times.each do |i| + assert_equal :write, events_from_chunk[i][0] + assert_equal events, events_from_chunk[i][1] + end + end + + test 'plugin using standard format can iterate chunk for time, record in #try_write' do + events_from_chunk = [] + @i = create_output(:standard) + @i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)])) + @i.register(:prefer_delayed_commit){ true } + @i.register(:write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:write, e] } + @i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:try_write, e] } + @i.start + @i.after_start + + events = [ + [event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}], + [event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}], + ] + + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + + waiting(5){ sleep 0.1 until events_from_chunk.size == 2 } + + assert_equal 2, events_from_chunk.size + 2.times.each do |i| + assert_equal :try_write, events_from_chunk[i][0] + assert_equal events, events_from_chunk[i][1] + end + end + + test 'plugin using custom format cannot iterate chunk in #write' do + events_from_chunk = [] + @i = create_output(:custom) + @i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)])) + @i.register(:prefer_delayed_commit){ false } + @i.register(:format){ |tag, time, record| [tag,time,record].to_json } + @i.register(:format_type_is_msgpack){ false } + @i.register(:write){ |chunk| assert !(chunk.respond_to?(:each)) } + @i.register(:try_write){ |chunk| assert !(chunk.respond_to?(:each)) } + @i.start + @i.after_start + + events = [ + [event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}], + [event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}], + ] + + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + + assert_equal 0, events_from_chunk.size + end + + test 'plugin using custom format cannot iterate chunk in #try_write' do + events_from_chunk = [] + @i = create_output(:custom) + @i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)])) + @i.register(:prefer_delayed_commit){ true } + @i.register(:format){ |tag, time, record| [tag,time,record].to_json } + @i.register(:format_type_is_msgpack){ false } + @i.register(:write){ |chunk| assert !(chunk.respond_to?(:each)) } + @i.register(:try_write){ |chunk| assert !(chunk.respond_to?(:each)) } + @i.start + @i.after_start + + events = [ + [event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}], + [event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}], + ] + + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + + assert_equal 0, events_from_chunk.size + end + + test 'plugin using custom format can iterate chunk in #write if #format returns msgpack' do + events_from_chunk = [] + @i = create_output(:custom) + @i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)])) + @i.register(:prefer_delayed_commit){ false } + @i.register(:format){ |tag, time, record| [tag,time,record].to_msgpack } + @i.register(:format_type_is_msgpack){ true } + @i.register(:write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|ta,t,r| e << [ta,t,r]}; events_from_chunk << [:write, e] } + @i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|ta,t,r| e << [ta,t,r]}; events_from_chunk << [:try_write, e] } + @i.start + @i.after_start + + events = [ + [event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}], + [event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}], + ] + + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + + waiting(5){ sleep 0.1 until events_from_chunk.size == 2 } + + assert_equal 2, events_from_chunk.size + 2.times.each do |i| + assert_equal :write, events_from_chunk[i][0] + each_pushed = events_from_chunk[i][1] + assert_equal 2, each_pushed.size + assert_equal 'test.tag', each_pushed[0][0] + assert_equal 'test.tag', each_pushed[1][0] + assert_equal events, each_pushed.map{|tag,time,record| [time,record]} + end + end + + test 'plugin using custom format can iterate chunk in #try_write if #format returns msgpack' do + events_from_chunk = [] + @i = create_output(:custom) + @i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)])) + @i.register(:prefer_delayed_commit){ true } + @i.register(:format){ |tag, time, record| [tag,time,record].to_msgpack } + @i.register(:format_type_is_msgpack){ true } + @i.register(:write){ |chunk| events_from_chunk = []; assert chunk.respond_to?(:each); chunk.each{|ta,t,r| e << [ta,t,r]}; events_from_chunk << [:write, e] } + @i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|ta,t,r| e << [ta,t,r]}; events_from_chunk << [:try_write, e] } + @i.start + @i.after_start + + events = [ + [event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}], + [event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}], + ] + + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + + waiting(5){ sleep 0.1 until events_from_chunk.size == 2 } + + assert_equal 2, events_from_chunk.size + 2.times.each do |i| + assert_equal :try_write, events_from_chunk[i][0] + each_pushed = events_from_chunk[i][1] + assert_equal 2, each_pushed.size + assert_equal 'test.tag', each_pushed[0][0] + assert_equal 'test.tag', each_pushed[1][0] + assert_equal events, each_pushed.map{|tag,time,record| [time,record]} + end + end + end + sub_test_case 'buffered output configured with many chunk keys' do setup do @stored_global_logger = $log