diff --git a/example/worker_section.conf b/example/worker_section.conf
new file mode 100644
index 0000000000..f93962e802
--- /dev/null
+++ b/example/worker_section.conf
@@ -0,0 +1,32 @@
+
+ workers 8
+ root_dir /path/fluentd/root
+
+
+
+
+
+
+ # this section works only on first worker process
+
+
+ @id output_for_monitoring
+ @type elasticsearch
+ # ...
+
+
diff --git a/lib/fluent/agent.rb b/lib/fluent/agent.rb
index 42f25b2d41..345316cb63 100644
--- a/lib/fluent/agent.rb
+++ b/lib/fluent/agent.rb
@@ -62,6 +62,7 @@ def configure(conf)
# initialize and elements
conf.elements('filter', 'match').each { |e|
+ next if e.has_target? && e.target_worker_id != Fluent::Engine.worker_id
pattern = e.arg.empty? ? '**' : e.arg
type = e['@type']
raise ConfigError, "Missing '@type' parameter on <#{e.name}> directive" unless type
@@ -121,7 +122,8 @@ def lifecycle(desc: false)
end
def add_match(type, pattern, conf)
- log.info :worker0, "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type
+ log_type = conf.target_worker_id == Fluent::Engine.worker_id ? :default : :worker0
+ log.info log_type, "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type
output = Plugin.new_output(type)
output.context_router = @event_router
@@ -142,7 +144,8 @@ def add_match(type, pattern, conf)
end
def add_filter(type, pattern, conf)
- log.info :worker0, "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type
+ log_type = conf.target_worker_id == Fluent::Engine.worker_id ? :default : :worker0
+ log.info log_type, "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type
filter = Plugin.new_filter(type)
filter.context_router = @event_router
diff --git a/lib/fluent/config/element.rb b/lib/fluent/config/element.rb
index 0641eef7eb..ffc34b826c 100644
--- a/lib/fluent/config/element.rb
+++ b/lib/fluent/config/element.rb
@@ -35,10 +35,13 @@ def initialize(name, arg, attrs, elements, unused = nil)
# it's global logger, not plugin logger: deprecated message should be global warning, not plugin level.
@logger = defined?($log) ? $log : nil
+
+ @target_worker_id = nil
end
attr_accessor :name, :arg, :unused, :v1_config, :corresponding_proxies, :unused_in
attr_writer :elements
+ attr_reader :target_worker_id
RESERVED_PARAMETERS_COMPAT = {
'@type' => 'type',
@@ -213,6 +216,17 @@ def self.unescape_parameter(v)
v.each_char { |c| result << LiteralParser.unescape_char(c) }
result
end
+
+ def set_target_worker_id(worker_id)
+ @target_worker_id = worker_id
+ @elements.each { |e|
+ e.set_target_worker_id(worker_id)
+ }
+ end
+
+ def has_target?
+ !!@target_worker_id
+ end
end
end
end
diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb
index ea05954ab4..590d45edd2 100644
--- a/lib/fluent/engine.rb
+++ b/lib/fluent/engine.rb
@@ -31,6 +31,7 @@ def initialize
@root_agent = nil
@default_loop = nil
@engine_stopped = false
+ @_worker_id = nil
@log_event_router = nil
@log_emit_thread = nil
@@ -97,12 +98,21 @@ def run_configure(conf)
else
"section <#{e.name}> is not used in <#{parent_name}>"
end
- $log.warn :worker0, message
+ if !e.has_target?
+ $log.warn :worker0, message
+ elsif e.target_worker_id == worker_id
+ $log.warn message
+ end
next
end
unless e.name == 'system'
unless @without_source && e.name == 'source'
- $log.warn :worker0, "parameter '#{key}' in #{e.to_s.strip} is not used."
+ message = "parameter '#{key}' in #{e.to_s.strip} is not used."
+ if !e.has_target?
+ $log.warn :worker0, message
+ elsif e.target_worker_id == worker_id
+ $log.warn message
+ end
end
end
}
@@ -128,7 +138,7 @@ def configure(conf)
unmatched_tags = Fluent::Log.event_tags.select{|t| !@log_event_router.match?(t) }
unless unmatched_tags.empty?
- $log.warn :worker0, "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
+ $log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
end
rescue ArgumentError # ArgumentError "#{label_name} label not found"
# use default event router if