diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index b69cdf36ad..d8edd5cdd3 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -23,6 +23,7 @@ require 'fluent/system_config' require 'fluent/plugin' require 'fluent/fluent_log_event_router' +require 'fluent/static_config_analysis' module Fluent class EngineClass @@ -166,17 +167,28 @@ def run def reload_config(conf, supervisor: false) # configure first to reduce down time while restarting new_agent = RootAgent.new(log: log, system_config: @system_config) - if (plugin = new_agent.find_unreloadable_plugin) - raise Fluent::ConfigError, "Unreloadable plugin: #{plugin.class}" + ret = Fluent::StaticConfigAnalysis.call(conf, workers: system_config.workers) + + ret.all_plugins.each do |plugin| + if plugin.respond_to?(:reloadable_plugin?) && !plugin.reloadable_plugin? + raise Fluent::ConfigError, "Unreloadable plugin: #{plugin.class}" + end + end + + # Assign @root_agent to new root_agent + # for https://github.com/fluent/fluentd/blob/fcef949ce40472547fde295ddd2cfe297e1eddd6/lib/fluent/plugin_helper/event_emitter.rb#L50 + old_agent, @root_agent = @root_agent, new_agent + begin + @root_agent.configure(conf) + rescue + @root_agent = old_agent + raise end - new_agent.configure(conf) unless @suppress_config_dump $log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}" end - old_agent, @root_agent = @root_agent, new_agent - # supervisor doesn't handle actual data. so the following code is unnecessary. if supervisor old_agent.shutdown # to close thread created in #configure diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 7a43c94bf2..0bc00ea862 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -63,16 +63,6 @@ def initialize(log:, system_config: SystemConfig.new) attr_reader :inputs attr_reader :labels - def find_unreloadable_plugin - lifecycle do |instance| - if instance.respond_to?(:reloadable_plugin?) && !instance.reloadable_plugin? - return instance - end - end - - nil - end - def configure(conf) used_worker_ids = [] available_worker_ids = (0..Fluent::Engine.system_config.workers - 1).to_a diff --git a/test/test_engine.rb b/test/test_engine.rb new file mode 100644 index 0000000000..648c821b07 --- /dev/null +++ b/test/test_engine.rb @@ -0,0 +1,202 @@ +require_relative 'helper' +require 'fluent/engine' +require 'fluent/config' +require 'fluent/input' +require 'fluent/system_config' + +class EngineTest < ::Test::Unit::TestCase + class DummyEngineTestOutput < Fluent::Plugin::Output + Fluent::Plugin.register_output('dummy_engine_test', self) + def write(chunk); end + end + + class DummyEngineTest2Output < Fluent::Plugin::Output + Fluent::Plugin.register_output('dummy_engine_test2', self) + def write(chunk); end + end + + class DummyEngineTestInput < Fluent::Plugin::Input + Fluent::Plugin.register_input('dummy_engine_test', self) + def multi_workers_ready?; true; end + end + + class DummyEngineTest2Input < Fluent::Plugin::Input + Fluent::Plugin.register_input('dummy_engine_test2', self) + def multi_workers_ready?; true; end + end + + class DummyEngineClassVarTestInput < Fluent::Plugin::Input + Fluent::Plugin.register_input('dummy_engine_class_var_test', self) + @@test = nil + def multi_workers_ready?; true; end + end + + sub_test_case '#reload_config' do + test 'reload new configuration' do + conf_data = <<-CONF + + @type dummy_engine_test + + + @type dummy_engine_test + + CONF + + conf = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true) + system_config = Fluent::SystemConfig.create(conf) + + engine = Fluent::EngineClass.new + engine.init(system_config) + engine.configure(conf) + + assert_kind_of DummyEngineTestInput, engine.root_agent.inputs[0] + assert_kind_of DummyEngineTestOutput, engine.root_agent.outputs[0] + + new_conf_data = <<-CONF + + @type dummy_engine_test2 + + + @type dummy_engine_test2 + + CONF + + new_conf = Fluent::Config.parse(new_conf_data, '(test)', '(test_dir)', true) + + agent = Fluent::RootAgent.new(log: $log, system_config: system_config) + stub(Fluent::RootAgent).new do + stub(agent).start.once + agent + end + + engine.reload_config(new_conf) + + assert_kind_of DummyEngineTest2Input, engine.root_agent.inputs[0] + assert_kind_of DummyEngineTest2Output, engine.root_agent.outputs[0] + end + + test "doesn't start RootAgent when supervisor is true" do + conf_data = <<-CONF + + @type dummy_engine_test + + + @type dummy_engine_test + + CONF + + conf = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true) + system_config = Fluent::SystemConfig.create(conf) + + engine = Fluent::EngineClass.new + engine.init(system_config) + engine.configure(conf) + + assert_kind_of DummyEngineTestInput, engine.root_agent.inputs[0] + assert_kind_of DummyEngineTestOutput, engine.root_agent.outputs[0] + + new_conf_data = <<-CONF + + @type dummy_engine_test2 + + + @type dummy_engine_test2 + + CONF + + new_conf = Fluent::Config.parse(new_conf_data, '(test)', '(test_dir)', true) + + agent = Fluent::RootAgent.new(log: $log, system_config: system_config) + stub(Fluent::RootAgent).new do + stub(agent).start.never + agent + end + + engine.reload_config(new_conf, supervisor: true) + + assert_kind_of DummyEngineTest2Input, engine.root_agent.inputs[0] + assert_kind_of DummyEngineTest2Output, engine.root_agent.outputs[0] + end + + test 'raise an error when conf is invalid' do + conf_data = <<-CONF + + @type dummy_engine_test + + + @type dummy_engine_test + + CONF + + conf = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true) + system_config = Fluent::SystemConfig.create(conf) + + engine = Fluent::EngineClass.new + engine.init(system_config) + engine.configure(conf) + + assert_kind_of DummyEngineTestInput, engine.root_agent.inputs[0] + assert_kind_of DummyEngineTestOutput, engine.root_agent.outputs[0] + + new_conf_data = <<-CONF + + @type + + CONF + + new_conf = Fluent::Config.parse(new_conf_data, '(test)', '(test_dir)', true) + + agent = Fluent::RootAgent.new(log: $log, system_config: system_config) + stub(Fluent::RootAgent).new do + stub(agent).start.never + agent + end + + assert_raise(Fluent::ConfigError.new("Missing '@type' parameter on directive")) do + engine.reload_config(new_conf) + end + + assert_kind_of DummyEngineTestInput, engine.root_agent.inputs[0] + assert_kind_of DummyEngineTestOutput, engine.root_agent.outputs[0] + end + + test 'raise an error when unreloadable exists' do + conf_data = <<-CONF + + @type dummy_engine_test + + + @type dummy_engine_test + + CONF + + conf = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true) + system_config = Fluent::SystemConfig.create(conf) + + engine = Fluent::EngineClass.new + engine.init(system_config) + engine.configure(conf) + + assert_kind_of DummyEngineTestInput, engine.root_agent.inputs[0] + assert_kind_of DummyEngineTestOutput, engine.root_agent.outputs[0] + + conf_data = <<-CONF + + @type dummy_engine_class_var_test + + + @type dummy_engine_test + + CONF + + new_conf = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true) + + assert_raise(Fluent::ConfigError.new('Unreloadable plugin: EngineTest::DummyEngineClassVarTestInput')) do + engine.reload_config(new_conf) + end + + assert_kind_of DummyEngineTestInput, engine.root_agent.inputs[0] + assert_kind_of DummyEngineTestOutput, engine.root_agent.outputs[0] + end + end +end