Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add <worker n> section to set a configuration for a specific worker #1507

Merged
merged 7 commits into from
Apr 13, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions example/worker_section.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<system>
workers 8
root_dir /path/fluentd/root
</system>

<source> # top-level sections works on all workers in parallel
@type forward
port 24224
</source>

<match all> # this section works on all workers too
@type stdout
<inject>
worker_id_key worker_id
</inject>
</match>

<worker 0> # this section works only on first worker process
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This configuration doesn't use the <worker> section feature to execute plugins which doesn't support multi process workers.

<match worker0>
@type stdout
<inject>
worker_id_key worker_id
</inject>
</match>
</worker>
7 changes: 5 additions & 2 deletions lib/fluent/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def configure(conf)

# initialize <match> and <filter> elements
conf.elements('filter', 'match').each { |e|
next if e.has_target? && e.target_worker_id != Fluent::Engine.worker_id
pattern = e.arg.empty? ? '**' : e.arg
type = e['@type']
raise ConfigError, "Missing '@type' parameter on <#{e.name}> directive" unless type
Expand Down Expand Up @@ -121,7 +122,8 @@ def lifecycle(desc: false)
end

def add_match(type, pattern, conf)
log.info :worker0, "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type
log_type = conf.target_worker_id == Fluent::Engine.worker_id ? :default : :worker0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extract to method is better too?

log.info log_type, "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

output = Plugin.new_output(type)
output.context_router = @event_router
Expand All @@ -142,7 +144,8 @@ def add_match(type, pattern, conf)
end

def add_filter(type, pattern, conf)
log.info :worker0, "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type
log_type = conf.target_worker_id == Fluent::Engine.worker_id ? :default : :worker0
log.info log_type, "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

filter = Plugin.new_filter(type)
filter.context_router = @event_router
Expand Down
14 changes: 14 additions & 0 deletions lib/fluent/config/element.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ def initialize(name, arg, attrs, elements, unused = nil)

# it's global logger, not plugin logger: deprecated message should be global warning, not plugin level.
@logger = defined?($log) ? $log : nil

@target_worker_id = nil
end

attr_accessor :name, :arg, :unused, :v1_config, :corresponding_proxies, :unused_in
attr_writer :elements
attr_reader :target_worker_id

RESERVED_PARAMETERS_COMPAT = {
'@type' => 'type',
Expand Down Expand Up @@ -213,6 +216,17 @@ def self.unescape_parameter(v)
v.each_char { |c| result << LiteralParser.unescape_char(c) }
result
end

def set_target_worker_id(worker_id)
@target_worker_id = worker_id
@elements.each { |e|
e.set_target_worker_id(worker_id)
}
end

def has_target?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#has_target doesn't show the correct meaning of this method. What's the target?
#has_target_worker_id looks better.

!!@target_worker_id
end
end
end
end
29 changes: 20 additions & 9 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def initialize
@root_agent = nil
@default_loop = nil
@engine_stopped = false
@_worker_id = nil

@log_event_router = nil
@log_emit_thread = nil
Expand Down Expand Up @@ -97,12 +98,21 @@ def run_configure(conf)
else
"section <#{e.name}> is not used in <#{parent_name}>"
end
$log.warn :worker0, message
if !e.has_target?
$log.warn :worker0, message
elsif e.target_worker_id == worker_id
$log.warn message
end
next
end
unless e.name == 'system'
unless @without_source && e.name == 'source'
$log.warn :worker0, "parameter '#{key}' in #{e.to_s.strip} is not used."
message = "parameter '#{key}' in #{e.to_s.strip} is not used."
if !e.has_target?
$log.warn :worker0, message
elsif e.target_worker_id == worker_id
$log.warn message
end
end
end
}
Expand All @@ -128,7 +138,7 @@ def configure(conf)

unmatched_tags = Fluent::Log.event_tags.select{|t| !@log_event_router.match?(t) }
unless unmatched_tags.empty?
$log.warn :worker0, "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
$log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
end
rescue ArgumentError # ArgumentError "#{label_name} label not found"
# use default event router if <label @FLUENT_LOG> is missing in configuration
Expand All @@ -139,7 +149,7 @@ def configure(conf)

unmatched_tags = Fluent::Log.event_tags.select{|t| !@log_event_router.match?(t) }
unless unmatched_tags.empty?
$log.warn :worker0, "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
$log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
end
end
end
Expand Down Expand Up @@ -200,10 +210,6 @@ def log_event_loop
end

def run
# if ENV doesn't have SERVERENGINE_WORKER_ID, it is a worker under --no-supervisor or in tests
# so it's (almost) a single worker, worker_id=0
worker_id = (ENV['SERVERENGINE_WORKER_ID'] || 0).to_i

begin
$log.info "starting fluentd worker", pid: Process.pid, ppid: Process.ppid, worker: worker_id
start
Expand Down Expand Up @@ -251,8 +257,13 @@ def push_log_event(tag, time, record)
@log_event_queue.push([tag, time, record])
end

private
def worker_id
return @_worker_id if @_worker_id
@_worker_id = (ENV['SERVERENGINE_WORKER_ID'] || 0).to_i
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above deleted comment should be here.

@_worker_id
end

private
def start
@root_agent.start
end
Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ def fluentd_worker_id
end

def configure(conf)
if conf.respond_to?(:target_worker_id) && conf.target_worker_id == Fluent::Engine.worker_id
system_config_override(workers: 1)
end
super
@_state ||= State.new(false, false, false, false, false, false, false, false, false)
@_state.configure = true
Expand Down
27 changes: 26 additions & 1 deletion lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,34 @@ def initialize(log:, system_config: SystemConfig.new)
attr_reader :labels

def configure(conf)
# initialize <worker> elements
conf.elements(name: 'worker').each do |e|
target_worker_id_str = e.arg
if target_worker_id_str.empty?
raise ConfigError, "Missing worker id on <worker> directive"
end

target_worker_id = target_worker_id_str.to_i
if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
raise ConfigError, "worker#{target_worker_id} specified by <worker> directive doesn't exist. Specify id between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If specify -1, log message is worker-1. This is unclear, worker - 1 or worker -1.

"worker id #{target_worker_id} specified in directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"

end

e.elements.each do |elem|
unless ['source', 'match', 'filter', 'label'].include?(elem.name)
raise ConfigError, "<worker> section cannot have <#{elem.name}> directive"
end
elem.set_target_worker_id(target_worker_id)
end
conf += e
end
conf.elements.delete_if{|e| e.name == 'worker'}

error_label_config = nil

# initialize <label> elements before configuring all plugins to avoid 'label not found' in input, filter and output.
label_configs = {}
conf.elements(name: 'label').each { |e|
next if e.has_target? && e.target_worker_id != Fluent::Engine.worker_id
name = e.arg
raise ConfigError, "Missing symbol argument on <label> directive" if name.empty?

Expand All @@ -90,6 +113,7 @@ def configure(conf)
log.info :worker0, "'--without-source' is applied. Ignore <source> sections"
else
conf.elements(name: 'source').each { |e|
next if e.has_target? && e.target_worker_id != Fluent::Engine.worker_id
type = e['@type']
raise ConfigError, "Missing '@type' parameter on <source> directive" unless type
add_source(type, e)
Expand Down Expand Up @@ -235,7 +259,8 @@ def suppress_interval(interval_time)
end

def add_source(type, conf)
log.info :worker0, "adding source", type: type
log_type = conf.target_worker_id == Fluent::Engine.worker_id ? :default : :worker0
log.info log_type, "adding source", type: type

input = Plugin.new_input(type)
# <source> emits events to the top-level event router (RootAgent#event_router).
Expand Down
Loading