From a4497b0d5a6314ff80c810e3bdac13cb68942738 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Wed, 15 Mar 2017 11:24:35 +0900 Subject: [PATCH 1/3] Fix Input and Output deadlock when buffer is full during startup Output threads need `after_start` to be called before they run, but `after_start` is sometimes never called because Input#start is blocked. This happens when in_tail with `read_from_head true` is combined with an output using `overflow_action block`. This fixes the problem by calling `after_start` immediately after `start`. --- lib/fluent/root_agent.rb | 5 ++-- test/test_plugin_classes.rb | 35 +++++++++++++++++++++++ test/test_root_agent.rb | 55 +++++++++++++++++++++++++++---------- 3 files changed, 79 insertions(+), 16 deletions(-) diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index e98ccadd3a..3d839816ba 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -136,8 +136,9 @@ def lifecycle(desc: false, kind_callback: nil) def start lifecycle(desc: true) do |i| # instance i.start unless i.started? - end - lifecycle(desc: true) do |i| + # Input#start emits lots of evetns with in_tail/`read_from_head true` case and + # it causes deadlock for small buffer/queue output. To avoid such problem, + # buffer related output threads should be run before `Input#start`. i.after_start unless i.after_started? 
end end diff --git a/test/test_plugin_classes.rb b/test/test_plugin_classes.rb index 09ccf776a1..b2e484c306 100644 --- a/test/test_plugin_classes.rb +++ b/test/test_plugin_classes.rb @@ -21,6 +21,28 @@ def shutdown end end + class FluentTestGenInput < ::Fluent::Plugin::Input + ::Fluent::Plugin.register_input('test_in_gen', self) + + attr_reader :started + + config_param :num, :integer, default: 10000 + + def start + super + @started = true + + @num.times { |i| + router.emit("test.evet", Fluent::EventTime.now, {'message' => 'Hello!', 'key' => "value#{i}", 'num' => i}) + } + end + + def shutdown + @started = false + super + end + end + class FluentTestOutput < ::Fluent::Plugin::Output ::Fluent::Plugin.register_output('test_out', self) @@ -112,6 +134,19 @@ def process(tag, es) class FluentTestBufferedOutput < ::Fluent::Plugin::Output ::Fluent::Plugin.register_output('test_out_buffered', self) + + attr_reader :started + + def start + super + @started = true + end + + def shutdown + @started = false + super + end + def write(chunk) # drop everything end diff --git a/test/test_root_agent.rb b/test/test_root_agent.rb index e979fc0bfc..9dbcd9121f 100644 --- a/test/test_root_agent.rb +++ b/test/test_root_agent.rb @@ -174,10 +174,15 @@ def configure_ra(conf_str) end sub_test_case 'start/shutdown' do - setup do - @ra = RootAgent.new(log: $log) - stub(Engine).root_agent { @ra } - @ra.configure(Config.parse(<<-EOC, "(test)", "(test_dir)", true)) + def setup_root_agent(conf) + ra = RootAgent.new(log: $log) + stub(Engine).root_agent { ra } + ra.configure(Config.parse(conf, "(test)", "(test_dir)", true)) + ra + end + + test 'plugin status' do + ra = setup_root_agent(<<-EOC) @type test_in @id test_in @@ -191,19 +196,41 @@ def configure_ra(conf_str) @id test_out EOC - @ra + ra.start + assert_true ra.inputs.first.started + assert_true ra.filters.first.started + assert_true ra.outputs.first.started + + ra.shutdown + assert_false ra.inputs.first.started + assert_false 
ra.filters.first.started + assert_false ra.outputs.first.started end - test 'plugin status' do - @ra.start - assert_true @ra.inputs.first.started - assert_true @ra.filters.first.started - assert_true @ra.outputs.first.started + test 'output plugin threads should run before input plugin is blocked with buffer full' do + ra = setup_root_agent(<<-EOC) + + @type test_in_gen + @id test_in_gen + + + @type test_out_buffered + @id test_out_buffered + + chunk_limit_size 1k + queue_limit_length 2 + flush_thread_count 2 + overflow_action block + + +EOC + ra.start + assert_true ra.inputs.first.started + assert_true ra.outputs.first.started - @ra.shutdown - assert_false @ra.inputs.first.started - assert_false @ra.filters.first.started - assert_false @ra.outputs.first.started + ra.shutdown + assert_false ra.inputs.first.started + assert_false ra.outputs.first.started end end From 7676fe1d3ec554f80972ecff05804b77bbe3051c Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Wed, 15 Mar 2017 11:51:12 +0900 Subject: [PATCH 2/3] Wrap start call with waiting to avoid endless sleep --- test/test_root_agent.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_root_agent.rb b/test/test_root_agent.rb index 9dbcd9121f..e467bd81b4 100644 --- a/test/test_root_agent.rb +++ b/test/test_root_agent.rb @@ -224,7 +224,7 @@ def setup_root_agent(conf) EOC - ra.start + waiting(5) { ra.start } assert_true ra.inputs.first.started assert_true ra.outputs.first.started From ecf0a6a40602665d05042514045108100f4a49fd Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Wed, 15 Mar 2017 18:19:34 +0900 Subject: [PATCH 3/3] Update comment for start sequence problem --- lib/fluent/root_agent.rb | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 3d839816ba..c83f1df976 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -136,9 +136,12 @@ def lifecycle(desc: false, kind_callback: nil) 
def start lifecycle(desc: true) do |i| # instance i.start unless i.started? - # Input#start emits lots of evetns with in_tail/`read_from_head true` case and - # it causes deadlock for small buffer/queue output. To avoid such problem, + # Input#start sometimes emits lots of events with in_tail/`read_from_head true` case + # and it causes deadlock for small buffer/queue output. To avoid such a problem, # buffer related output threads should be run before `Input#start`. + # This is why after_start should be called immediately after start call. + # This depends on `desc: true` because calling plugin order of `desc: true` is + # Output, Filter, Label, Output with Router, then Input. i.after_start unless i.after_started? end end