Skip to content

Commit

Permalink
Fix value of system_config.workers at run_configure
Browse files Browse the repository at this point in the history
Override `system_config` only if `conf.target_worker_ids.size >= 1`.

* If `<worker N>` or `<worker 0-N>` is not set, `conf.target_worker_ids` is empty.
* If `<system> workers N </system>` is not set, `system_config.workers` defaults to 1.

Signed-off-by: abetomo <[email protected]>
  • Loading branch information
abetomo committed Feb 25, 2023
1 parent 6c649d1 commit 6438674
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 7 deletions.
12 changes: 5 additions & 7 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,12 @@ def fluentd_worker_id
@_fluentd_worker_id
end

# @param conf [Fluent::Config::Element, Hash] configuration
def configure(conf)
if Fluent::Engine.supervisor_mode || (conf.respond_to?(:for_this_worker?) && conf.for_this_worker?)
workers = if conf.target_worker_ids && !conf.target_worker_ids.empty?
conf.target_worker_ids.size
else
1
end
system_config_override(workers: workers)
if conf.is_a?(Fluent::Config::Element)
if (Fluent::Engine.supervisor_mode && !conf.target_worker_ids.empty?) || conf.for_this_worker?
system_config_override(workers: conf.target_worker_ids.size)
end
end
super(conf, system_config.strict_config_value)
@_state ||= State.new(false, false, false, false, false, false, false, false, false)
Expand Down
144 changes: 144 additions & 0 deletions test/plugin/test_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,148 @@ class FluentPluginBaseTest::DummyPlugin2 < Fluent::Plugin::TestBase
end
end
end

sub_test_case 'system_config.workers value after configure' do
sub_test_case 'with <system> workers 3 </system>' do
setup do
system_config = Fluent::SystemConfig.new
system_config.workers = 3
stub(Fluent::Engine).system_config { system_config }
end

sub_test_case 'supervisor_mode is true' do
setup do
stub(Fluent::Engine).supervisor_mode { true }
stub(Fluent::Engine).worker_id { -1 }
end

test 'without <worker> directive' do
conf = config_element()
conf.set_target_worker_ids([])
@p.configure(conf)
assert{ @p.system_config.workers == 3 }
end

test 'with <worker 0>' do
conf = config_element()
conf.set_target_worker_ids([0])
@p.configure(conf)
assert{ @p.system_config.workers == 1 }
end

test 'with <worker 0-1>' do
conf = config_element()
conf.set_target_worker_ids([0, 1])
@p.configure(conf)
assert{ @p.system_config.workers == 2 }
end

test 'with <worker 0-2>' do
conf = config_element()
conf.set_target_worker_ids([0, 1, 2])
@p.configure(conf)
assert{ @p.system_config.workers == 3 }
end

test '`conf` is `Hash`' do
@p.configure({})
assert{ @p.system_config.workers == 3 }
end
end

sub_test_case 'supervisor_mode is false' do
setup do
stub(Fluent::Engine).supervisor_mode { false }
stub(Fluent::Engine).worker_id { 0 }
end

test 'without <worker> directive' do
conf = config_element()
conf.set_target_worker_ids([])
@p.configure(conf)
assert{ @p.system_config.workers == 3 }
end

test 'with <worker 0>' do
conf = config_element()
conf.set_target_worker_ids([0])
@p.configure(conf)
assert{ @p.system_config.workers == 1 }
end

test 'with <worker 0-1>' do
conf = config_element()
conf.set_target_worker_ids([0, 1])
@p.configure(conf)
assert{ @p.system_config.workers == 2 }
end

test 'with <worker 0-2>' do
conf = config_element()
conf.set_target_worker_ids([0, 1, 2])
@p.configure(conf)
assert{ @p.system_config.workers == 3 }
end

test '`conf` is `Hash`' do
@p.configure({})
assert{ @p.system_config.workers == 3 }
end
end
end

sub_test_case 'without <system> directive' do
sub_test_case 'supervisor_mode is true' do
setup do
stub(Fluent::Engine).supervisor_mode { true }
stub(Fluent::Engine).worker_id { -1 }
end

test 'without <worker> directive' do
conf = config_element()
conf.set_target_worker_ids([])
@p.configure(conf)
assert{ @p.system_config.workers == 1 }
end

test 'with <worker 0>' do
conf = config_element()
conf.set_target_worker_ids([0])
@p.configure(conf)
assert{ @p.system_config.workers == 1 }
end

test '`conf` is `Hash`' do
@p.configure({})
assert{ @p.system_config.workers == 1 }
end
end

sub_test_case 'supervisor_mode is false' do
setup do
stub(Fluent::Engine).supervisor_mode { false }
stub(Fluent::Engine).worker_id { 0 }
end

test 'without <worker> directive' do
conf = config_element()
conf.set_target_worker_ids([])
@p.configure(conf)
assert{ @p.system_config.workers == 1 }
end

test 'with <worker 0>' do
conf = config_element()
conf.set_target_worker_ids([0])
@p.configure(conf)
assert{ @p.system_config.workers == 1 }
end

test '`conf` is `Hash`' do
@p.configure({})
assert{ @p.system_config.workers == 1 }
end
end
end
end
end

0 comments on commit 6438674

Please sign in to comment.