Skip to content

Commit

Permalink
Merge pull request #1502 from fluent/fix-deadlock-for-input-start
Browse files Browse the repository at this point in the history
Fix Input and Output deadlock when buffer is full during startup
  • Loading branch information
repeatedly authored Mar 15, 2017
2 parents 87f9588 + ecf0a6a commit d98dd81
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 16 deletions.
8 changes: 6 additions & 2 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,12 @@ def lifecycle(desc: false, kind_callback: nil)
def start
lifecycle(desc: true) do |i| # instance
i.start unless i.started?
end
lifecycle(desc: true) do |i|
# Input#start sometimes emits lots of evetns with in_tail/`read_from_head true` case
# and it causes deadlock for small buffer/queue output. To avoid such problem,
# buffer related output threads should be run before `Input#start`.
# This is why after_start should be called immediately after start call.
# This depends on `desc: true` because calling plugin order of `desc: true` is
# Output, Filter, Label, Output with Router, then Input.
i.after_start unless i.after_started?
end
end
Expand Down
35 changes: 35 additions & 0 deletions test/test_plugin_classes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,28 @@ def shutdown
end
end

class FluentTestGenInput < ::Fluent::Plugin::Input
::Fluent::Plugin.register_input('test_in_gen', self)

attr_reader :started

config_param :num, :integer, default: 10000

def start
super
@started = true

@num.times { |i|
router.emit("test.evet", Fluent::EventTime.now, {'message' => 'Hello!', 'key' => "value#{i}", 'num' => i})
}
end

def shutdown
@started = false
super
end
end

class FluentTestOutput < ::Fluent::Plugin::Output
::Fluent::Plugin.register_output('test_out', self)

Expand Down Expand Up @@ -112,6 +134,19 @@ def process(tag, es)

class FluentTestBufferedOutput < ::Fluent::Plugin::Output
::Fluent::Plugin.register_output('test_out_buffered', self)

attr_reader :started

def start
super
@started = true
end

def shutdown
@started = false
super
end

def write(chunk)
# drop everything
end
Expand Down
55 changes: 41 additions & 14 deletions test/test_root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,15 @@ def configure_ra(conf_str)
end

sub_test_case 'start/shutdown' do
setup do
@ra = RootAgent.new(log: $log)
stub(Engine).root_agent { @ra }
@ra.configure(Config.parse(<<-EOC, "(test)", "(test_dir)", true))
def setup_root_agent(conf)
ra = RootAgent.new(log: $log)
stub(Engine).root_agent { ra }
ra.configure(Config.parse(conf, "(test)", "(test_dir)", true))
ra
end

test 'plugin status' do
ra = setup_root_agent(<<-EOC)
<source>
@type test_in
@id test_in
Expand All @@ -191,19 +196,41 @@ def configure_ra(conf_str)
@id test_out
</match>
EOC
@ra
ra.start
assert_true ra.inputs.first.started
assert_true ra.filters.first.started
assert_true ra.outputs.first.started

ra.shutdown
assert_false ra.inputs.first.started
assert_false ra.filters.first.started
assert_false ra.outputs.first.started
end

test 'plugin status' do
@ra.start
assert_true @ra.inputs.first.started
assert_true @ra.filters.first.started
assert_true @ra.outputs.first.started
test 'output plugin threads should run before input plugin is blocked with buffer full' do
ra = setup_root_agent(<<-EOC)
<source>
@type test_in_gen
@id test_in_gen
</source>
<match **>
@type test_out_buffered
@id test_out_buffered
<buffer>
chunk_limit_size 1k
queue_limit_length 2
flush_thread_count 2
overflow_action block
</buffer>
</match>
EOC
waiting(5) { ra.start }
assert_true ra.inputs.first.started
assert_true ra.outputs.first.started

@ra.shutdown
assert_false @ra.inputs.first.started
assert_false @ra.filters.first.started
assert_false @ra.outputs.first.started
ra.shutdown
assert_false ra.inputs.first.started
assert_false ra.outputs.first.started
end
end

Expand Down

0 comments on commit d98dd81

Please sign in to comment.