From c2909621e06e21773aad785cf51b3a3e5060075f Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 6 Oct 2016 18:44:08 +0900 Subject: [PATCH 1/3] chunk.each (chunk.msgpack_each) should be available only for chunks contains msgpack binary data. It's original design, but not implemented when v0.14 output plugins were introduced. Currently there are not so many output plugins using v0.14 buffered output API, so it's time to change. chunk.each (chunk.msgpack_each) will be available only for: * output plugins using standard format (doesn't implement #format method) * output plugins marked to have #format method returning msgpack binary data --- lib/fluent/plugin/buffer/chunk.rb | 1 - test/plugin/test_buffer_chunk.rb | 18 +++++++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb index 2f6b3bcb09..dc6b9f40bd 100644 --- a/lib/fluent/plugin/buffer/chunk.rb +++ b/lib/fluent/plugin/buffer/chunk.rb @@ -29,7 +29,6 @@ class Buffer # fluent/plugin/buffer is already loaded class Chunk include MonitorMixin include UniqueId::Mixin - include ChunkMessagePackEventStreamer # Chunks has 2 part: # * metadata: contains metadata which should be restored after resume (if possible) diff --git a/test/plugin/test_buffer_chunk.rb b/test/plugin/test_buffer_chunk.rb index ceb39d891d..5a1bfeb9da 100644 --- a/test/plugin/test_buffer_chunk.rb +++ b/test/plugin/test_buffer_chunk.rb @@ -31,7 +31,6 @@ class BufferChunkTest < Test::Unit::TestCase assert chunk.respond_to?(:read) assert chunk.respond_to?(:open) assert chunk.respond_to?(:write_to) - assert chunk.respond_to?(:msgpack_each) assert_raise(NotImplementedError){ chunk.append([]) } assert_raise(NotImplementedError){ chunk.concat(nil, 0) } assert_raise(NotImplementedError){ chunk.commit } @@ -43,7 +42,19 @@ class BufferChunkTest < Test::Unit::TestCase assert_raise(NotImplementedError){ chunk.read } assert_raise(NotImplementedError){ chunk.open(){} } assert_raise(NotImplementedError){ chunk.write_to(nil) } - assert_raise(NotImplementedError){ chunk.msgpack_each(){|v| v} } + assert !chunk.respond_to?(:msgpack_each) + end + + test 'has method #each and #msgpack_each only when extended by ChunkMessagePackEventStreamer' do + meta = Object.new + chunk = Fluent::Plugin::Buffer::Chunk.new(meta) + + assert !chunk.respond_to?(:each) + assert !chunk.respond_to?(:msgpack_each) + + chunk.extend Fluent::ChunkMessagePackEventStreamer + assert chunk.respond_to?(:each) + assert chunk.respond_to?(:msgpack_each) end test 'some methods raise ArgumentError with an option of `compressed: :gzip` and without extending Compressble`' do @@ -162,9 +173,10 @@ def open(**kwargs) assert "my data\nyour data\n", io.to_s end - test 'can feed objects into blocks with unpacking msgpack' do + test 'can feed objects into blocks with unpacking msgpack if ChunkMessagePackEventStreamer is included' do require 'msgpack' c = TestChunk.new(Object.new) + c.extend Fluent::ChunkMessagePackEventStreamer c.data << MessagePack.pack(['my data', 1]) c.data << MessagePack.pack(['your data', 2]) ary = [] From 446d2dc8dae64a00daac6b9ad6dd935e7e209420 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 6 Oct 2016 18:48:37 +0900 Subject: [PATCH 2/3] Add method #formatted_to_msgpack_binary to mark that #format of this plugin returns msgpack binary to enable chunk.each to iterate content objects. --- lib/fluent/plugin/output.rb | 10 ++- test/plugin/test_output_as_buffered.rb | 113 +++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index a33a344ae1..6d3b9e1246 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -119,6 +119,12 @@ def format(tag, time, record) raise NotImplementedError, "BUG: output plugins MUST implement this method" end + def formatted_to_msgpack_binary + # To indicate custom format method (#format) returns msgpack binary or not. + # If #format returns msgpack binary, override this method to return true. + false + end + def prefer_buffered_processing # override this method to return false only when all of these are true: # * plugin has both implementation for buffered and non-buffered methods @@ -176,6 +182,7 @@ def initialize @buffering = true end @custom_format = implement?(:custom_format) + @enable_msgpack_streamer = false # decided later @buffer = nil @secondary = nil @@ -340,6 +347,7 @@ def start end @custom_format = implement?(:custom_format) + @enable_msgpack_streamer = @custom_format ? formatted_to_msgpack_binary : true @delayed_commit = if implement?(:buffered) && implement?(:delayed_commit) prefer_delayed_commit else @@ -955,7 +963,7 @@ def try_flush using_secondary = true end - unless @custom_format + if @enable_msgpack_streamer chunk.extend ChunkMessagePackEventStreamer end diff --git a/test/plugin/test_output_as_buffered.rb b/test/plugin/test_output_as_buffered.rb index 7c7a9c271c..fe497594cb 100644 --- a/test/plugin/test_output_as_buffered.rb +++ b/test/plugin/test_output_as_buffered.rb @@ -56,6 +56,47 @@ def shutdown super end end + class DummyStandardBufferedOutput < DummyBareOutput + def initialize + super + @prefer_delayed_commit = nil + @write = nil + @try_write = nil + end + def prefer_delayed_commit + @prefer_delayed_commit ? @prefer_delayed_commit.call : false + end + def write(chunk) + @write ? @write.call(chunk) : nil + end + def try_write(chunk) + @try_write ? @try_write.call(chunk) : nil + end + end + class DummyCustomFormatBufferedOutput < DummyBareOutput + def initialize + super + @format_type_is_msgpack = nil + @prefer_delayed_commit = nil + @write = nil + @try_write = nil + end + def format(tag, time, record) + @format ? @format.call(tag, time, record) : [tag, time, record].to_json + end + def formatted_to_msgpack_binary + @format_type_is_msgpack ? @format_type_is_msgpack.call : false + end + def prefer_delayed_commit + @prefer_delayed_commit ? @prefer_delayed_commit.call : false + end + def write(chunk) + @write ? @write.call(chunk) : nil + end + def try_write(chunk) + @try_write ? @try_write.call(chunk) : nil + end + end class DummyFullFeatureOutput < DummyBareOutput def initialize super @@ -94,6 +135,8 @@ def create_output(type=:full) when :sync then FluentPluginOutputAsBufferedTest::DummySyncOutput.new when :buffered then FluentPluginOutputAsBufferedTest::DummyAsyncOutput.new when :delayed then FluentPluginOutputAsBufferedTest::DummyDelayedOutput.new + when :standard then FluentPluginOutputAsBufferedTest::DummyStandardBufferedOutput.new + when :custom then FluentPluginOutputAsBufferedTest::DummyCustomFormatBufferedOutput.new when :full then FluentPluginOutputAsBufferedTest::DummyFullFeatureOutput.new else raise ArgumentError, "unknown type: #{type}" @@ -125,6 +168,76 @@ def waiting(seconds) Timecop.return end + sub_test_case 'chunk feature in #write for output plugins' do + setup do + @stored_global_logger = $log + $log = Fluent::Test::TestLogger.new + @hash = { + 'flush_mode' => 'immediate', + 'flush_thread_interval' => '0.01', + 'flush_thread_burst_interval' => '0.01', + } + end + + teardown do + $log = @stored_global_logger + end + + test 'plugin using standard format can iterate chunk for time, record in #write' do + events_from_chunk = [] + @i = create_output(:standard) + @i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)])) + @i.register(:prefer_delayed_commit){ false } + @i.register(:write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:write, e] } + @i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:try_write, e] } + @i.start + @i.after_start + + events = [ + [event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}], + [event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}], + ] + + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + + waiting(5){ sleep 0.1 until events_from_chunk.size == 2 } + + assert_equal 2, events_from_chunk.size + 2.times.each do |i| + assert_equal :write, events_from_chunk[i][0] + assert_equal events, events_from_chunk[i][1] + end + end + + test 'plugin using standard format can iterate chunk for time, record in #try_write' do + events_from_chunk = [] + @i = create_output(:standard) + @i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)])) + @i.register(:prefer_delayed_commit){ true } + @i.register(:write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:write, e] } + @i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:try_write, e] } + @i.start + @i.after_start + + events = [ + [event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}], + [event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}], + ] + + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + + waiting(5){ sleep 0.1 until events_from_chunk.size == 2 } + + assert_equal 2, events_from_chunk.size + 2.times.each do |i| + assert_equal :try_write, events_from_chunk[i][0] + assert_equal events, events_from_chunk[i][1] + end + end + end + sub_test_case 'buffered output configured with many chunk keys' do setup do @stored_global_logger = $log From f63b6752d7cbd089da897829096b57ea4ff74e25 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 7 Oct 2016 09:59:19 +0900 Subject: [PATCH 3/3] add tests about custom format plugin --- test/plugin/test_output_as_buffered.rb | 110 +++++++++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/test/plugin/test_output_as_buffered.rb b/test/plugin/test_output_as_buffered.rb index fe497594cb..53b7e613b3 100644 --- a/test/plugin/test_output_as_buffered.rb +++ b/test/plugin/test_output_as_buffered.rb @@ -236,6 +236,116 @@ def waiting(seconds) assert_equal events, events_from_chunk[i][1] end end + + test 'plugin using custom format cannot iterate chunk in #write' do + events_from_chunk = [] + @i = create_output(:custom) + @i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)])) + @i.register(:prefer_delayed_commit){ false } + @i.register(:format){ |tag, time, record| [tag,time,record].to_json } + @i.register(:format_type_is_msgpack){ false } + @i.register(:write){ |chunk| assert !(chunk.respond_to?(:each)) } + @i.register(:try_write){ |chunk| assert !(chunk.respond_to?(:each)) } + @i.start + @i.after_start + + events = [ + [event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}], + [event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}], + ] + + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + + assert_equal 0, events_from_chunk.size + end + + test 'plugin using custom format cannot iterate chunk in #try_write' do + events_from_chunk = [] + @i = create_output(:custom) + @i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)])) + @i.register(:prefer_delayed_commit){ true } + @i.register(:format){ |tag, time, record| [tag,time,record].to_json } + @i.register(:format_type_is_msgpack){ false } + @i.register(:write){ |chunk| assert !(chunk.respond_to?(:each)) } + @i.register(:try_write){ |chunk| assert !(chunk.respond_to?(:each)) } + @i.start + @i.after_start + + events = [ + [event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}], + [event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}], + ] + + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + + assert_equal 0, events_from_chunk.size + end + + test 'plugin using custom format can iterate chunk in #write if #format returns msgpack' do + events_from_chunk = [] + @i = create_output(:custom) + @i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)])) + @i.register(:prefer_delayed_commit){ false } + @i.register(:format){ |tag, time, record| [tag,time,record].to_msgpack } + @i.register(:format_type_is_msgpack){ true } + @i.register(:write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|ta,t,r| e << [ta,t,r]}; events_from_chunk << [:write, e] } + @i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|ta,t,r| e << [ta,t,r]}; events_from_chunk << [:try_write, e] } + @i.start + @i.after_start + + events = [ + [event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}], + [event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}], + ] + + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + + waiting(5){ sleep 0.1 until events_from_chunk.size == 2 } + + assert_equal 2, events_from_chunk.size + 2.times.each do |i| + assert_equal :write, events_from_chunk[i][0] + each_pushed = events_from_chunk[i][1] + assert_equal 2, each_pushed.size + assert_equal 'test.tag', each_pushed[0][0] + assert_equal 'test.tag', each_pushed[1][0] + assert_equal events, each_pushed.map{|tag,time,record| [time,record]} + end + end + + test 'plugin using custom format can iterate chunk in #try_write if #format returns msgpack' do + events_from_chunk = [] + @i = create_output(:custom) + @i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)])) + @i.register(:prefer_delayed_commit){ true } + @i.register(:format){ |tag, time, record| [tag,time,record].to_msgpack } + @i.register(:format_type_is_msgpack){ true } + @i.register(:write){ |chunk| events_from_chunk = []; assert chunk.respond_to?(:each); chunk.each{|ta,t,r| e << [ta,t,r]}; events_from_chunk << [:write, e] } + @i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|ta,t,r| e << [ta,t,r]}; events_from_chunk << [:try_write, e] } + @i.start + @i.after_start + + events = [ + [event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}], + [event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}], + ] + + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + + waiting(5){ sleep 0.1 until events_from_chunk.size == 2 } + + assert_equal 2, events_from_chunk.size + 2.times.each do |i| + assert_equal :try_write, events_from_chunk[i][0] + each_pushed = events_from_chunk[i][1] + assert_equal 2, each_pushed.size + assert_equal 'test.tag', each_pushed[0][0] + assert_equal 'test.tag', each_pushed[1][0] + assert_equal events, each_pushed.map{|tag,time,record| [time,record]} + end + end end sub_test_case 'buffered output configured with many chunk keys' do