Skip to content

Commit

Permalink
Merge pull request #2292 from fluent/support-multi-workers-assign-syntax
Browse files Browse the repository at this point in the history
Support multi workers assign syntax on `<worker>` section. Fix #2289
  • Loading branch information
repeatedly authored Feb 25, 2019
2 parents 0db9e43 + 44b67c9 commit 07841c0
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 23 deletions.
19 changes: 13 additions & 6 deletions lib/fluent/config/element.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ def initialize(name, arg, attrs, elements, unused = nil)
# it's global logger, not plugin logger: deprecated message should be global warning, not plugin level.
@logger = defined?($log) ? $log : nil

@target_worker_id = nil
@target_worker_ids = []
end

attr_accessor :name, :arg, :unused, :v1_config, :corresponding_proxies, :unused_in
attr_writer :elements
attr_reader :target_worker_id
attr_reader :target_worker_ids

RESERVED_PARAMETERS_COMPAT = {
'@type' => 'type',
Expand Down Expand Up @@ -223,22 +223,29 @@ def self.unescape_parameter(v)
end

def set_target_worker_id(worker_id)
@target_worker_id = worker_id
@target_worker_ids = [worker_id]
@elements.each { |e|
e.set_target_worker_id(worker_id)
}
end

def set_target_worker_ids(worker_ids)
@target_worker_ids = worker_ids.uniq
@elements.each { |e|
e.set_target_worker_ids(worker_ids.uniq)
}
end

def for_every_workers?
@target_worker_id == nil
@target_worker_ids.empty?
end

def for_this_worker?
@target_worker_id == Fluent::Engine.worker_id
@target_worker_ids.include?(Fluent::Engine.worker_id)
end

def for_another_worker?
@target_worker_id != nil && @target_worker_id != Fluent::Engine.worker_id
!@target_worker_ids.empty? && !@target_worker_ids.include?(Fluent::Engine.worker_id)
end
end
end
Expand Down
7 changes: 6 additions & 1 deletion lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ def fluentd_worker_id

def configure(conf)
if conf.respond_to?(:for_this_worker?) && conf.for_this_worker?
system_config_override(workers: 1)
workers = if conf.target_worker_ids && !conf.target_worker_ids.empty?
conf.target_worker_ids.size
else
1
end
system_config_override(workers: workers)
end
super
@_state ||= State.new(false, false, false, false, false, false, false, false, false)
Expand Down
60 changes: 49 additions & 11 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,64 @@ def initialize(log:, system_config: SystemConfig.new)
attr_reader :labels

def configure(conf)
used_worker_ids = []
available_worker_ids = (0..Fluent::Engine.system_config.workers - 1).to_a
# initialize <worker> elements
conf.elements(name: 'worker').each do |e|
target_worker_id_str = e.arg
if target_worker_id_str.empty?
raise ConfigError, "Missing worker id on <worker> directive"
raise Fluent::ConfigError, "Missing worker id on <worker> directive"
end

target_worker_id = target_worker_id_str.to_i
if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
raise ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
end
target_worker_ids = target_worker_id_str.split("-")
if target_worker_ids.size == 2
first_worker_id = target_worker_ids.first.to_i
last_worker_id = target_worker_ids.last.to_i
if first_worker_id > last_worker_id
raise Fluent::ConfigError, "greater first_worker_id<#{first_worker_id}> than last_worker_id<#{last_worker_id}> specified by <worker> directive is not allowed. Available multi worker assign syntax is <smaller_worker_id>-<greater_worker_id>"
end
target_worker_ids = []
first_worker_id.step(last_worker_id, 1) do |worker_id|
target_worker_id = worker_id.to_i
target_worker_ids << target_worker_id

if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
raise Fluent::ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
end
available_worker_ids.delete(target_worker_id) if available_worker_ids.include?(target_worker_id)
if used_worker_ids.include?(target_worker_id) && !Fluent::Engine.dry_run_mode
raise Fluent::ConfigError, "specified worker_id<#{worker_id}> collisions is detected on <worker> directive. Available worker id(s): #{available_worker_ids}"
end
used_worker_ids << target_worker_id

e.elements.each do |elem|
unless ['source', 'match', 'filter', 'label'].include?(elem.name)
raise Fluent::ConfigError, "<worker> section cannot have <#{elem.name}> directive"
end
end

## On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0).
target_worker_id = 0 if Fluent::Engine.dry_run_mode
# On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0).
target_worker_ids = [0] if Fluent::Engine.dry_run_mode

e.elements.each do |elem|
unless ['source', 'match', 'filter', 'label'].include?(elem.name)
raise ConfigError, "<worker> section cannot have <#{elem.name}> directive"
unless target_worker_ids.empty?
e.set_target_worker_ids(target_worker_ids.uniq)
end
end
else
target_worker_id = target_worker_id_str.to_i
if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
raise Fluent::ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
end

## On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0).
target_worker_id = 0 if Fluent::Engine.dry_run_mode

e.elements.each do |elem|
unless ['source', 'match', 'filter', 'label'].include?(elem.name)
raise Fluent::ConfigError, "<worker> section cannot have <#{elem.name}> directive"
end
elem.set_target_worker_id(target_worker_id)
end
elem.set_target_worker_id(target_worker_id)
end
conf += e
end
Expand Down
34 changes: 29 additions & 5 deletions test/config/test_element.rb
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,11 @@ def element(name = 'ROOT', arg = '', attrs = {}, elements = [], unused = nil)
test 'set target_worker_id recursively' do
e = element('label', '@mytest', {}, [ element('filter', '**'), element('match', '**', {}, [ element('store'), element('store') ]) ])
e.set_target_worker_id(1)
assert_equal 1, e.target_worker_id
assert_equal 1, e.elements[0].target_worker_id
assert_equal 1, e.elements[1].target_worker_id
assert_equal 1, e.elements[1].elements[0].target_worker_id
assert_equal 1, e.elements[1].elements[1].target_worker_id
assert_equal [1], e.target_worker_ids
assert_equal [1], e.elements[0].target_worker_ids
assert_equal [1], e.elements[1].target_worker_ids
assert_equal [1], e.elements[1].elements[0].target_worker_ids
assert_equal [1], e.elements[1].elements[1].target_worker_ids
end
end

Expand All @@ -434,12 +434,24 @@ def element(name = 'ROOT', arg = '', attrs = {}, elements = [], unused = nil)
assert e.for_this_worker?
end

test 'target_worker_ids includes current worker_id' do
e = element()
e.set_target_worker_ids([0])
assert e.for_this_worker?
end

test 'target_worker_id != current worker_id' do
e = element()
e.set_target_worker_id(1)
assert_false e.for_this_worker?
end

test 'target_worker_ids does not includes current worker_id' do
e = element()
e.set_target_worker_ids([1, 2])
assert_false e.for_this_worker?
end

test "doesn't have target_worker_id" do
e = element()
assert_false e.for_this_worker?
Expand All @@ -453,12 +465,24 @@ def element(name = 'ROOT', arg = '', attrs = {}, elements = [], unused = nil)
assert_false e.for_another_worker?
end

test 'target_worker_ids contains current worker_id' do
e = element()
e.set_target_worker_ids([0, 1])
assert_false e.for_another_worker?
end

test 'target_worker_id != current worker_id' do
e = element()
e.set_target_worker_id(1)
assert e.for_another_worker?
end

test 'target_worker_ids does not contains current worker_id' do
e = element()
e.set_target_worker_ids([1, 2])
assert e.for_another_worker?
end

test "doesn't have target_worker_id" do
e = element()
assert_false e.for_another_worker?
Expand Down
76 changes: 76 additions & 0 deletions test/test_root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,54 @@ def configure_ra(conf_str)
end
end

test 'raises configuration error for too big worker id on multi workers syntax' do
errmsg = "worker id 4 specified by <worker> directive is not allowed. Available worker id is between 0 and 3"
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
<worker 1-4>
</worker>
EOC
configure_ra(conf)
end
end

test 'raises configuration error for worker id collisions on multi workers syntax' do
errmsg = "specified worker_id<2> collisions is detected on <worker> directive. Available worker id(s): [3]"
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
<worker 0-2>
</worker>
<worker 2-4>
</worker>
EOC
configure_ra(conf)
end
end

test 'raises configuration error for worker id collisions on multi workers syntax when multi avaliable worker_ids are left' do
errmsg = "specified worker_id<1> collisions is detected on <worker> directive. Available worker id(s): [2, 3]"
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
<worker 0-1>
</worker>
<worker 1-3>
</worker>
EOC
configure_ra(conf)
end
end

test 'raises configuration error for too big worker id on invalid reversed multi workers syntax' do
errmsg = "greater first_worker_id<3> than last_worker_id<0> specified by <worker> directive is not allowed. Available multi worker assign syntax is <smaller_worker_id>-<greater_worker_id>"
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
<worker 3-0>
</worker>
EOC
configure_ra(conf)
end
end

test 'raises configuration error for invalid elements as a child of worker section' do
errmsg = '<worker> section cannot have <system> directive'
assert_raise Fluent::ConfigError.new(errmsg) do
Expand Down Expand Up @@ -844,5 +892,33 @@ def configure_ra(conf_str)
assert_equal 0, ra.labels.size
refute ra.error_collector
end

test 'with plugins for workers syntax should match worker_id equals to 2' do
conf = <<-EOC
<worker 0-2>
<source>
@type forward
</source>
<filter **>
@type test_filter
@id test_filter
</filter>
<match pattern>
@type stdout
</match>
<label @ERROR>
<match>
@type null
</match>
</label>
</worker>
EOC

ra = configure_ra(conf)
assert_kind_of Fluent::Plugin::ForwardInput, ra.inputs.first
assert_kind_of Fluent::Plugin::StdoutOutput, ra.outputs.first
assert_kind_of FluentTestFilter, ra.filters.first
assert ra.error_collector
end
end
end

0 comments on commit 07841c0

Please sign in to comment.