From 377a6cb836d5ae0a65324efb6cd8b472fa1836be Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 22 Jun 2016 20:51:59 +0900 Subject: [PATCH 1/6] add v0.14 test driver for MultiOutput plugins --- lib/fluent/test/driver/multi_output.rb | 52 ++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 lib/fluent/test/driver/multi_output.rb diff --git a/lib/fluent/test/driver/multi_output.rb b/lib/fluent/test/driver/multi_output.rb new file mode 100644 index 0000000000..a51c65e162 --- /dev/null +++ b/lib/fluent/test/driver/multi_output.rb @@ -0,0 +1,52 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/test/driver/base' +require 'fluent/test/driver/event_feeder' + +require 'fluent/plugin/multi_output' + +module Fluent + module Test + module Driver + class MultiOutput < Base + include EventFeeder + + def initialize(klass, opts: {}, &block) + super + raise ArgumentError, "plugin is not an instance of Fluent::Plugin::MultiOutput" unless @instance.is_a? Fluent::Plugin::MultiOutput + @flush_buffer_at_cleanup = nil + end + + def run(flush: true, **kwargs, &block) + @flush_buffer_at_cleanup = flush + super(**kwargs, &block) + end + + def run_actual(**kwargs, &block) + super(**kwargs, &block) + if @flush_buffer_at_cleanup + @instance.outputs.each{|o| o.force_flush } + end + end + + def flush + @instance.outputs.each{|o| o.force_flush } + end + end + end + end +end From 4000e8be28277d90997a74a3874a07445ac2812a Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 22 Jun 2016 20:52:19 +0900 Subject: [PATCH 2/6] add configuration example to test copy/roundrobin plugins --- example/copy_roundrobin.conf | 39 ++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 example/copy_roundrobin.conf diff --git a/example/copy_roundrobin.conf b/example/copy_roundrobin.conf new file mode 100644 index 0000000000..935d29a45c --- /dev/null +++ b/example/copy_roundrobin.conf @@ -0,0 +1,39 @@ + + @type dummy + @label @test + tag test.copy + auto_increment_key id + + + + @type dummy + @label @test + tag test.rr + auto_increment_key id + + + \ No newline at end of file From 915734e0e9e2604559a64f0fac50be96a4a5ee8b Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 22 Jun 2016 20:53:20 +0900 Subject: [PATCH 3/6] port plugins for tests to v0.14 APIs, and fix/remove old tests for output plugins --- test/scripts/fluent/plugin/out_test.rb | 39 +++--- test/scripts/fluent/plugin/out_test2.rb | 80 ++++++++++++ test/test_output.rb | 154 +----------------------- 3 files changed, 104 insertions(+), 169 deletions(-) create mode 100644 test/scripts/fluent/plugin/out_test2.rb diff --git a/test/scripts/fluent/plugin/out_test.rb b/test/scripts/fluent/plugin/out_test.rb index 7a1cd75298..3dd66bf83f 100644 --- a/test/scripts/fluent/plugin/out_test.rb +++ b/test/scripts/fluent/plugin/out_test.rb @@ -14,17 +14,25 @@ # limitations under the License. # -module Fluent +require 'fluent/plugin/output' +require 'fluent/event' + +module Fluent::Plugin class TestOutput < Output - Plugin.register_output('test', self) + Fluent::Plugin.register_output('test', self) + + config_param :name, :string + + config_section :buffer do + config_set_default :chunk_keys, ['tag'] + end def initialize super @emit_streams = [] - @name = nil end - attr_reader :emit_streams, :name + attr_reader :emit_streams def emits all = [] @@ -54,23 +62,20 @@ def records all end - def configure(conf) - if name = conf['name'] - @name = name - end - end - - def start - super + def prefer_buffered_processing + false end - def shutdown - super + def process(tag, es) + @emit_streams << [tag, es.to_a] end - def emit(tag, es, chain) - chain.next - @emit_streams << [tag, es.to_a] + def write(chunk) + es = Fluent::ArrayEventStream.new + chunk.each do |time, record| + es.add(time, record) + end + @emit_streams << [tag, es] end end end diff --git a/test/scripts/fluent/plugin/out_test2.rb b/test/scripts/fluent/plugin/out_test2.rb new file mode 100644 index 0000000000..aefacc9a4d --- /dev/null +++ b/test/scripts/fluent/plugin/out_test2.rb @@ -0,0 +1,80 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Fluent::Plugin + class Test2Output < Output + Fluent::Plugin.register_output('test2', self) + + helpers :event_emitter + + config_param :name, :string + + config_section :buffer do + config_set_default :chunk_keys, ['tag'] + end + + def initialize + super + @emit_streams = [] + end + + attr_reader :emit_streams + + def emits + all = [] + @emit_streams.each {|tag,events| + events.each {|time,record| + all << [tag, time, record] + } + } + all + end + + def events + all = [] + @emit_streams.each {|tag,events| + all.concat events + } + all + end + + def records + all = [] + @emit_streams.each {|tag,events| + events.each {|time,record| + all << record + } + } + all + end + + def prefer_buffered_processing + false + end + + def process(tag, es) + @emit_streams << [tag, es.to_a] + end + + def write(chunk) + es = Fluent::ArrayEventStream.new + chunk.each do |time, record| + es.add(time, record) + end + @emit_streams << [tag, es] + end + end +end diff --git a/test/test_output.rb b/test/test_output.rb index e40bc2e39e..3bb9160bfd 100644 --- a/test/test_output.rb +++ b/test/test_output.rb @@ -16,6 +16,7 @@ class << self def startup $LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), 'scripts')) require 'fluent/plugin/out_test' + require 'fluent/plugin/out_test2' end def shutdown @@ -64,54 +65,6 @@ def test_configure # assert_equal Float, d.instance.retry_wait.class end - def test_calc_retry_wait - omit "too internal test" - - # default - d = create_driver - d.instance.retry_limit.times { - # "d.instance.instance_variable_get(:@num_errors) += 1" causes SyntaxError - d.instance.instance_eval { @num_errors += 1 } - } - wait = d.instance.retry_wait * (2 ** (d.instance.retry_limit - 1)) - assert( d.instance.calc_retry_wait > wait - wait / 8.0 ) - - # max_retry_wait - d = create_driver(CONFIG + %[max_retry_wait 4]) - d.instance.retry_limit.times { - d.instance.instance_eval { @num_errors += 1 } - } - assert_equal 4, d.instance.calc_retry_wait - end - - def test_calc_retry_wait_with_integer_retry_wait - omit "too internal test" - - d = create_driver(CONFIG + %[retry_wait 2s]) - d.instance.retry_limit.times { - d.instance.instance_eval { @num_errors += 1 } - } - assert_equal true, d.instance.calc_retry_wait.finite? - end - - def test_large_num_retries - omit "too internal test" - - # Test that everything works properly after a very large number of - # retries and we hit the expected max_retry_wait. - exp_max_retry_wait = 300 - d = create_driver(CONFIG + %[ - disable_retry_limit true - max_retry_wait #{exp_max_retry_wait} - ]) - d.instance.instance_eval { @num_errors += 1000 } - assert_equal exp_max_retry_wait, d.instance.calc_retry_wait - d.instance.instance_eval { @num_errors += 1000 } - assert_equal exp_max_retry_wait, d.instance.calc_retry_wait - d.instance.instance_eval { @num_errors += 1000 } - assert_equal exp_max_retry_wait, d.instance.calc_retry_wait - end - def create_mock_driver(conf=CONFIG) Fluent::Test::BufferedOutputTestDriver.new(Fluent::BufferedOutput) do attr_accessor :submit_flush_threads @@ -141,78 +94,15 @@ def write(chunk) end.configure(conf) end - def test_submit_flush_target - omit "too internal test" - - # default - d = create_mock_driver - d.instance.start_mock - assert_equal 0, d.instance.instance_variable_get('@writer_current_position') - d.instance.submit_flush - assert_equal 0, d.instance.instance_variable_get('@writer_current_position') - d.instance.submit_flush - assert_equal 0, d.instance.instance_variable_get('@writer_current_position') - d.instance.submit_flush - assert_equal 0, d.instance.instance_variable_get('@writer_current_position') - d.instance.submit_flush - assert_equal 0, d.instance.instance_variable_get('@writer_current_position') - d.instance.shutdown - assert_equal 1, d.instance.submit_flush_threads.size - - # num_threads 4 - d = create_mock_driver(CONFIG + %[num_threads 4]) - d.instance.start - assert_equal 0, d.instance.instance_variable_get('@writer_current_position') - d.instance.submit_flush - assert_equal 1, d.instance.instance_variable_get('@writer_current_position') - d.instance.submit_flush - assert_equal 2, d.instance.instance_variable_get('@writer_current_position') - d.instance.submit_flush - assert_equal 3, d.instance.instance_variable_get('@writer_current_position') - d.instance.submit_flush - assert_equal 0, d.instance.instance_variable_get('@writer_current_position') - d.instance.shutdown - assert (d.instance.submit_flush_threads.size > 1), "fails if only one thread works to submit flush" - end - def test_secondary d = create_driver(CONFIG + %[ - type test + type test2 name c0 ]) assert_not_nil d.instance.instance_variable_get(:@secondary).router end - - sub_test_case "test_force_flush" do - setup do - time = Time.parse("2011-01-02 13:14:15 UTC") - Timecop.freeze(time) - @time = time.to_i - end - - teardown do - Timecop.return - end - - test "force_flush works on retrying" do - omit "too internal test" - - d = create_driver(CONFIG) - d.instance.start - buffer = d.instance.instance_variable_get(:@buffer) - # imitate 10 failures - d.instance.instance_variable_set(:@num_errors, 10) - d.instance.instance_variable_set(:@next_retry_time, @time + d.instance.calc_retry_wait) - # buffer should be popped (flushed) immediately - flexmock(buffer).should_receive(:pop).once - # force_flush - buffer.emit("test", 'test', NullOutputChain.instance) - d.instance.force_flush - 10.times { sleep 0.05 } - end - end end class ObjectBufferedOutputTest < ::Test::Unit::TestCase @@ -262,33 +152,6 @@ def create_driver(conf=CONFIG) Fluent::Test::TimeSlicedOutputTestDriver.new(TimeSlicedOutputTestPlugin).configure(conf, true) end - sub_test_case "force_flush test" do - setup do - time = Time.parse("2011-01-02 13:14:15 UTC") - Timecop.freeze(time) - @es = OneEventStream.new(time.to_i, {"message" => "foo"}) - end - - teardown do - Timecop.return - end - - test "force_flush immediately flushes" do - omit "too internal test" - - d = create_driver(CONFIG + %[ - time_format %Y%m%d%H%M%S - ]) - d.instance.start - # buffer should be popped (flushed) immediately - flexmock(d.instance.instance_variable_get(:@buffer)).should_receive(:pop).once - # force_flush - d.instance.emit('test', @es, NullOutputChain.instance) - d.instance.force_flush - 10.times { sleep 0.05 } - end - end - sub_test_case "test emit" do setup do @time = Time.parse("2011-01-02 13:14:15 UTC") @@ -299,19 +162,6 @@ def create_driver(conf=CONFIG) Timecop.return end - test "emit with valid event" do - omit "there's no #emit method anymore in output plugins" - - d = create_driver - d.instance.start - if d.instance.method(:emit).arity == 3 - d.instance.emit('test', OneEventStream.new(@time.to_i, {"message" => "foo"}), NullOutputChain.instance) - else - d.instance.emit('test', OneEventStream.new(@time.to_i, {"message" => "foo"})) - end - assert_equal 0, d.instance.log.logs.size - end - test "emit with invalid event" do d = create_driver d.instance.start From 80da866b8a4f4c86f203294383fd4a38db3e455a Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 22 Jun 2016 20:53:45 +0900 Subject: [PATCH 4/6] port CopyOutput plugin to v0.14 API --- lib/fluent/plugin/out_copy.rb | 59 ++-------- test/plugin/test_out_copy.rb | 215 ++++++++++++++-------------------- 2 files changed, 96 insertions(+), 178 deletions(-) diff --git a/lib/fluent/plugin/out_copy.rb b/lib/fluent/plugin/out_copy.rb index c60eeb6966..7100042b9e 100644 --- a/lib/fluent/plugin/out_copy.rb +++ b/lib/fluent/plugin/out_copy.rb @@ -14,72 +14,29 @@ # limitations under the License. # -require 'fluent/output' +require 'fluent/plugin/multi_output' require 'fluent/config/error' require 'fluent/event' -module Fluent +module Fluent::Plugin class CopyOutput < MultiOutput - Plugin.register_output('copy', self) + Fluent::Plugin.register_output('copy', self) desc 'If true, pass different record to each `store` plugin.' config_param :deep_copy, :bool, default: false - def initialize - super - @outputs = [] - end - - attr_reader :outputs - - def configure(conf) - super - conf.elements.select {|e| - e.name == 'store' - }.each {|e| - type = e['@type'] - unless type - raise ConfigError, "Missing 'type' parameter on directive" - end - log.debug "adding store type=#{type.dump}" - - output = Plugin.new_output(type) - output.router = router - output.configure(e) - @outputs << output - } - end - - def start - super - - @outputs.each do |o| - o.start unless o.started? - end - end - - def shutdown - @outputs.each do |o| - o.shutdown unless o.shutdown? - end - - super - end - - def emit(tag, es, chain) + def process(tag, es) unless es.repeatable? - m = MultiEventStream.new + m = Fluent::MultiEventStream.new es.each {|time,record| m.add(time, record) } es = m end - if @deep_copy - chain = CopyOutputChain.new(@outputs, tag, es, chain) - else - chain = OutputChain.new(@outputs, tag, es, chain) + + outputs.each do |output| + output.emit_events(tag, @deep_copy ? es.dup : es) end - chain.next end end end diff --git a/test/plugin/test_out_copy.rb b/test/plugin/test_out_copy.rb index 2241ba07b0..e1dbb055af 100644 --- a/test/plugin/test_out_copy.rb +++ b/test/plugin/test_out_copy.rb @@ -1,12 +1,14 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/multi_output' require 'fluent/plugin/out_copy' +require 'fluent/event' class CopyOutputTest < Test::Unit::TestCase class << self def startup $LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts')) require 'fluent/plugin/out_test' + require 'fluent/plugin/out_test2' end def shutdown @@ -20,21 +22,21 @@ def setup CONFIG = %[ - type test + @type test name c0 - type test + @type test2 name c1 - type test + @type test name c2 ] def create_driver(conf = CONFIG) - Fluent::Test::OutputTestDriver.new(Fluent::CopyOutput).configure(conf) + Fluent::Test::Driver::MultiOutput.new(Fluent::Plugin::CopyOutput).configure(conf) end def test_configure @@ -42,158 +44,117 @@ def test_configure outputs = d.instance.outputs assert_equal 3, outputs.size - assert_equal Fluent::TestOutput, outputs[0].class - assert_equal Fluent::TestOutput, outputs[1].class - assert_equal Fluent::TestOutput, outputs[2].class + assert_equal Fluent::Plugin::TestOutput, outputs[0].class + assert_equal Fluent::Plugin::Test2Output, outputs[1].class + assert_equal Fluent::Plugin::TestOutput, outputs[2].class assert_equal "c0", outputs[0].name assert_equal "c1", outputs[1].name assert_equal "c2", outputs[2].name end - def test_emit + def test_feed_events d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) + assert !d.instance.outputs[0].has_router? + assert_not_nil d.instance.outputs[1].router + assert !d.instance.outputs[2].has_router? - d.instance.outputs.each {|o| - assert_equal [ - [time, {"a"=>1}], - [time, {"a"=>2}], - ], o.events - } + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: 'test') do + d.feed(time, {"a" => 1}) + d.feed(time, {"a" => 2}) + end d.instance.outputs.each {|o| - assert_not_nil o.router + assert_equal [ [time, {"a"=>1}], [time, {"a"=>2}] ], o.events } end - def test_msgpack_es_emit_bug - d = Fluent::Test::OutputTestDriver.new(Fluent::CopyOutput) - - outputs = %w(p1 p2).map do |pname| - p = Fluent::Plugin.new_output('test') - p.configure('name' => pname) - p.define_singleton_method(:emit) do |tag, es, chain| - es.each do |time, record| - super(tag, [[time, record]], chain) - end - end - p - end - - d.instance.instance_eval { @outputs = outputs } + def test_msgpack_unpacker_cache_bug_for_msgpack_event_stream + d = create_driver - es = if defined?(MessagePack::Packer) - time = Time.parse("2013-05-26 06:37:22 UTC").to_i - packer = Fluent::Engine.msgpack_factory.packer - packer.pack([time, {"a" => 1}]) - packer.pack([time, {"a" => 2}]) - Fluent::MessagePackEventStream.new(packer.to_s) - else - events = "#{[time, {"a" => 1}].to_msgpack}#{[time, {"a" => 2}].to_msgpack}" - Fluent::MessagePackEventStream.new(events) - end + time = event_time("2011-01-02 13:14:15 UTC") + source = Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"a" => 2}] ]) + es = Fluent::MessagePackEventStream.new(source.to_msgpack_stream) - d.instance.emit('test', es, Fluent::NullOutputChain.instance) + d.run(default_tag: 'test') do + d.feed(es) + end d.instance.outputs.each { |o| - assert_equal [ - [time, {"a"=>1}], - [time, {"a"=>2}], - ], o.events + assert_equal [ [time, {"a"=>1}], [time, {"a"=>2}] ], o.events } end - def create_event_test_driver(is_deep_copy = false) - deep_copy_config = %[ -deep_copy #{is_deep_copy} - - type test - name c0 - - - type test - name c1 - - - type test - name c2 - -] - - output1 = Fluent::Plugin.new_output('test') - output1.configure('name' => 'output1') - output1.define_singleton_method(:emit) do |tag, es, chain| + def create_event_test_driver(does_deep_copy = false) + config = %[ + deep_copy #{does_deep_copy} + + @type test + name output1 + + + @type test + name output2 + + ] + + d = Fluent::Test::Driver::MultiOutput.new(Fluent::Plugin::CopyOutput).configure(config) + d.instance.outputs[0].define_singleton_method(:process) do |tag, es| es.each do |time, record| record['foo'] = 'bar' - super(tag, [[time, record]], chain) - end - end - - output2 = Fluent::Plugin.new_output('test') - output2.configure('name' => 'output2') - output2.define_singleton_method(:emit) do |tag, es, chain| - es.each do |time, record| - super(tag, [[time, record]], chain) end + super(tag, es) end - - outputs = [output1, output2] - - d = Fluent::Test::OutputTestDriver.new(Fluent::CopyOutput) - d = d.configure(deep_copy_config) - d.instance.instance_eval { @outputs = outputs } d end - def test_one_event - time = Time.parse("2013-05-26 06:37:22 UTC").to_i - - d = create_event_test_driver(false) - es = Fluent::OneEventStream.new(time, {"a" => 1}) - d.instance.emit('test', es, Fluent::NullOutputChain.instance) - - assert_equal [ - [[time, {"a"=>1, "foo"=>"bar"}]], - [[time, {"a"=>1, "foo"=>"bar"}]] - ], d.instance.outputs.map{ |o| o.events } + time = event_time("2013-05-26 06:37:22 UTC") + mes0 = Fluent::MultiEventStream.new + mes0.add(time, {"a" => 1}) + mes0.add(time, {"b" => 1}) + mes1 = Fluent::MultiEventStream.new + mes1.add(time, {"a" => 1}) + mes1.add(time, {"b" => 1}) + + data( + "OneEventStream without deep_copy" => [false, Fluent::OneEventStream.new(time, {"a" => 1})], + "OneEventStream with deep_copy" => [true, Fluent::OneEventStream.new(time, {"a" => 1})], + "ArrayEventStream without deep_copy" => [false, Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"b" => 2}] ])], + "ArrayEventStream with deep_copy" => [true, Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"b" => 2}] ])], + "MultiEventStream without deep_copy" => [false, mes0], + "MultiEventStream with deep_copy" => [true, mes1], + ) + def test_deep_copy_controls_shallow_or_deep_copied(data) + does_deep_copy, es = data + + d = create_event_test_driver(does_deep_copy) + + d.run(default_tag: 'test') do + d.feed(es) + end - d = create_event_test_driver(true) - es = Fluent::OneEventStream.new(time, {"a" => 1}) - d.instance.emit('test', es, Fluent::NullOutputChain.instance) + events = d.instance.outputs.map(&:events) - assert_equal [ - [[time, {"a"=>1, "foo"=>"bar"}]], - [[time, {"a"=>1}]] - ], d.instance.outputs.map{ |o| o.events } - end + if does_deep_copy + events[0].each_with_index do |entry0, i| + record0 = entry0.last + record1 = events[1][i].last - def test_multi_event - time = Time.parse("2013-05-26 06:37:22 UTC").to_i - - d = create_event_test_driver(false) - es = Fluent::MultiEventStream.new - es.add(time, {"a" => 1}) - es.add(time, {"b" => 2}) - d.instance.emit('test', es, Fluent::NullOutputChain.instance) - - assert_equal [ - [[time, {"a"=>1, "foo"=>"bar"}], [time, {"b"=>2, "foo"=>"bar"}]], - [[time, {"a"=>1, "foo"=>"bar"}], [time, {"b"=>2, "foo"=>"bar"}]] - ], d.instance.outputs.map{ |o| o.events } - - d = create_event_test_driver(true) - es = Fluent::MultiEventStream.new - es.add(time, {"a" => 1}) - es.add(time, {"b" => 2}) - d.instance.emit('test', es, Fluent::NullOutputChain.instance) - - assert_equal [ - [[time, {"a"=>1, "foo"=>"bar"}], [time, {"b"=>2, "foo"=>"bar"}]], - [[time, {"a"=>1}], [time, {"b"=>2}]] - ], d.instance.outputs.map{ |o| o.events } + assert{ record0.object_id != record1.object_id } + assert_equal "bar", record0["foo"] + assert !record1.has_key?("foo") + end + else + events[0].each_with_index do |entry0, i| + record0 = entry0.last + record1 = events[1][i].last + + assert{ record0.object_id == record1.object_id } + assert_equal "bar", record0["foo"] + assert_equal "bar", record1["foo"] + end + end end end From d1d7b8c9d9b018a1a5a824640bbe917a41861c03 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 22 Jun 2016 20:54:04 +0900 Subject: [PATCH 5/6] port RoundRobinOutput plugin to v0.14 API --- lib/fluent/plugin/out_roundrobin.rb | 52 ++++++----------------- test/plugin/test_out_roundrobin.rb | 64 ++++++++++++++++------------- 2 files changed, 48 insertions(+), 68 deletions(-) diff --git a/lib/fluent/plugin/out_roundrobin.rb b/lib/fluent/plugin/out_roundrobin.rb index 3e327195f1..ceda1eaddc 100644 --- a/lib/fluent/plugin/out_roundrobin.rb +++ b/lib/fluent/plugin/out_roundrobin.rb @@ -14,68 +14,42 @@ # limitations under the License. # -require 'fluent/output' +require 'fluent/plugin/multi_output' require 'fluent/config/error' -module Fluent +module Fluent::Plugin class RoundRobinOutput < MultiOutput - Plugin.register_output('roundrobin', self) + Fluent::Plugin.register_output('roundrobin', self) + + config_section :store do + config_param :weight, :integer, default: 1 + end def initialize super - - @outputs = [] @weights = [] end - attr_reader :outputs, :weights + attr_reader :weights attr_accessor :rand_seed def configure(conf) super - conf.elements.select {|e| - e.name == 'store' - }.each {|e| - type = e['@type'] - unless type - raise ConfigError, "Missing 'type' parameter on directive" - end - - weight = e['weight'] - weight = weight ? weight.to_i : 1 - log.debug "adding store type=#{type.dump}, weight=#{weight}" - - output = Plugin.new_output(type) - output.router = router - output.configure(e) - @outputs << output - @weights << weight - } + @stores.each do |store| + @weights << store.weight + end @rr = -1 # starts from @output[0] @rand_seed = Random.new.seed end def start super - rebuild_weight_array - - @outputs.each do |o| - o.start unless o.started? - end - end - - def shutdown - @outputs.each do |o| - o.shutdown unless o.shutdown? - end - - super end - def emit(tag, es, chain) - next_output.emit(tag, es, chain) + def process(tag, es) + next_output.emit_events(tag, es) end private diff --git a/test/plugin/test_out_roundrobin.rb b/test/plugin/test_out_roundrobin.rb index 56722f0174..eaff66770b 100644 --- a/test/plugin/test_out_roundrobin.rb +++ b/test/plugin/test_out_roundrobin.rb @@ -1,5 +1,5 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/multi_output' require 'fluent/plugin/out_roundrobin' class RoundRobinOutputTest < Test::Unit::TestCase @@ -7,6 +7,7 @@ class << self def startup $LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts')) require 'fluent/plugin/out_test' + require 'fluent/plugin/out_test2' end def shutdown @@ -20,37 +21,37 @@ def setup CONFIG = %[ - type test + @type test name c0 - type test + @type test2 name c1 - type test + @type test name c2 ] CONFIG_WITH_WEIGHT = %[ - type test + @type test name c0 weight 3 - type test + @type test2 name c1 weight 3 - type test + @type test name c2 ] def create_driver(conf = CONFIG) - Fluent::Test::OutputTestDriver.new(Fluent::RoundRobinOutput).configure(conf) + Fluent::Test::Driver::MultiOutput.new(Fluent::Plugin::RoundRobinOutput).configure(conf) end def test_configure @@ -58,9 +59,16 @@ def test_configure outputs = d.instance.outputs assert_equal 3, outputs.size - assert_equal Fluent::TestOutput, outputs[0].class - assert_equal Fluent::TestOutput, outputs[1].class - assert_equal Fluent::TestOutput, outputs[2].class + + assert_equal Fluent::Plugin::TestOutput, outputs[0].class + assert_equal Fluent::Plugin::Test2Output, outputs[1].class + assert_equal Fluent::Plugin::TestOutput, outputs[2].class + + assert !outputs[0].has_router? + assert outputs[1].has_router? + assert outputs[1].router + assert !outputs[2].has_router? + assert_equal "c0", outputs[0].name assert_equal "c1", outputs[1].name assert_equal "c2", outputs[2].name @@ -75,9 +83,11 @@ def test_configure outputs = d.instance.outputs assert_equal 3, outputs.size - assert_equal Fluent::TestOutput, outputs[0].class - assert_equal Fluent::TestOutput, outputs[1].class - assert_equal Fluent::TestOutput, outputs[2].class + + assert_equal Fluent::Plugin::TestOutput, outputs[0].class + assert_equal Fluent::Plugin::Test2Output, outputs[1].class + assert_equal Fluent::Plugin::TestOutput, outputs[2].class + assert_equal "c0", outputs[0].name assert_equal "c1", outputs[1].name assert_equal "c2", outputs[2].name @@ -89,15 +99,15 @@ def test_configure assert_equal 1, weights[2] end - def test_emit + def test_events_feeded_to_plugins_by_roundrobin d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.run do - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) - d.emit({"a"=>3}, time) - d.emit({"a"=>4}, time) + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: 'test') do + d.feed(time, {"a" => 1}) + d.feed(time, {"a" => 2}) + d.feed(time, {"a" => 3}) + d.feed(time, {"a" => 4}) end os = d.instance.outputs @@ -114,19 +124,15 @@ def test_emit assert_equal [ [time, {"a"=>3}], ], os[2].events - - d.instance.outputs.each {|o| - assert_not_nil o.router - } end - def test_emit_weighted + def test_events_feeded_with_specified_weights d = create_driver(CONFIG_WITH_WEIGHT) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.run do + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: 'test') do 14.times do |i| - d.emit({"a"=>i}, time) + d.feed(time, {"a" => i}) end end From 346dc737cba7c620cef18b4bd0ba7225ff3bc3a7 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Mon, 27 Jun 2016 17:16:52 +0900 Subject: [PATCH 6/6] remove unused accessor --- lib/fluent/plugin/out_roundrobin.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/fluent/plugin/out_roundrobin.rb b/lib/fluent/plugin/out_roundrobin.rb index ceda1eaddc..d66af41c26 100644 --- a/lib/fluent/plugin/out_roundrobin.rb +++ b/lib/fluent/plugin/out_roundrobin.rb @@ -31,7 +31,6 @@ def initialize end attr_reader :weights - attr_accessor :rand_seed def configure(conf) super