diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index e98ccadd3a..c83f1df976 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -136,8 +136,12 @@ def lifecycle(desc: false, kind_callback: nil) def start lifecycle(desc: true) do |i| # instance i.start unless i.started? - end - lifecycle(desc: true) do |i| + # Input#start sometimes emits lots of evetns with in_tail/`read_from_head true` case + # and it causes deadlock for small buffer/queue output. To avoid such problem, + # buffer related output threads should be run before `Input#start`. + # This is why after_start should be called immediately after start call. + # This depends on `desc: true` because calling plugin order of `desc: true` is + # Output, Filter, Label, Output with Router, then Input. i.after_start unless i.after_started? end end diff --git a/test/test_plugin_classes.rb b/test/test_plugin_classes.rb index 09ccf776a1..b2e484c306 100644 --- a/test/test_plugin_classes.rb +++ b/test/test_plugin_classes.rb @@ -21,6 +21,28 @@ def shutdown end end + class FluentTestGenInput < ::Fluent::Plugin::Input + ::Fluent::Plugin.register_input('test_in_gen', self) + + attr_reader :started + + config_param :num, :integer, default: 10000 + + def start + super + @started = true + + @num.times { |i| + router.emit("test.evet", Fluent::EventTime.now, {'message' => 'Hello!', 'key' => "value#{i}", 'num' => i}) + } + end + + def shutdown + @started = false + super + end + end + class FluentTestOutput < ::Fluent::Plugin::Output ::Fluent::Plugin.register_output('test_out', self) @@ -112,6 +134,19 @@ def process(tag, es) class FluentTestBufferedOutput < ::Fluent::Plugin::Output ::Fluent::Plugin.register_output('test_out_buffered', self) + + attr_reader :started + + def start + super + @started = true + end + + def shutdown + @started = false + super + end + def write(chunk) # drop everything end diff --git a/test/test_root_agent.rb b/test/test_root_agent.rb index e979fc0bfc..e467bd81b4 100644 --- a/test/test_root_agent.rb +++ b/test/test_root_agent.rb @@ -174,10 +174,15 @@ def configure_ra(conf_str) end sub_test_case 'start/shutdown' do - setup do - @ra = RootAgent.new(log: $log) - stub(Engine).root_agent { @ra } - @ra.configure(Config.parse(<<-EOC, "(test)", "(test_dir)", true)) + def setup_root_agent(conf) + ra = RootAgent.new(log: $log) + stub(Engine).root_agent { ra } + ra.configure(Config.parse(conf, "(test)", "(test_dir)", true)) + ra + end + + test 'plugin status' do + ra = setup_root_agent(<<-EOC) @type test_in @id test_in @@ -191,19 +196,41 @@ def configure_ra(conf_str) @id test_out EOC - @ra + ra.start + assert_true ra.inputs.first.started + assert_true ra.filters.first.started + assert_true ra.outputs.first.started + + ra.shutdown + assert_false ra.inputs.first.started + assert_false ra.filters.first.started + assert_false ra.outputs.first.started end - test 'plugin status' do - @ra.start - assert_true @ra.inputs.first.started - assert_true @ra.filters.first.started - assert_true @ra.outputs.first.started + test 'output plugin threads should run before input plugin is blocked with buffer full' do + ra = setup_root_agent(<<-EOC) + + @type test_in_gen + @id test_in_gen + + + @type test_out_buffered + @id test_out_buffered + + chunk_limit_size 1k + queue_limit_length 2 + flush_thread_count 2 + overflow_action block + + +EOC + waiting(5) { ra.start } + assert_true ra.inputs.first.started + assert_true ra.outputs.first.started - @ra.shutdown - assert_false @ra.inputs.first.started - assert_false @ra.filters.first.started - assert_false @ra.outputs.first.started + ra.shutdown + assert_false ra.inputs.first.started + assert_false ra.outputs.first.started end end