From a23d1ea5ed63f7b1d455a02212a64977e537ccad Mon Sep 17 00:00:00 2001 From: Yuki Ito Date: Fri, 17 Mar 2017 16:36:25 +0900 Subject: [PATCH] Add section to set a configuration for a specific worker --- example/worker_section.conf | 32 ++++++ lib/fluent/agent.rb | 7 +- lib/fluent/config/element.rb | 14 +++ lib/fluent/engine.rb | 29 +++-- lib/fluent/plugin/base.rb | 3 + lib/fluent/root_agent.rb | 27 ++++- test/command/test_fluentd.rb | 204 ++++++++++++++++++++++++++++++++++- test/config/test_element.rb | 25 +++++ test/test_plugin_classes.rb | 15 +++ test/test_root_agent.rb | 204 +++++++++++++++++++++++++++++++++++ 10 files changed, 546 insertions(+), 14 deletions(-) create mode 100644 example/worker_section.conf diff --git a/example/worker_section.conf b/example/worker_section.conf new file mode 100644 index 0000000000..f93962e802 --- /dev/null +++ b/example/worker_section.conf @@ -0,0 +1,32 @@ + + workers 8 + root_dir /path/fluentd/root + + + # top-level sections works on all workers in parallel + @id edge_input + @type forward + @label @traffic + port 24224 + + + + + # this section works only on first worker process + + @id system_log_reader + @type tail + tag monitoring + path /path/now/watching/... + + + @id output_for_monitoring + @type elasticsearch + # ... + + diff --git a/lib/fluent/agent.rb b/lib/fluent/agent.rb index 42f25b2d41..345316cb63 100644 --- a/lib/fluent/agent.rb +++ b/lib/fluent/agent.rb @@ -62,6 +62,7 @@ def configure(conf) # initialize and elements conf.elements('filter', 'match').each { |e| + next if e.has_target? && e.target_worker_id != Fluent::Engine.worker_id pattern = e.arg.empty? ? '**' : e.arg type = e['@type'] raise ConfigError, "Missing '@type' parameter on <#{e.name}> directive" unless type @@ -121,7 +122,8 @@ def lifecycle(desc: false) end def add_match(type, pattern, conf) - log.info :worker0, "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type + log_type = conf.target_worker_id == Fluent::Engine.worker_id ? :default : :worker0 + log.info log_type, "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type output = Plugin.new_output(type) output.context_router = @event_router @@ -142,7 +144,8 @@ def add_match(type, pattern, conf) end def add_filter(type, pattern, conf) - log.info :worker0, "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type + log_type = conf.target_worker_id == Fluent::Engine.worker_id ? :default : :worker0 + log.info log_type, "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type filter = Plugin.new_filter(type) filter.context_router = @event_router diff --git a/lib/fluent/config/element.rb b/lib/fluent/config/element.rb index 0641eef7eb..ffc34b826c 100644 --- a/lib/fluent/config/element.rb +++ b/lib/fluent/config/element.rb @@ -35,10 +35,13 @@ def initialize(name, arg, attrs, elements, unused = nil) # it's global logger, not plugin logger: deprecated message should be global warning, not plugin level. @logger = defined?($log) ? $log : nil + + @target_worker_id = nil end attr_accessor :name, :arg, :unused, :v1_config, :corresponding_proxies, :unused_in attr_writer :elements + attr_reader :target_worker_id RESERVED_PARAMETERS_COMPAT = { '@type' => 'type', @@ -213,6 +216,17 @@ def self.unescape_parameter(v) v.each_char { |c| result << LiteralParser.unescape_char(c) } result end + + def set_target_worker_id(worker_id) + @target_worker_id = worker_id + @elements.each { |e| + e.set_target_worker_id(worker_id) + } + end + + def has_target? + !!@target_worker_id + end end end end diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index ea05954ab4..590d45edd2 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -31,6 +31,7 @@ def initialize @root_agent = nil @default_loop = nil @engine_stopped = false + @_worker_id = nil @log_event_router = nil @log_emit_thread = nil @@ -97,12 +98,21 @@ def run_configure(conf) else "section <#{e.name}> is not used in <#{parent_name}>" end - $log.warn :worker0, message + if !e.has_target? + $log.warn :worker0, message + elsif e.target_worker_id == worker_id + $log.warn message + end next end unless e.name == 'system' unless @without_source && e.name == 'source' - $log.warn :worker0, "parameter '#{key}' in #{e.to_s.strip} is not used." + message = "parameter '#{key}' in #{e.to_s.strip} is not used." + if !e.has_target? + $log.warn :worker0, message + elsif e.target_worker_id == worker_id + $log.warn message + end end end } @@ -128,7 +138,7 @@ def configure(conf) unmatched_tags = Fluent::Log.event_tags.select{|t| !@log_event_router.match?(t) } unless unmatched_tags.empty? - $log.warn :worker0, "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags + $log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags end rescue ArgumentError # ArgumentError "#{label_name} label not found" # use default event router if