From 30fea9da7f28d202ed1a2e052046c4687dffb494 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Thu, 13 Oct 2016 08:29:50 +0900 Subject: [PATCH] Chunks in Compat layer should provide msgpack_each method. fix #1272 --- lib/fluent/compat/output.rb | 3 ++ test/plugin/test_output_as_buffered.rb | 56 ++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb index 9547497173..bbad075a02 100644 --- a/lib/fluent/compat/output.rb +++ b/lib/fluent/compat/output.rb @@ -26,6 +26,7 @@ require 'fluent/compat/output_chain' require 'fluent/timezone' require 'fluent/mixin' +require 'fluent/event' require 'fluent/process' # to load Fluent::DetachProcessMixin require 'fluent/plugin_helper/compat_parameters' @@ -123,6 +124,7 @@ module BufferedChunkMixin # prepend this module to BufferedOutput (including ObjectBufferedOutput) plugin singleton class def write(chunk) chunk.extend(ChunkSizeCompatMixin) + chunk.extend(ChunkMessagePackEventStreamer) chunk.extend(AddKeyToChunkMixin) if chunk.metadata.variables && chunk.metadata.variables.has_key?(:key) super end @@ -132,6 +134,7 @@ module TimeSliceChunkMixin # prepend this module to TimeSlicedOutput plugin singleton class def write(chunk) chunk.extend(ChunkSizeCompatMixin) + chunk.extend(ChunkMessagePackEventStreamer) chunk.extend(AddTimeSliceKeyToChunkMixin) chunk.time_slice_format = @time_slice_format chunk.timekey = @_timekey diff --git a/test/plugin/test_output_as_buffered.rb b/test/plugin/test_output_as_buffered.rb index 53b7e613b3..5c912f0e7b 100644 --- a/test/plugin/test_output_as_buffered.rb +++ b/test/plugin/test_output_as_buffered.rb @@ -1,6 +1,7 @@ require_relative '../helper' require 'fluent/plugin/output' require 'fluent/plugin/buffer' +require 'fluent/output' require 'fluent/event' require 'json' @@ -126,6 +127,28 @@ def try_write(chunk) @try_write ? @try_write.call(chunk) : nil end end + module OldPluginMethodMixin + def initialize + super + @format = nil + @write = nil + end + def register(name, &block) + instance_variable_set("@#{name}", block) + end + def format(tag, time, record) + @format ? @format.call(tag, time, record) : [tag, time, record].to_json + end + def write(chunk) + @write ? @write.call(chunk) : nil + end + end + class DummyOldBufferedOutput < Fluent::BufferedOutput + include OldPluginMethodMixin + end + class DummyOldObjectBufferedOutput < Fluent::ObjectBufferedOutput + include OldPluginMethodMixin + end end class BufferedOutputTest < Test::Unit::TestCase @@ -138,6 +161,8 @@ def create_output(type=:full) when :standard then FluentPluginOutputAsBufferedTest::DummyStandardBufferedOutput.new when :custom then FluentPluginOutputAsBufferedTest::DummyCustomFormatBufferedOutput.new when :full then FluentPluginOutputAsBufferedTest::DummyFullFeatureOutput.new + when :old_buf then FluentPluginOutputAsBufferedTest::DummyOldBufferedOutput.new + when :old_obj then FluentPluginOutputAsBufferedTest::DummyOldObjectBufferedOutput.new else raise ArgumentError, "unknown type: #{type}" end @@ -346,6 +371,37 @@ def waiting(seconds) assert_equal events, each_pushed.map{|tag,time,record| [time,record]} end end + + data(:BufferedOutput => :old_buf, + :ObjectBufferedOutput => :old_obj) + test 'old plugin types can iterate chunk by msgpack_each in #write' do |plugin_type| + events_from_chunk = [] + # event_emitter helper requires Engine.root_agent for routing + ra = Fluent::RootAgent.new(log: $log) + stub(Fluent::Engine).root_agent { ra } + @i = create_output(plugin_type) + @i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', @hash)])) + @i.register(:format) { |tag, time, record| [time, record].to_msgpack } + @i.register(:write) { |chunk| e = []; chunk.msgpack_each { |t, r| e << [t, r] }; events_from_chunk << [:write, e]; } + @i.start + @i.after_start + + events = [ + [event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}], + [event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}], + ] + + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + @i.emit_events("test.tag", Fluent::ArrayEventStream.new(events)) + + waiting(5) { sleep 0.1 until events_from_chunk.size == 2 } + + assert_equal 2, events_from_chunk.size + 2.times.each do |i| + assert_equal :write, events_from_chunk[i][0] + assert_equal events, events_from_chunk[i][1] + end + end end sub_test_case 'buffered output configured with many chunk keys' do