Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add <worker n> section to set a configuration for a specific worker #1507

Merged
merged 7 commits into from
Apr 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions example/worker_section.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<system>
workers 4
root_dir /path/fluentd/root
</system>

<source> # top-level sections works on all workers in parallel
@type forward
port 24224
</source>

<match all> # this sections also works on all workers in parallel
@type stdout
<inject>
worker_id_key worker_id
</inject>
</match>

<worker 0> # this section works only on first worker process
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This configuration doesn't use the <worker> section feature to execute plugins which doesn't support multi process workers.

<source>
@type tail
format none
path /var/log/fluentd_test.log
pos_file /var/log/fluentd_test.pos
tag tail
rotate_wait 5
read_from_head true
refresh_interval 60
</source>

<match tail>
@type stdout
<inject>
worker_id_key worker_id
</inject>
</match>
</worker>
7 changes: 5 additions & 2 deletions lib/fluent/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def configure(conf)

# initialize <match> and <filter> elements
conf.elements('filter', 'match').each { |e|
next if e.for_another_worker?
pattern = e.arg.empty? ? '**' : e.arg
type = e['@type']
raise ConfigError, "Missing '@type' parameter on <#{e.name}> directive" unless type
Expand Down Expand Up @@ -121,7 +122,8 @@ def lifecycle(desc: false)
end

def add_match(type, pattern, conf)
log.info :worker0, "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type
log_type = conf.for_this_worker? ? :default : :worker0
log.info log_type, "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

output = Plugin.new_output(type)
output.context_router = @event_router
Expand All @@ -142,7 +144,8 @@ def add_match(type, pattern, conf)
end

def add_filter(type, pattern, conf)
log.info :worker0, "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type
log_type = conf.for_this_worker? ? :default : :worker0
log.info log_type, "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

filter = Plugin.new_filter(type)
filter.context_router = @event_router
Expand Down
22 changes: 22 additions & 0 deletions lib/fluent/config/element.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ def initialize(name, arg, attrs, elements, unused = nil)

# it's global logger, not plugin logger: deprecated message should be global warning, not plugin level.
@logger = defined?($log) ? $log : nil

@target_worker_id = nil
end

attr_accessor :name, :arg, :unused, :v1_config, :corresponding_proxies, :unused_in
attr_writer :elements
attr_reader :target_worker_id

RESERVED_PARAMETERS_COMPAT = {
'@type' => 'type',
Expand Down Expand Up @@ -213,6 +216,25 @@ def self.unescape_parameter(v)
v.each_char { |c| result << LiteralParser.unescape_char(c) }
result
end

def set_target_worker_id(worker_id)
@target_worker_id = worker_id
@elements.each { |e|
e.set_target_worker_id(worker_id)
}
end

def for_every_workers?
@target_worker_id == nil
end

def for_this_worker?
@target_worker_id == Fluent::Engine.worker_id
end

def for_another_worker?
@target_worker_id != nil && @target_worker_id != Fluent::Engine.worker_id
end
end
end
end
31 changes: 22 additions & 9 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def initialize
@root_agent = nil
@default_loop = nil
@engine_stopped = false
@_worker_id = nil

@log_event_router = nil
@log_emit_thread = nil
Expand Down Expand Up @@ -97,12 +98,21 @@ def run_configure(conf)
else
"section <#{e.name}> is not used in <#{parent_name}>"
end
$log.warn :worker0, message
if e.for_every_workers?
$log.warn :worker0, message
elsif e.for_this_worker?
$log.warn message
end
next
end
unless e.name == 'system'
unless @without_source && e.name == 'source'
$log.warn :worker0, "parameter '#{key}' in #{e.to_s.strip} is not used."
message = "parameter '#{key}' in #{e.to_s.strip} is not used."
if e.for_every_workers?
$log.warn :worker0, message
elsif e.for_this_worker?
$log.warn message
end
end
end
}
Expand All @@ -128,7 +138,7 @@ def configure(conf)

unmatched_tags = Fluent::Log.event_tags.select{|t| !@log_event_router.match?(t) }
unless unmatched_tags.empty?
$log.warn :worker0, "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
$log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
end
rescue ArgumentError # ArgumentError "#{label_name} label not found"
# use default event router if <label @FLUENT_LOG> is missing in configuration
Expand All @@ -139,7 +149,7 @@ def configure(conf)

unmatched_tags = Fluent::Log.event_tags.select{|t| !@log_event_router.match?(t) }
unless unmatched_tags.empty?
$log.warn :worker0, "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
$log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
end
end
end
Expand Down Expand Up @@ -200,10 +210,6 @@ def log_event_loop
end

def run
# if ENV doesn't have SERVERENGINE_WORKER_ID, it is a worker under --no-supervisor or in tests
# so it's (almost) a single worker, worker_id=0
worker_id = (ENV['SERVERENGINE_WORKER_ID'] || 0).to_i

begin
$log.info "starting fluentd worker", pid: Process.pid, ppid: Process.ppid, worker: worker_id
start
Expand Down Expand Up @@ -251,8 +257,15 @@ def push_log_event(tag, time, record)
@log_event_queue.push([tag, time, record])
end

private
def worker_id
return @_worker_id if @_worker_id
# if ENV doesn't have SERVERENGINE_WORKER_ID, it is a worker under --no-supervisor or in tests
# so it's (almost) a single worker, worker_id=0
@_worker_id = (ENV['SERVERENGINE_WORKER_ID'] || 0).to_i
@_worker_id
end

private
def start
@root_agent.start
end
Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ def fluentd_worker_id
end

def configure(conf)
if conf.respond_to?(:for_this_worker?) && conf.for_this_worker?
system_config_override(workers: 1)
end
super
@_state ||= State.new(false, false, false, false, false, false, false, false, false)
@_state.configure = true
Expand Down
27 changes: 26 additions & 1 deletion lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,34 @@ def initialize(log:, system_config: SystemConfig.new)
attr_reader :labels

def configure(conf)
# initialize <worker> elements
conf.elements(name: 'worker').each do |e|
target_worker_id_str = e.arg
if target_worker_id_str.empty?
raise ConfigError, "Missing worker id on <worker> directive"
end

target_worker_id = target_worker_id_str.to_i
if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
raise ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
end

e.elements.each do |elem|
unless ['source', 'match', 'filter', 'label'].include?(elem.name)
raise ConfigError, "<worker> section cannot have <#{elem.name}> directive"
end
elem.set_target_worker_id(target_worker_id)
end
conf += e
end
conf.elements.delete_if{|e| e.name == 'worker'}

error_label_config = nil

# initialize <label> elements before configuring all plugins to avoid 'label not found' in input, filter and output.
label_configs = {}
conf.elements(name: 'label').each { |e|
next if e.for_another_worker?
name = e.arg
raise ConfigError, "Missing symbol argument on <label> directive" if name.empty?

Expand All @@ -90,6 +113,7 @@ def configure(conf)
log.info :worker0, "'--without-source' is applied. Ignore <source> sections"
else
conf.elements(name: 'source').each { |e|
next if e.for_another_worker?
type = e['@type']
raise ConfigError, "Missing '@type' parameter on <source> directive" unless type
add_source(type, e)
Expand Down Expand Up @@ -235,7 +259,8 @@ def suppress_interval(interval_time)
end

def add_source(type, conf)
log.info :worker0, "adding source", type: type
log_type = conf.for_this_worker? ? :default : :worker0
log.info log_type, "adding source", type: type

input = Plugin.new_input(type)
# <source> emits events to the top-level event router (RootAgent#event_router).
Expand Down
Loading