Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lifecycle method called correctly once #1242

Merged
merged 4 commits into from
Sep 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions lib/fluent/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,12 @@ def lifecycle_control_list
lifecycle_control_list[:input] << i
end
end
recursive_output_traverse = ->(o) {
outputs.each do |o|
if o.has_router?
lifecycle_control_list[:output_with_router] << o
else
lifecycle_control_list[:output] << o
end

if o.respond_to?(:outputs)
o.outputs.each do |store|
recursive_output_traverse.call(store)
end
end
}
outputs.each do |o|
recursive_output_traverse.call(o)
end
filters.each do |f|
lifecycle_control_list[:filter] << f
Expand Down Expand Up @@ -135,9 +126,14 @@ def add_match(type, pattern, conf)
output.router = @event_router if output.respond_to?(:router=)
output.configure(conf)
@outputs << output
if output.respond_to?(:outputs) && (output.respond_to?(:multi_output?) && output.multi_output? || output.is_a?(Fluent::MultiOutput))
if output.respond_to?(:outputs) && output.respond_to?(:multi_output?) && output.multi_output?
# TODO: ruby 2.3 or later: replace `output.respond_to?(:multi_output?) && output.multi_output?` with output&.multi_output?
@outputs.push(*output.outputs)
outputs = if output.respond_to?(:static_outputs)
output.static_outputs
else
output.outputs
end
@outputs.push(*outputs)
end
@event_router.add_rule(pattern, output)

Expand Down
66 changes: 63 additions & 3 deletions lib/fluent/plugin/multi_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class MultiOutput < Base
config_param :@type, :string, default: nil
end

attr_reader :outputs
attr_reader :outputs, :outputs_statically_created

def process(tag, es)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
Expand All @@ -41,8 +41,7 @@ def process(tag, es)
def initialize
super
@outputs = []

@compat = false
@outputs_statically_created = false

@counters_monitor = Monitor.new
# TODO: well organized counters
Expand Down Expand Up @@ -78,11 +77,72 @@ def configure(conf)
end
end

def static_outputs
@outputs_statically_created = true
@outputs
end

# Child plugin's lifecycles are controlled by agent automatically.
# It calls `outputs` to traverse plugins, and invoke start/stop/*shutdown/close/terminate on these directly.
# * `start` of this plugin will be called after child plugins
# * `stop`, `*shutdown`, `close` and `terminate` of this plugin will be called before child plugins

# But when MultiOutput plugins are created dynamically (by forest plugin or others), agent cannot find
# sub-plugins. So child plugins' lifecycles MUST be controlled by MultiOutput plugin itself.
# TODO: this hack will be removed at v2.
def call_lifecycle_method(method_name, checker_name)
return if @outputs_statically_created
@outputs.each do |o|
begin
log.debug "calling #{method_name} on output plugin dynamically created", type: Fluent::Plugin.lookup_type_from_class(o.class), plugin_id: o.plugin_id
o.send(method_name) unless o.send(checker_name)
rescue Exception => e
log.warn "unexpected error while calling #{method_name} on output plugin dynamically created", plugin: o.class, plugin_id: o.plugin_id, error: e
log.warn_backtrace
end
end
end

def start
super
call_lifecycle_method(:start, :started?)
end

def after_start
super
call_lifecycle_method(:after_start, :after_started?)
end

def stop
super
call_lifecycle_method(:stop, :stopped?)
end

def before_shutdown
super
call_lifecycle_method(:before_shutdown, :before_shutdown?)
end

def shutdown
super
call_lifecycle_method(:shutdown, :shutdown?)
end

def after_shutdown
super
call_lifecycle_method(:after_shutdown, :after_shutdown?)
end

def close
super
call_lifecycle_method(:close, :closed?)
end

def terminate
super
call_lifecycle_method(:terminate, :terminated?)
end

def emit_sync(tag, es)
@counters_monitor.synchronize{ @emit_count += 1 }
begin
Expand Down
4 changes: 3 additions & 1 deletion lib/fluent/test/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ module Fluent
module Test
class DummyLogDevice
attr_reader :logs
attr_accessor :flush_logs

def initialize
@logs = []
@flush_logs = true
end

def reset
@logs = []
@logs = [] if @flush_logs
end

def tty?
Expand Down
62 changes: 62 additions & 0 deletions test/test_plugin_classes.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require_relative 'helper'
require 'fluent/plugin/input'
require 'fluent/plugin/output'
require 'fluent/plugin/bare_output'
require 'fluent/plugin/filter'

module FluentTest
Expand Down Expand Up @@ -48,6 +49,67 @@ def process(tag, es)
end
end

class FluentTestDynamicOutput < ::Fluent::Plugin::BareOutput
::Fluent::Plugin.register_output('test_dynamic_out', self)

attr_reader :child
attr_reader :started

def start
super
@started = true
@child = Fluent::Plugin.new_output('copy')
conf = config_element('DYNAMIC', '', {}, [
config_element('store', '', {'@type' => 'test_out', '@id' => 'dyn_out1'}),
config_element('store', '', {'@type' => 'test_out', '@id' => 'dyn_out2'}),
])
@child.configure(conf)
@child.start
end

def after_start
super
@child.after_start
end

def stop
super
@child.stop
end

def before_shutdown
super
@child.before_shutdown
end

def shutdown
@started = false
super
@child.shutdown
end

def after_shutdown
super
@child.after_shutdown
end

def close
super
@child.close
end

def terminate
super
@child.terminate
end

def process(tag, es)
es.each do |time, record|
@events[tag] << record
end
end
end

class FluentTestErrorOutput < ::Fluent::Plugin::Output
::Fluent::Plugin.register_output('test_out_error', self)

Expand Down
90 changes: 90 additions & 0 deletions test/test_root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -351,5 +351,95 @@ def configure_ra(conf_str)
assert_equal [true], label_filters.map{|i| i.terminated? }
assert_equal [true, true, true], label_outputs.map{|i| i.terminated? }
end

test 'plugin #shutdown is not called twice' do
assert_equal 1, @ra.inputs.size
assert_equal 0, @ra.filters.size
assert_equal 0, @ra.outputs.size
assert_equal 1, @ra.labels.size
assert_equal '@testing', @ra.labels.keys.first
assert_equal 1, @ra.labels.values.first.filters.size
assert_equal 3, @ra.labels.values.first.outputs.size

@ra.start

old_level = @ra.log.level
begin
@ra.log.instance_variable_get(:@logger).level = Fluent::Log::LEVEL_INFO - 1
assert_equal Fluent::Log::LEVEL_INFO, @ra.log.level

@ra.log.out.flush_logs = false

@ra.shutdown

test_out1_shutdown_logs = @ra.log.out.logs.select{|line| line =~ /shutting down output plugin type=:test_out plugin_id="test_out1"/ }
assert_equal 1, test_out1_shutdown_logs.size
ensure
@ra.log.out.flush_logs = true
@ra.log.out.reset
@ra.log.level = old_level
end
end
end

sub_test_case 'configured with MultiOutput plugin which creates plugin instances dynamically' do
setup do
@ra = RootAgent.new(log: $log)
stub(Engine).root_agent { @ra }
@ra.configure(Config.parse(<<-EOC, "(test)", "(test_dir)", true))
<source>
@type test_in
@id test_in
@label @testing
</source>
<label @testing>
<match **>
@type test_dynamic_out
@id test_dyn
</match>
</label>
EOC
@ra
end

test 'plugin status with multi output' do
assert_equal 1, @ra.inputs.size
assert_equal 0, @ra.filters.size
assert_equal 0, @ra.outputs.size
assert_equal 1, @ra.labels.size
assert_equal '@testing', @ra.labels.keys.first
assert_equal 0, @ra.labels.values.first.filters.size
assert_equal 1, @ra.labels.values.first.outputs.size

dyn_out = @ra.labels.values.first.outputs.first
assert_nil dyn_out.child

@ra.start

assert_equal 1, @ra.labels.values.first.outputs.size

assert dyn_out.child
assert_false dyn_out.child.outputs_statically_created
assert_equal 2, dyn_out.child.outputs.size

assert_equal true, dyn_out.child.outputs[0].started?
assert_equal true, dyn_out.child.outputs[1].started?
assert_equal true, dyn_out.child.outputs[0].after_started?
assert_equal true, dyn_out.child.outputs[1].after_started?

@ra.shutdown

assert_equal 1, @ra.labels.values.first.outputs.size

assert_false dyn_out.child.outputs_statically_created
assert_equal 2, dyn_out.child.outputs.size

assert_equal [true, true], dyn_out.child.outputs.map{|i| i.stopped? }
assert_equal [true, true], dyn_out.child.outputs.map{|i| i.before_shutdown? }
assert_equal [true, true], dyn_out.child.outputs.map{|i| i.shutdown? }
assert_equal [true, true], dyn_out.child.outputs.map{|i| i.after_shutdown? }
assert_equal [true, true], dyn_out.child.outputs.map{|i| i.closed? }
assert_equal [true, true], dyn_out.child.outputs.map{|i| i.terminated? }
end
end
end