Skip to content

Commit

Permalink
Add test for reload config
Browse files Browse the repository at this point in the history
Signed-off-by: Yuta Iwama <[email protected]>
  • Loading branch information
ganmacs committed Dec 18, 2019
1 parent c7b4f99 commit 4825306
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 15 deletions.
22 changes: 17 additions & 5 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
require 'fluent/system_config'
require 'fluent/plugin'
require 'fluent/fluent_log_event_router'
require 'fluent/static_config_analysis'

module Fluent
class EngineClass
Expand Down Expand Up @@ -166,17 +167,28 @@ def run
def reload_config(conf, supervisor: false)
# configure first to reduce down time while restarting
new_agent = RootAgent.new(log: log, system_config: @system_config)
if (plugin = new_agent.find_unreloadable_plugin)
raise Fluent::ConfigError, "Unreloadable plugin: #{plugin.class}"
ret = Fluent::StaticConfigAnalysis.call(conf, workers: system_config.workers)

ret.all_plugins.each do |plugin|
if plugin.respond_to?(:reloadable_plugin?) && !plugin.reloadable_plugin?
raise Fluent::ConfigError, "Unreloadable plugin: #{plugin.class}"
end
end

# Assign @root_agent to new root_agent
# for https://github.com/fluent/fluentd/blob/fcef949ce40472547fde295ddd2cfe297e1eddd6/lib/fluent/plugin_helper/event_emitter.rb#L50
old_agent, @root_agent = @root_agent, new_agent
begin
@root_agent.configure(conf)
rescue
@root_agent = old_agent
raise
end

new_agent.configure(conf)
unless @suppress_config_dump
$log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}"
end

old_agent, @root_agent = @root_agent, new_agent

# supervisor doesn't handle actual data. so the following code is unnecessary.
if supervisor
old_agent.shutdown # to close thread created in #configure
Expand Down
10 changes: 0 additions & 10 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,6 @@ def initialize(log:, system_config: SystemConfig.new)
attr_reader :inputs
attr_reader :labels

def find_unreloadable_plugin
lifecycle do |instance|
if instance.respond_to?(:reloadable_plugin?) && !instance.reloadable_plugin?
return instance
end
end

nil
end

def configure(conf)
used_worker_ids = []
available_worker_ids = (0..Fluent::Engine.system_config.workers - 1).to_a
Expand Down
202 changes: 202 additions & 0 deletions test/test_engine.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
require_relative 'helper'
require 'fluent/engine'
require 'fluent/config'
require 'fluent/input'
require 'fluent/system_config'

class EngineTest < ::Test::Unit::TestCase
class DummyEngineTestOutput < Fluent::Plugin::Output
Fluent::Plugin.register_output('dummy_engine_test', self)
def write(chunk); end
end

class DummyEngineTest2Output < Fluent::Plugin::Output
Fluent::Plugin.register_output('dummy_engine_test2', self)
def write(chunk); end
end

class DummyEngineTestInput < Fluent::Plugin::Input
Fluent::Plugin.register_input('dummy_engine_test', self)
def multi_workers_ready?; true; end
end

class DummyEngineTest2Input < Fluent::Plugin::Input
Fluent::Plugin.register_input('dummy_engine_test2', self)
def multi_workers_ready?; true; end
end

class DummyEngineClassVarTestInput < Fluent::Plugin::Input
Fluent::Plugin.register_input('dummy_engine_class_var_test', self)
@@test = nil
def multi_workers_ready?; true; end
end

sub_test_case '#reload_config' do
test 'reload new configuration' do
conf_data = <<-CONF
<source>
@type dummy_engine_test
</source>
<match>
@type dummy_engine_test
</match>
CONF

conf = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true)
system_config = Fluent::SystemConfig.create(conf)

engine = Fluent::EngineClass.new
engine.init(system_config)
engine.configure(conf)

assert_kind_of DummyEngineTestInput, engine.root_agent.inputs[0]
assert_kind_of DummyEngineTestOutput, engine.root_agent.outputs[0]

new_conf_data = <<-CONF
<source>
@type dummy_engine_test2
</source>
<match>
@type dummy_engine_test2
</match>
CONF

new_conf = Fluent::Config.parse(new_conf_data, '(test)', '(test_dir)', true)

agent = Fluent::RootAgent.new(log: $log, system_config: system_config)
stub(Fluent::RootAgent).new do
stub(agent).start.once
agent
end

engine.reload_config(new_conf)

assert_kind_of DummyEngineTest2Input, engine.root_agent.inputs[0]
assert_kind_of DummyEngineTest2Output, engine.root_agent.outputs[0]
end

test "doesn't start RootAgent when supervisor is true" do
conf_data = <<-CONF
<source>
@type dummy_engine_test
</source>
<match>
@type dummy_engine_test
</match>
CONF

conf = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true)
system_config = Fluent::SystemConfig.create(conf)

engine = Fluent::EngineClass.new
engine.init(system_config)
engine.configure(conf)

assert_kind_of DummyEngineTestInput, engine.root_agent.inputs[0]
assert_kind_of DummyEngineTestOutput, engine.root_agent.outputs[0]

new_conf_data = <<-CONF
<source>
@type dummy_engine_test2
</source>
<match>
@type dummy_engine_test2
</match>
CONF

new_conf = Fluent::Config.parse(new_conf_data, '(test)', '(test_dir)', true)

agent = Fluent::RootAgent.new(log: $log, system_config: system_config)
stub(Fluent::RootAgent).new do
stub(agent).start.never
agent
end

engine.reload_config(new_conf, supervisor: true)

assert_kind_of DummyEngineTest2Input, engine.root_agent.inputs[0]
assert_kind_of DummyEngineTest2Output, engine.root_agent.outputs[0]
end

test 'raise an error when conf is invalid' do
conf_data = <<-CONF
<source>
@type dummy_engine_test
</source>
<match>
@type dummy_engine_test
</match>
CONF

conf = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true)
system_config = Fluent::SystemConfig.create(conf)

engine = Fluent::EngineClass.new
engine.init(system_config)
engine.configure(conf)

assert_kind_of DummyEngineTestInput, engine.root_agent.inputs[0]
assert_kind_of DummyEngineTestOutput, engine.root_agent.outputs[0]

new_conf_data = <<-CONF
<source>
@type
</source>
CONF

new_conf = Fluent::Config.parse(new_conf_data, '(test)', '(test_dir)', true)

agent = Fluent::RootAgent.new(log: $log, system_config: system_config)
stub(Fluent::RootAgent).new do
stub(agent).start.never
agent
end

assert_raise(Fluent::ConfigError.new("Missing '@type' parameter on <source> directive")) do
engine.reload_config(new_conf)
end

assert_kind_of DummyEngineTestInput, engine.root_agent.inputs[0]
assert_kind_of DummyEngineTestOutput, engine.root_agent.outputs[0]
end

test 'raise an error when unreloadable exists' do
conf_data = <<-CONF
<source>
@type dummy_engine_test
</source>
<match>
@type dummy_engine_test
</match>
CONF

conf = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true)
system_config = Fluent::SystemConfig.create(conf)

engine = Fluent::EngineClass.new
engine.init(system_config)
engine.configure(conf)

assert_kind_of DummyEngineTestInput, engine.root_agent.inputs[0]
assert_kind_of DummyEngineTestOutput, engine.root_agent.outputs[0]

conf_data = <<-CONF
<source>
@type dummy_engine_class_var_test
</source>
<match>
@type dummy_engine_test
</match>
CONF

new_conf = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true)

assert_raise(Fluent::ConfigError.new('Unreloadable plugin: EngineTest::DummyEngineClassVarTestInput')) do
engine.reload_config(new_conf)
end

assert_kind_of DummyEngineTestInput, engine.root_agent.inputs[0]
assert_kind_of DummyEngineTestOutput, engine.root_agent.outputs[0]
end
end
end

0 comments on commit 4825306

Please sign in to comment.