From 5702b333648c6e47dffbfe7e93a75f6a79aebdd4 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 29 Oct 2019 15:24:51 +0900 Subject: [PATCH 1/7] Dump config if dry-run mode This change had been introduced since https://github.com/fluent/fluentd/pull/1580/files . but even if dry-run mode, config need to show itself Signed-off-by: Yuta Iwama --- lib/fluent/supervisor.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 1a0832f085..46472590ae 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -590,7 +590,6 @@ def create_socket_manager def dry_run_cmd $log.info "starting fluentd-#{Fluent::VERSION} as dry run mode", ruby: RUBY_VERSION - @system_config.suppress_config_dump = true dry_run exit 0 rescue => e From 6f0ae3b9dce71e1c3529d29e5756030af4a51ada Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 29 Oct 2019 17:44:22 +0900 Subject: [PATCH 2/7] Add supervisor_mode to Engine class and if supervisor_mode, worker_id returns -1 because we need to identify worker0 and supervisor Signed-off-by: Yuta Iwama --- lib/fluent/agent.rb | 4 +++- lib/fluent/engine.rb | 20 ++++++++++++++++---- lib/fluent/root_agent.rb | 16 +++++++--------- lib/fluent/supervisor.rb | 2 +- 4 files changed, 27 insertions(+), 15 deletions(-) diff --git a/lib/fluent/agent.rb b/lib/fluent/agent.rb index b75a2570c9..7e662704b1 100644 --- a/lib/fluent/agent.rb +++ b/lib/fluent/agent.rb @@ -62,7 +62,9 @@ def configure(conf) # initialize and elements conf.elements('filter', 'match').each { |e| - next if e.for_another_worker? + if !Fluent::Engine.supervisor_mode && e.for_another_worker? + next + end pattern = e.arg.empty? ? '**' : e.arg type = e['@type'] raise ConfigError, "Missing '@type' parameter on <#{e.name}> directive" unless type diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index e440ce461a..be106a9f33 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -42,15 +42,17 @@ def initialize @system_config = SystemConfig.new @dry_run_mode = false + @supervisor_mode = false end MAINLOOP_SLEEP_INTERVAL = 0.3 - attr_reader :root_agent, :system_config + attr_reader :root_agent, :system_config, :supervisor_mode attr_accessor :dry_run_mode - def init(system_config) + def init(system_config, supervisor_mode: false) @system_config = system_config + @supervisor_mode = supervisor_mode @suppress_config_dump = system_config.suppress_config_dump unless system_config.suppress_config_dump.nil? @without_source = system_config.without_source unless system_config.without_source.nil? @@ -85,17 +87,23 @@ def run_configure(conf) else "section <#{e.name}> is not used in <#{parent_name}>" end - if e.for_every_workers? + + if @dry_run_mode && @supervisor_mode + $log.warn :supervisor, message + elsif e.for_every_workers? $log.warn :worker0, message elsif e.for_this_worker? $log.warn message end next end + unless e.name == 'system' unless @without_source && e.name == 'source' message = "parameter '#{key}' in #{e.to_s.strip} is not used." - if e.for_every_workers? + if @dry_run_mode && @supervisor_mode + $log.warn :supervisor, message + elsif e.for_every_workers? $log.warn :worker0, message elsif e.for_this_worker? $log.warn message @@ -182,6 +190,10 @@ def push_log_event(tag, time, record) end def worker_id + if @supervisor_mode + return -1 + end + return @_worker_id if @_worker_id # if ENV doesn't have SERVERENGINE_WORKER_ID, it is a worker under --no-supervisor or in tests # so it's (almost) a single worker, worker_id=0 diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 78fb8b81b8..fcceb039e4 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -89,7 +89,7 @@ def configure(conf) raise Fluent::ConfigError, "worker id #{target_worker_id} specified by directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}" end available_worker_ids.delete(target_worker_id) if available_worker_ids.include?(target_worker_id) - if used_worker_ids.include?(target_worker_id) && !Fluent::Engine.dry_run_mode + if used_worker_ids.include?(target_worker_id) raise Fluent::ConfigError, "specified worker_id<#{worker_id}> collisions is detected on directive. Available worker id(s): #{available_worker_ids}" end used_worker_ids << target_worker_id @@ -100,9 +100,6 @@ def configure(conf) end end - # On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0). - target_worker_ids = [0] if Fluent::Engine.dry_run_mode - unless target_worker_ids.empty? e.set_target_worker_ids(target_worker_ids.uniq) end @@ -113,9 +110,6 @@ def configure(conf) raise Fluent::ConfigError, "worker id #{target_worker_id} specified by directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}" end - ## On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0). - target_worker_id = 0 if Fluent::Engine.dry_run_mode - e.elements.each do |elem| unless ['source', 'match', 'filter', 'label'].include?(elem.name) raise Fluent::ConfigError, " section cannot have <#{elem.name}> directive" @@ -132,7 +126,9 @@ def configure(conf) # initialize @@ -863,6 +865,7 @@ module Fluent::Plugin class FakeInput < Input Fluent::Plugin.register_input('fake', self) config_param :secret, :string, secret: true + def multi_workers_ready?; true; end end end EOC