diff --git a/lib/fluent/config/element.rb b/lib/fluent/config/element.rb index 177fa009ab..1359faa5af 100644 --- a/lib/fluent/config/element.rb +++ b/lib/fluent/config/element.rb @@ -36,12 +36,12 @@ def initialize(name, arg, attrs, elements, unused = nil) # it's global logger, not plugin logger: deprecated message should be global warning, not plugin level. @logger = defined?($log) ? $log : nil - @target_worker_id = nil + @target_worker_ids = [] end attr_accessor :name, :arg, :unused, :v1_config, :corresponding_proxies, :unused_in attr_writer :elements - attr_reader :target_worker_id + attr_reader :target_worker_ids RESERVED_PARAMETERS_COMPAT = { '@type' => 'type', @@ -223,22 +223,29 @@ def self.unescape_parameter(v) end def set_target_worker_id(worker_id) - @target_worker_id = worker_id + @target_worker_ids = [worker_id] @elements.each { |e| e.set_target_worker_id(worker_id) } end + def set_target_worker_ids(worker_ids) + @target_worker_ids = worker_ids.uniq + @elements.each { |e| + e.set_target_worker_ids(worker_ids.uniq) + } + end + def for_every_workers? - @target_worker_id == nil + @target_worker_ids.empty? end def for_this_worker? - @target_worker_id == Fluent::Engine.worker_id + @target_worker_ids.include?(Fluent::Engine.worker_id) end def for_another_worker? - @target_worker_id != nil && @target_worker_id != Fluent::Engine.worker_id + !@target_worker_ids.empty? && !@target_worker_ids.include?(Fluent::Engine.worker_id) end end end diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb index 6011ae137e..70ded69bd3 100644 --- a/lib/fluent/plugin/base.rb +++ b/lib/fluent/plugin/base.rb @@ -52,7 +52,12 @@ def fluentd_worker_id def configure(conf) if conf.respond_to?(:for_this_worker?) && conf.for_this_worker? - system_config_override(workers: 1) + workers = if conf.target_worker_ids && !conf.target_worker_ids.empty? + conf.target_worker_ids.size + else + 1 + end + system_config_override(workers: workers) end super @_state ||= State.new(false, false, false, false, false, false, false, false, false) diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index a3db981bb5..e5d8a9c7f4 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -64,26 +64,64 @@ def initialize(log:, system_config: SystemConfig.new) attr_reader :labels def configure(conf) + used_worker_ids = [] + available_worker_ids = (0..Fluent::Engine.system_config.workers - 1).to_a # initialize elements conf.elements(name: 'worker').each do |e| target_worker_id_str = e.arg if target_worker_id_str.empty? - raise ConfigError, "Missing worker id on directive" + raise Fluent::ConfigError, "Missing worker id on directive" end - target_worker_id = target_worker_id_str.to_i - if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1) - raise ConfigError, "worker id #{target_worker_id} specified by directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}" - end + target_worker_ids = target_worker_id_str.split("-") + if target_worker_ids.size == 2 + first_worker_id = target_worker_ids.first.to_i + last_worker_id = target_worker_ids.last.to_i + if first_worker_id > last_worker_id + raise Fluent::ConfigError, "greater first_worker_id<#{first_worker_id}> than last_worker_id<#{last_worker_id}> specified by directive is not allowed. Available multi worker assign syntax is -" + end + target_worker_ids = [] + first_worker_id.step(last_worker_id, 1) do |worker_id| + target_worker_id = worker_id.to_i + target_worker_ids << target_worker_id + + if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1) + raise Fluent::ConfigError, "worker id #{target_worker_id} specified by directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}" + end + available_worker_ids.delete(target_worker_id) if available_worker_ids.include?(target_worker_id) + if used_worker_ids.include?(target_worker_id) && !Fluent::Engine.dry_run_mode + raise Fluent::ConfigError, "specified worker_id<#{worker_id}> collisions is detected on directive. Available worker id(s): #{available_worker_ids}" + end + used_worker_ids << target_worker_id + + e.elements.each do |elem| + unless ['source', 'match', 'filter', 'label'].include?(elem.name) + raise Fluent::ConfigError, " section cannot have <#{elem.name}> directive" + end + end - ## On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0). - target_worker_id = 0 if Fluent::Engine.dry_run_mode + # On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0). + target_worker_ids = [0] if Fluent::Engine.dry_run_mode - e.elements.each do |elem| - unless ['source', 'match', 'filter', 'label'].include?(elem.name) - raise ConfigError, " section cannot have <#{elem.name}> directive" + unless target_worker_ids.empty? + e.set_target_worker_ids(target_worker_ids.uniq) + end + end + else + target_worker_id = target_worker_id_str.to_i + if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1) + raise Fluent::ConfigError, "worker id #{target_worker_id} specified by directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}" + end + + ## On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0). + target_worker_id = 0 if Fluent::Engine.dry_run_mode + + e.elements.each do |elem| + unless ['source', 'match', 'filter', 'label'].include?(elem.name) + raise Fluent::ConfigError, " section cannot have <#{elem.name}> directive" + end + elem.set_target_worker_id(target_worker_id) end - elem.set_target_worker_id(target_worker_id) end conf += e end diff --git a/test/config/test_element.rb b/test/config/test_element.rb index b09899f282..74695c3602 100644 --- a/test/config/test_element.rb +++ b/test/config/test_element.rb @@ -406,11 +406,11 @@ def element(name = 'ROOT', arg = '', attrs = {}, elements = [], unused = nil) test 'set target_worker_id recursively' do e = element('label', '@mytest', {}, [ element('filter', '**'), element('match', '**', {}, [ element('store'), element('store') ]) ]) e.set_target_worker_id(1) - assert_equal 1, e.target_worker_id - assert_equal 1, e.elements[0].target_worker_id - assert_equal 1, e.elements[1].target_worker_id - assert_equal 1, e.elements[1].elements[0].target_worker_id - assert_equal 1, e.elements[1].elements[1].target_worker_id + assert_equal [1], e.target_worker_ids + assert_equal [1], e.elements[0].target_worker_ids + assert_equal [1], e.elements[1].target_worker_ids + assert_equal [1], e.elements[1].elements[0].target_worker_ids + assert_equal [1], e.elements[1].elements[1].target_worker_ids end end @@ -434,12 +434,24 @@ def element(name = 'ROOT', arg = '', attrs = {}, elements = [], unused = nil) assert e.for_this_worker? end + test 'target_worker_ids includes current worker_id' do + e = element() + e.set_target_worker_ids([0]) + assert e.for_this_worker? + end + test 'target_worker_id != current worker_id' do e = element() e.set_target_worker_id(1) assert_false e.for_this_worker? end + test 'target_worker_ids does not includes current worker_id' do + e = element() + e.set_target_worker_ids([1, 2]) + assert_false e.for_this_worker? + end + test "doesn't have target_worker_id" do e = element() assert_false e.for_this_worker? @@ -453,12 +465,24 @@ def element(name = 'ROOT', arg = '', attrs = {}, elements = [], unused = nil) assert_false e.for_another_worker? end + test 'target_worker_ids contains current worker_id' do + e = element() + e.set_target_worker_ids([0, 1]) + assert_false e.for_another_worker? + end + test 'target_worker_id != current worker_id' do e = element() e.set_target_worker_id(1) assert e.for_another_worker? end + test 'target_worker_ids does not contains current worker_id' do + e = element() + e.set_target_worker_ids([1, 2]) + assert e.for_another_worker? + end + test "doesn't have target_worker_id" do e = element() assert_false e.for_another_worker? diff --git a/test/test_root_agent.rb b/test/test_root_agent.rb index 78d6eb0a5b..73ecd601c3 100644 --- a/test/test_root_agent.rb +++ b/test/test_root_agent.rb @@ -686,6 +686,54 @@ def configure_ra(conf_str) end end + test 'raises configuration error for too big worker id on multi workers syntax' do + errmsg = "worker id 4 specified by directive is not allowed. Available worker id is between 0 and 3" + assert_raise Fluent::ConfigError.new(errmsg) do + conf = <<-EOC + + +EOC + configure_ra(conf) + end + end + + test 'raises configuration error for worker id collisions on multi workers syntax' do + errmsg = "specified worker_id<2> collisions is detected on directive. Available worker id(s): [3]" + assert_raise Fluent::ConfigError.new(errmsg) do + conf = <<-EOC + + + + +EOC + configure_ra(conf) + end + end + + test 'raises configuration error for worker id collisions on multi workers syntax when multi avaliable worker_ids are left' do + errmsg = "specified worker_id<1> collisions is detected on directive. Available worker id(s): [2, 3]" + assert_raise Fluent::ConfigError.new(errmsg) do + conf = <<-EOC + + + + +EOC + configure_ra(conf) + end + end + + test 'raises configuration error for too big worker id on invalid reversed multi workers syntax' do + errmsg = "greater first_worker_id<3> than last_worker_id<0> specified by directive is not allowed. Available multi worker assign syntax is -" + assert_raise Fluent::ConfigError.new(errmsg) do + conf = <<-EOC + + +EOC + configure_ra(conf) + end + end + test 'raises configuration error for invalid elements as a child of worker section' do errmsg = ' section cannot have directive' assert_raise Fluent::ConfigError.new(errmsg) do @@ -844,5 +892,33 @@ def configure_ra(conf_str) assert_equal 0, ra.labels.size refute ra.error_collector end + + test 'with plugins for workers syntax should match worker_id equals to 2' do + conf = <<-EOC + + + @type forward + + + @type test_filter + @id test_filter + + + @type stdout + + + +EOC + + ra = configure_ra(conf) + assert_kind_of Fluent::Plugin::ForwardInput, ra.inputs.first + assert_kind_of Fluent::Plugin::StdoutOutput, ra.outputs.first + assert_kind_of FluentTestFilter, ra.filters.first + assert ra.error_collector + end end end