diff --git a/lib/fluent/agent.rb b/lib/fluent/agent.rb index b75a2570c9..7e662704b1 100644 --- a/lib/fluent/agent.rb +++ b/lib/fluent/agent.rb @@ -62,7 +62,9 @@ def configure(conf) # initialize and elements conf.elements('filter', 'match').each { |e| - next if e.for_another_worker? + if !Fluent::Engine.supervisor_mode && e.for_another_worker? + next + end pattern = e.arg.empty? ? '**' : e.arg type = e['@type'] raise ConfigError, "Missing '@type' parameter on <#{e.name}> directive" unless type diff --git a/lib/fluent/command/fluentd.rb b/lib/fluent/command/fluentd.rb index dc5a075b63..924027f038 100644 --- a/lib/fluent/command/fluentd.rb +++ b/lib/fluent/command/fluentd.rb @@ -323,7 +323,7 @@ supervisor = Fluent::Supervisor.new(opts) supervisor.configure(supervisor: true) - supervisor.run_supervisor + supervisor.run_supervisor(dry_run: opts[:dry_run]) else if opts[:standalone_worker] && opts[:workers] && opts[:workers] > 1 puts "Error: multi workers is not supported with --no-supervisor" diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index e440ce461a..368f5e1cf7 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -41,16 +41,16 @@ def initialize @fluent_log_event_router = nil @system_config = SystemConfig.new - @dry_run_mode = false + @supervisor_mode = false end MAINLOOP_SLEEP_INTERVAL = 0.3 - attr_reader :root_agent, :system_config - attr_accessor :dry_run_mode + attr_reader :root_agent, :system_config, :supervisor_mode - def init(system_config) + def init(system_config, supervisor_mode: false) @system_config = system_config + @supervisor_mode = supervisor_mode @suppress_config_dump = system_config.suppress_config_dump unless system_config.suppress_config_dump.nil? @without_source = system_config.without_source unless system_config.without_source.nil? @@ -75,34 +75,29 @@ def parse_config(io, fname, basepath = Dir.pwd, v1_config = false) end end - def run_configure(conf) + def run_configure(conf, dry_run: false) configure(conf) - conf.check_not_fetched { |key, e| + conf.check_not_fetched do |key, e| parent_name, plugin_name = e.unused_in - if parent_name - message = if plugin_name - "section <#{e.name}> is not used in <#{parent_name}> of #{plugin_name} plugin" - else - "section <#{e.name}> is not used in <#{parent_name}>" - end - if e.for_every_workers? - $log.warn :worker0, message - elsif e.for_this_worker? - $log.warn message - end - next + message = if parent_name && plugin_name + "section <#{e.name}> is not used in <#{parent_name}> of #{plugin_name} plugin" + elsif parent_name + "section <#{e.name}> is not used in <#{parent_name}>" + elsif e.name != 'system' && !(@without_source && e.name == 'source') + "parameter '#{key}' in #{e.to_s.strip} is not used." + else + nil + end + next if message.nil? + + if dry_run && @supervisor_mode + $log.warn :supervisor, message + elsif e.for_every_workers? + $log.warn :worker0, message + elsif e.for_this_worker? + $log.warn message end - unless e.name == 'system' - unless @without_source && e.name == 'source' - message = "parameter '#{key}' in #{e.to_s.strip} is not used." - if e.for_every_workers? - $log.warn :worker0, message - elsif e.for_this_worker? - $log.warn message - end - end - end - } + end end def configure(conf) @@ -182,6 +177,10 @@ def push_log_event(tag, time, record) end def worker_id + if @supervisor_mode + return -1 + end + return @_worker_id if @_worker_id # if ENV doesn't have SERVERENGINE_WORKER_ID, it is a worker under --no-supervisor or in tests # so it's (almost) a single worker, worker_id=0 diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb index a7474db007..b5c16a69e6 100644 --- a/lib/fluent/plugin/base.rb +++ b/lib/fluent/plugin/base.rb @@ -52,7 +52,7 @@ def fluentd_worker_id end def configure(conf) - if conf.respond_to?(:for_this_worker?) && conf.for_this_worker? + if Fluent::Engine.supervisor_mode || (conf.respond_to?(:for_this_worker?) && conf.for_this_worker?) workers = if conf.target_worker_ids && !conf.target_worker_ids.empty? conf.target_worker_ids.size else diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 78fb8b81b8..fcceb039e4 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -89,7 +89,7 @@ def configure(conf) raise Fluent::ConfigError, "worker id #{target_worker_id} specified by directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}" end available_worker_ids.delete(target_worker_id) if available_worker_ids.include?(target_worker_id) - if used_worker_ids.include?(target_worker_id) && !Fluent::Engine.dry_run_mode + if used_worker_ids.include?(target_worker_id) raise Fluent::ConfigError, "specified worker_id<#{worker_id}> collisions is detected on directive. Available worker id(s): #{available_worker_ids}" end used_worker_ids << target_worker_id @@ -100,9 +100,6 @@ def configure(conf) end end - # On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0). - target_worker_ids = [0] if Fluent::Engine.dry_run_mode - unless target_worker_ids.empty? e.set_target_worker_ids(target_worker_ids.uniq) end @@ -113,9 +110,6 @@ def configure(conf) raise Fluent::ConfigError, "worker id #{target_worker_id} specified by directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}" end - ## On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0). - target_worker_id = 0 if Fluent::Engine.dry_run_mode - e.elements.each do |elem| unless ['source', 'match', 'filter', 'label'].include?(elem.name) raise Fluent::ConfigError, " section cannot have <#{elem.name}> directive" @@ -132,7 +126,9 @@ def configure(conf) # initialize @@ -863,6 +865,7 @@ module Fluent::Plugin class FakeInput < Input Fluent::Plugin.register_input('fake', self) config_param :secret, :string, secret: true + def multi_workers_ready?; true; end end end EOC