Skip to content

Commit

Permalink
Add <worker n> section to set a configuration for a specific worker
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuki Ito committed Mar 23, 2017
1 parent 881d481 commit a23d1ea
Show file tree
Hide file tree
Showing 10 changed files with 546 additions and 14 deletions.
32 changes: 32 additions & 0 deletions example/worker_section.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<system>
workers 8
root_dir /path/fluentd/root
</system>

<source> # top-level sections works on all workers in parallel
@id edge_input
@type forward
@label @traffic
port 24224
</source>

<label @traffic> # this section works on all workers too
<match **>
@id backend_output
@type forward
</match>
</label>

<worker 0> # this section works only on first worker process
<source>
@id system_log_reader
@type tail
tag monitoring
path /path/now/watching/...
</source>
<match monitoring>
@id output_for_monitoring
@type elasticsearch
# ...
</match>
</worker>
7 changes: 5 additions & 2 deletions lib/fluent/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def configure(conf)

# initialize <match> and <filter> elements
conf.elements('filter', 'match').each { |e|
next if e.has_target? && e.target_worker_id != Fluent::Engine.worker_id
pattern = e.arg.empty? ? '**' : e.arg
type = e['@type']
raise ConfigError, "Missing '@type' parameter on <#{e.name}> directive" unless type
Expand Down Expand Up @@ -121,7 +122,8 @@ def lifecycle(desc: false)
end

def add_match(type, pattern, conf)
log.info :worker0, "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type
log_type = conf.target_worker_id == Fluent::Engine.worker_id ? :default : :worker0
log.info log_type, "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

output = Plugin.new_output(type)
output.context_router = @event_router
Expand All @@ -142,7 +144,8 @@ def add_match(type, pattern, conf)
end

def add_filter(type, pattern, conf)
log.info :worker0, "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type
log_type = conf.target_worker_id == Fluent::Engine.worker_id ? :default : :worker0
log.info log_type, "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

filter = Plugin.new_filter(type)
filter.context_router = @event_router
Expand Down
14 changes: 14 additions & 0 deletions lib/fluent/config/element.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ def initialize(name, arg, attrs, elements, unused = nil)

# it's global logger, not plugin logger: deprecated message should be global warning, not plugin level.
@logger = defined?($log) ? $log : nil

@target_worker_id = nil
end

attr_accessor :name, :arg, :unused, :v1_config, :corresponding_proxies, :unused_in
attr_writer :elements
attr_reader :target_worker_id

RESERVED_PARAMETERS_COMPAT = {
'@type' => 'type',
Expand Down Expand Up @@ -213,6 +216,17 @@ def self.unescape_parameter(v)
v.each_char { |c| result << LiteralParser.unescape_char(c) }
result
end

def set_target_worker_id(worker_id)
@target_worker_id = worker_id
@elements.each { |e|
e.set_target_worker_id(worker_id)
}
end

def has_target?
!!@target_worker_id
end
end
end
end
29 changes: 20 additions & 9 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def initialize
@root_agent = nil
@default_loop = nil
@engine_stopped = false
@_worker_id = nil

@log_event_router = nil
@log_emit_thread = nil
Expand Down Expand Up @@ -97,12 +98,21 @@ def run_configure(conf)
else
"section <#{e.name}> is not used in <#{parent_name}>"
end
$log.warn :worker0, message
if !e.has_target?
$log.warn :worker0, message
elsif e.target_worker_id == worker_id
$log.warn message
end
next
end
unless e.name == 'system'
unless @without_source && e.name == 'source'
$log.warn :worker0, "parameter '#{key}' in #{e.to_s.strip} is not used."
message = "parameter '#{key}' in #{e.to_s.strip} is not used."
if !e.has_target?
$log.warn :worker0, message
elsif e.target_worker_id == worker_id
$log.warn message
end
end
end
}
Expand All @@ -128,7 +138,7 @@ def configure(conf)

unmatched_tags = Fluent::Log.event_tags.select{|t| !@log_event_router.match?(t) }
unless unmatched_tags.empty?
$log.warn :worker0, "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
$log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
end
rescue ArgumentError # ArgumentError "#{label_name} label not found"
# use default event router if <label @FLUENT_LOG> is missing in configuration
Expand All @@ -139,7 +149,7 @@ def configure(conf)

unmatched_tags = Fluent::Log.event_tags.select{|t| !@log_event_router.match?(t) }
unless unmatched_tags.empty?
$log.warn :worker0, "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
$log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
end
end
end
Expand Down Expand Up @@ -200,10 +210,6 @@ def log_event_loop
end

def run
# if ENV doesn't have SERVERENGINE_WORKER_ID, it is a worker under --no-supervisor or in tests
# so it's (almost) a single worker, worker_id=0
worker_id = (ENV['SERVERENGINE_WORKER_ID'] || 0).to_i

begin
$log.info "starting fluentd worker", pid: Process.pid, ppid: Process.ppid, worker: worker_id
start
Expand Down Expand Up @@ -251,8 +257,13 @@ def push_log_event(tag, time, record)
@log_event_queue.push([tag, time, record])
end

private
def worker_id
return @_worker_id if @_worker_id
@_worker_id = (ENV['SERVERENGINE_WORKER_ID'] || 0).to_i
@_worker_id
end

private
def start
@root_agent.start
end
Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ def fluentd_worker_id
end

def configure(conf)
if conf.respond_to?(:target_worker_id) && conf.target_worker_id == Fluent::Engine.worker_id
system_config_override(workers: 1)
end
super
@_state ||= State.new(false, false, false, false, false, false, false, false, false)
@_state.configure = true
Expand Down
27 changes: 26 additions & 1 deletion lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,34 @@ def initialize(log:, system_config: SystemConfig.new)
attr_reader :labels

def configure(conf)
# initialize <worker> elements
conf.elements(name: 'worker').each do |e|
target_worker_id_str = e.arg
if target_worker_id_str.empty?
raise ConfigError, "Missing worker id on <worker> directive"
end

target_worker_id = target_worker_id_str.to_i
if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
raise ConfigError, "worker#{target_worker_id} specified by <worker> directive doesn't exist. Specify id between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
end

e.elements.each do |elem|
unless ['source', 'match', 'filter', 'label'].include?(elem.name)
raise ConfigError, "<worker> section cannot have <#{elem.name}> directive"
end
elem.set_target_worker_id(target_worker_id)
end
conf += e
end
conf.elements.delete_if{|e| e.name == 'worker'}

error_label_config = nil

# initialize <label> elements before configuring all plugins to avoid 'label not found' in input, filter and output.
label_configs = {}
conf.elements(name: 'label').each { |e|
next if e.has_target? && e.target_worker_id != Fluent::Engine.worker_id
name = e.arg
raise ConfigError, "Missing symbol argument on <label> directive" if name.empty?

Expand All @@ -90,6 +113,7 @@ def configure(conf)
log.info :worker0, "'--without-source' is applied. Ignore <source> sections"
else
conf.elements(name: 'source').each { |e|
next if e.has_target? && e.target_worker_id != Fluent::Engine.worker_id
type = e['@type']
raise ConfigError, "Missing '@type' parameter on <source> directive" unless type
add_source(type, e)
Expand Down Expand Up @@ -235,7 +259,8 @@ def suppress_interval(interval_time)
end

def add_source(type, conf)
log.info :worker0, "adding source", type: type
log_type = conf.target_worker_id == Fluent::Engine.worker_id ? :default : :worker0
log.info log_type, "adding source", type: type

input = Plugin.new_input(type)
# <source> emits events to the top-level event router (RootAgent#event_router).
Expand Down
Loading

0 comments on commit a23d1ea

Please sign in to comment.