Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multi workers assign syntax on <worker> section. Fix #2289 #2292

Merged
merged 9 commits into from
Feb 25, 2019
19 changes: 13 additions & 6 deletions lib/fluent/config/element.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ def initialize(name, arg, attrs, elements, unused = nil)
# it's global logger, not plugin logger: deprecated message should be global warning, not plugin level.
@logger = defined?($log) ? $log : nil

@target_worker_id = nil
@target_worker_ids = []
end

attr_accessor :name, :arg, :unused, :v1_config, :corresponding_proxies, :unused_in
attr_writer :elements
attr_reader :target_worker_id
attr_reader :target_worker_ids

RESERVED_PARAMETERS_COMPAT = {
'@type' => 'type',
Expand Down Expand Up @@ -223,22 +223,29 @@ def self.unescape_parameter(v)
end

def set_target_worker_id(worker_id)
@target_worker_id = worker_id
@target_worker_ids = [worker_id]
@elements.each { |e|
e.set_target_worker_id(worker_id)
}
end

def set_target_worker_ids(worker_ids)
@target_worker_ids = worker_ids.uniq
@elements.each { |e|
e.set_target_worker_ids(worker_ids.uniq)
}
end

def for_every_workers?
@target_worker_id == nil
@target_worker_ids.empty?
end

def for_this_worker?
@target_worker_id == Fluent::Engine.worker_id
@target_worker_ids.include?(Fluent::Engine.worker_id)
end

def for_another_worker?
@target_worker_id != nil && @target_worker_id != Fluent::Engine.worker_id
!@target_worker_ids.empty? && !@target_worker_ids.include?(Fluent::Engine.worker_id)
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ def fluentd_worker_id

def configure(conf)
if conf.respond_to?(:for_this_worker?) && conf.for_this_worker?
system_config_override(workers: 1)
workers = conf.target_worker_ids.size || 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When conf.target_worker_ids.size return nil or false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a checking code on 44b67c9.

system_config_override(workers: workers)
end
super
@_state ||= State.new(false, false, false, false, false, false, false, false, false)
Expand Down
63 changes: 52 additions & 11 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,67 @@ def initialize(log:, system_config: SystemConfig.new)
attr_reader :labels

def configure(conf)
used_worker_ids = []
available_worker_ids = []
0.step(Fluent::Engine.system_config.workers - 1, 1).each do |id|
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we use available_worker_ids = (0..Fluent::Engine.system_config.workers - 1).to_a like code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh.... We can use it. 44f206a

available_worker_ids << id
end
# initialize <worker> elements
conf.elements(name: 'worker').each do |e|
target_worker_id_str = e.arg
if target_worker_id_str.empty?
raise ConfigError, "Missing worker id on <worker> directive"
raise Fluent::ConfigError, "Missing worker id on <worker> directive"
end

target_worker_id = target_worker_id_str.to_i
if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
raise ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
end
target_worker_ids = target_worker_id_str.split("-")
if target_worker_ids.size == 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add duplication check to avoid following case?

<worker 0-3>
</worker>

<worker 3-7> # should be 4-7
</worker>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a collisions checker in d900a73.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I added more concreate error message commit: 03da2b3.

first_worker_id = target_worker_ids.first.to_i
last_worker_id = target_worker_ids.last.to_i
if first_worker_id > last_worker_id
raise Fluent::ConfigError, "greater first_worker_id<#{first_worker_id}> than last_worker_id<#{last_worker_id}> specified by <worker> directive is not allowed. Available multi worker assign syntax is <smaller_worker_id>-<greater_worker_id>"
end
target_worker_ids = []
first_worker_id.step(last_worker_id, 1) do |worker_id|
target_worker_id = worker_id.to_i
target_worker_ids << target_worker_id

if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
raise Fluent::ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
end
available_worker_ids.delete(target_worker_id) if available_worker_ids.include?(target_worker_id)
if used_worker_ids.include?(target_worker_id) && !Fluent::Engine.dry_run_mode
raise Fluent::ConfigError, "specified worker_id<#{worker_id}> collisions is detected on <worker> directive. Available worker id(s): #{available_worker_ids}"
end
used_worker_ids << target_worker_id

e.elements.each do |elem|
unless ['source', 'match', 'filter', 'label'].include?(elem.name)
raise Fluent::ConfigError, "<worker> section cannot have <#{elem.name}> directive"
end
end

# On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0).
target_worker_ids = [0] if Fluent::Engine.dry_run_mode

unless target_worker_ids.empty?
e.set_target_worker_ids(target_worker_ids.uniq)
end
end
else
target_worker_id = target_worker_id_str.to_i
if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
raise Fluent::ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
end

## On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0).
target_worker_id = 0 if Fluent::Engine.dry_run_mode
## On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0).
target_worker_id = 0 if Fluent::Engine.dry_run_mode

e.elements.each do |elem|
unless ['source', 'match', 'filter', 'label'].include?(elem.name)
raise ConfigError, "<worker> section cannot have <#{elem.name}> directive"
e.elements.each do |elem|
unless ['source', 'match', 'filter', 'label'].include?(elem.name)
raise Fluent::ConfigError, "<worker> section cannot have <#{elem.name}> directive"
end
elem.set_target_worker_id(target_worker_id)
end
elem.set_target_worker_id(target_worker_id)
end
conf += e
end
Expand Down
34 changes: 29 additions & 5 deletions test/config/test_element.rb
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,11 @@ def element(name = 'ROOT', arg = '', attrs = {}, elements = [], unused = nil)
test 'set target_worker_id recursively' do
e = element('label', '@mytest', {}, [ element('filter', '**'), element('match', '**', {}, [ element('store'), element('store') ]) ])
e.set_target_worker_id(1)
assert_equal 1, e.target_worker_id
assert_equal 1, e.elements[0].target_worker_id
assert_equal 1, e.elements[1].target_worker_id
assert_equal 1, e.elements[1].elements[0].target_worker_id
assert_equal 1, e.elements[1].elements[1].target_worker_id
assert_equal [1], e.target_worker_ids
assert_equal [1], e.elements[0].target_worker_ids
assert_equal [1], e.elements[1].target_worker_ids
assert_equal [1], e.elements[1].elements[0].target_worker_ids
assert_equal [1], e.elements[1].elements[1].target_worker_ids
end
end

Expand All @@ -434,12 +434,24 @@ def element(name = 'ROOT', arg = '', attrs = {}, elements = [], unused = nil)
assert e.for_this_worker?
end

test 'target_worker_ids includes current worker_id' do
e = element()
e.set_target_worker_ids([0])
assert e.for_this_worker?
end

test 'target_worker_id != current worker_id' do
e = element()
e.set_target_worker_id(1)
assert_false e.for_this_worker?
end

test 'target_worker_ids does not includes current worker_id' do
e = element()
e.set_target_worker_ids([1, 2])
assert_false e.for_this_worker?
end

test "doesn't have target_worker_id" do
e = element()
assert_false e.for_this_worker?
Expand All @@ -453,12 +465,24 @@ def element(name = 'ROOT', arg = '', attrs = {}, elements = [], unused = nil)
assert_false e.for_another_worker?
end

test 'target_worker_ids contains current worker_id' do
e = element()
e.set_target_worker_ids([0, 1])
assert_false e.for_another_worker?
end

test 'target_worker_id != current worker_id' do
e = element()
e.set_target_worker_id(1)
assert e.for_another_worker?
end

test 'target_worker_ids does not contains current worker_id' do
e = element()
e.set_target_worker_ids([1, 2])
assert e.for_another_worker?
end

test "doesn't have target_worker_id" do
e = element()
assert_false e.for_another_worker?
Expand Down
76 changes: 76 additions & 0 deletions test/test_root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,54 @@ def configure_ra(conf_str)
end
end

test 'raises configuration error for too big worker id on multi workers syntax' do
errmsg = "worker id 4 specified by <worker> directive is not allowed. Available worker id is between 0 and 3"
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
<worker 1-4>
</worker>
EOC
configure_ra(conf)
end
end

test 'raises configuration error for worker id collisions on multi workers syntax' do
errmsg = "specified worker_id<2> collisions is detected on <worker> directive. Available worker id(s): [3]"
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
<worker 0-2>
</worker>
<worker 2-4>
</worker>
EOC
configure_ra(conf)
end
end

test 'raises configuration error for worker id collisions on multi workers syntax when multi avaliable worker_ids are left' do
errmsg = "specified worker_id<1> collisions is detected on <worker> directive. Available worker id(s): [2, 3]"
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
<worker 0-1>
</worker>
<worker 1-3>
</worker>
EOC
configure_ra(conf)
end
end

test 'raises configuration error for too big worker id on invalid reversed multi workers syntax' do
errmsg = "greater first_worker_id<3> than last_worker_id<0> specified by <worker> directive is not allowed. Available multi worker assign syntax is <smaller_worker_id>-<greater_worker_id>"
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
<worker 3-0>
</worker>
EOC
configure_ra(conf)
end
end

test 'raises configuration error for invalid elements as a child of worker section' do
errmsg = '<worker> section cannot have <system> directive'
assert_raise Fluent::ConfigError.new(errmsg) do
Expand Down Expand Up @@ -844,5 +892,33 @@ def configure_ra(conf_str)
assert_equal 0, ra.labels.size
refute ra.error_collector
end

test 'with plugins for workers syntax should match worker_id equals to 2' do
conf = <<-EOC
<worker 0-2>
<source>
@type forward
</source>
<filter **>
@type test_filter
@id test_filter
</filter>
<match pattern>
@type stdout
</match>
<label @ERROR>
<match>
@type null
</match>
</label>
</worker>
EOC

ra = configure_ra(conf)
assert_kind_of Fluent::Plugin::ForwardInput, ra.inputs.first
assert_kind_of Fluent::Plugin::StdoutOutput, ra.outputs.first
assert_kind_of FluentTestFilter, ra.filters.first
assert ra.error_collector
end
end
end