From 7e1a8119b9e21bc21ccb7c0b9e311f8de1a6d39c Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Mon, 2 Dec 2019 14:22:37 -0800 Subject: [PATCH 01/17] Move method building config to Fluent::Config's class method Signed-off-by: Yuta Iwama --- lib/fluent/config.rb | 19 +++++++++++++++++++ lib/fluent/supervisor.rb | 13 +------------ 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/lib/fluent/config.rb b/lib/fluent/config.rb index 4c321ff9bd..b2411853e1 100644 --- a/lib/fluent/config.rb +++ b/lib/fluent/config.rb @@ -20,6 +20,25 @@ module Fluent module Config + # @param config_path [String] config file path + # @param encoding [String] encoding of config file + # @param additional_config [String] config which is added to last of config body + # @param use_v1_config [Bool] config is formatted with v1 or not + # @return [Fluent::Config] + def self.build(config_path:, encoding: 'utf-8', additional_config: nil, use_v1_config: true) + config_fname = File.basename(config_path) + config_basedir = File.dirname(config_path) + config_data = File.open(config_path, "r:#{encoding}:utf-8") do |f| + s = f.read + if additional_config + c = additional_config.gsub("\\n", "\n") + s += "\n#{c}" + end + s + end + Fluent::Config.parse(config_data, config_fname, config_basedir, use_v1_config) + end + def self.parse(str, fname, basepath = Dir.pwd, v1_config = nil, syntax: :v1) parser = if fname =~ /\.rb$/ || syntax == :ruby :ruby diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 1c34256a9e..07af24c130 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -557,8 +557,7 @@ def configure(supervisor: false) if @inline_config == '-' @inline_config = STDIN.read end - - @conf = read_config + @conf = Fluent::Config.build(config_path: @config_path, encoding: @conf_encoding, additional_config: @inline_config, use_v1_config: @use_v1_config) @system_config = build_system_config(@conf) @log.level = @system_config.log_level @@ -769,16 +768,6 @@ def main_process(&block) exit!(unrecoverable_error ? 2 : 1) end - def read_config - config_fname = File.basename(@config_path) - config_basedir = File.dirname(@config_path) - config_data = File.open(@config_path, "r:#{@conf_encoding}:utf-8") {|f| f.read } - if @inline_config - config_data << "\n" << @inline_config.gsub("\\n", "\n") - end - Fluent::Config.parse(config_data, config_fname, config_basedir, @use_v1_config) - end - def build_system_config(conf) system_config = SystemConfig.create(conf, @cl_opt[:strict_config_value]) opt = {} From b70bd2b009d09522a7190ef4a72cd848d84d6cce Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 3 Dec 2019 14:41:03 -0800 Subject: [PATCH 02/17] Assign ivar to make fluentd_conf changable Signed-off-by: Yuta Iwama --- lib/fluent/supervisor.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 07af24c130..9e7d6b29b4 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -42,6 +42,7 @@ module Fluent module ServerModule def before_run + @fluentd_conf = config[:fluentd_conf] @rpc_server = nil @counter = nil @@ -210,11 +211,11 @@ def kill_worker end def supervisor_dump_config_handler - $log.info config[:fluentd_conf] + $log.info @fluentd_conf end def supervisor_get_dump_config_handler - {conf: config[:fluentd_conf]} + { conf: @fluentd_conf } end end From b2d49a859a52aa9cf01fca3252336b4694ba4c6b Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Mon, 9 Dec 2019 15:58:27 +0900 Subject: [PATCH 03/17] Add new class variable store to control all state Signed-off-by: Yuta Iwama --- lib/fluent/plugin/buf_file.rb | 14 +++--- lib/fluent/plugin/buf_file_single.rb | 14 +++--- lib/fluent/plugin/in_tail.rb | 12 ++--- lib/fluent/plugin_id.rb | 11 +++-- lib/fluent/variable_store.rb | 42 ++++++++++++++++++ test/test_variable_store.rb | 65 ++++++++++++++++++++++++++++ 6 files changed, 136 insertions(+), 22 deletions(-) create mode 100644 lib/fluent/variable_store.rb create mode 100644 test/test_variable_store.rb diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index 459b183ce2..beab1913c5 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -19,6 +19,7 @@ require 'fluent/plugin/buffer' require 'fluent/plugin/buffer/file_chunk' require 'fluent/system_config' +require 'fluent/variable_store' module Fluent module Plugin @@ -43,19 +44,20 @@ class FileBuffer < Fluent::Plugin::Buffer config_param :file_permission, :string, default: nil # '0644' config_param :dir_permission, :string, default: nil # '0755' - @@buffer_paths = {} - def initialize super @symlink_path = nil @multi_workers_available = false @additional_resume_path = nil @buffer_path = nil + @variable_store = nil end def configure(conf) super + @variable_store = Fluent::VariableStore.fetch_or_build(:buf_file_single) + multi_workers_configured = owner.system_config.workers > 1 ? true : false using_plugin_root_dir = false @@ -69,13 +71,13 @@ def configure(conf) end type_of_owner = Plugin.lookup_type_from_class(@_owner.class) - if @@buffer_paths.has_key?(@path) && !called_in_test? - type_using_this_path = @@buffer_paths[@path] + if @variable_store.has_key?(@path) && !called_in_test? + type_using_this_path = @variable_store[@path] raise ConfigError, "Other '#{type_using_this_path}' plugin already use same buffer path: type = #{type_of_owner}, buffer path = #{@path}" end @buffer_path = @path - @@buffer_paths[@buffer_path] = type_of_owner + @variable_store[@buffer_path] = type_of_owner specified_directory_exists = File.exist?(@path) && File.directory?(@path) unexisting_path_for_directory = !File.exist?(@path) && !@path.include?('.*') @@ -125,7 +127,7 @@ def start end def stop - @@buffer_paths.delete(@buffer_path) + @variable_store.delete(@buffer_path) super end diff --git a/lib/fluent/plugin/buf_file_single.rb b/lib/fluent/plugin/buf_file_single.rb index 7116c951d8..fdbcb12175 100644 --- a/lib/fluent/plugin/buf_file_single.rb +++ b/lib/fluent/plugin/buf_file_single.rb @@ -19,6 +19,7 @@ require 'fluent/plugin/buffer' require 'fluent/plugin/buffer/file_single_chunk' require 'fluent/system_config' +require 'fluent/variable_store' module Fluent module Plugin @@ -48,18 +49,19 @@ class FileSingleBuffer < Fluent::Plugin::Buffer desc 'The permission of chunk directory. If no specified, setting or 0755 is used' config_param :dir_permission, :string, default: nil - @@buffer_paths = {} - def initialize super @multi_workers_available = false @additional_resume_path = nil + @variable_store = nil end def configure(conf) super + @variable_store = Fluent::VariableStore.fetch_or_build(:buf_file_single) + if @chunk_format == :auto @chunk_format = owner.formatted_to_msgpack_binary? ? :msgpack : :text end @@ -117,12 +119,12 @@ def configure(conf) end type_of_owner = Plugin.lookup_type_from_class(@_owner.class) - if @@buffer_paths.has_key?(@path) && !called_in_test? - type_using_this_path = @@buffer_paths[@path] + if @variable_store.has_key?(@path) && !called_in_test? + type_using_this_path = @variable_store[@path] raise Fluent::ConfigError, "Other '#{type_using_this_path}' plugin already uses same buffer path: type = #{type_of_owner}, buffer path = #{@path}" end - @@buffer_paths[@path] = type_of_owner + @variable_store[@path] = type_of_owner @dir_permission = if @dir_permission @dir_permission.to_i(8) else @@ -145,7 +147,7 @@ def start end def stop - @@buffer_paths.delete(@path) + @variable_store.delete(@path) super end diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 6a226a544d..7a9de0fe4c 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -21,6 +21,7 @@ require 'fluent/event' require 'fluent/plugin/buffer' require 'fluent/plugin/parser_multiline' +require 'fluent/variable_store' if Fluent.windows? require_relative 'file_wrapper' @@ -103,9 +104,8 @@ def initialize attr_reader :paths - @@pos_file_paths = {} - def configure(conf) + @variable_store = Fluent::VariableStore.fetch_or_build(:in_tail) compat_parameters_convert(conf, :parser) parser_config = conf.elements('parse').first unless parser_config @@ -132,11 +132,11 @@ def configure(conf) # TODO: Use plugin_root_dir and storage plugin to store positions if available if @pos_file - if @@pos_file_paths.has_key?(@pos_file) && !called_in_test? - plugin_id_using_this_path = @@pos_file_paths[@pos_file] + if @variable_store.key?(@pos_file) && !called_in_test? + plugin_id_using_this_path = @variable_store[@pos_file] raise Fluent::ConfigError, "Other 'in_tail' plugin already use same pos_file path: plugin_id = #{plugin_id_using_this_path}, pos_file path = #{@pos_file}" end - @@pos_file_paths[@pos_file] = self.plugin_id + @variable_store[@pos_file] = self.plugin_id else $log.warn "'pos_file PATH' parameter is not set to a 'tail' source." $log.warn "this parameter is highly recommended to save the position to resume tailing." @@ -205,7 +205,7 @@ def start end def stop - @@pos_file_paths.delete(@pos_file) + @variable_store.delete(@pos_file) super end diff --git a/lib/fluent/plugin_id.rb b/lib/fluent/plugin_id.rb index 0441387ad5..cbe4707b98 100644 --- a/lib/fluent/plugin_id.rb +++ b/lib/fluent/plugin_id.rb @@ -15,26 +15,29 @@ # require 'set' +require 'fluent/variable_store' module Fluent module PluginId - @@configured_ids = Set.new def initialize super + + @_plugin_id_variable_store = nil @_plugin_root_dir = nil @id = nil end def configure(conf) + @_plugin_id_variable_store = Fluent::VariableStore.fetch_or_build(:pluing_id, default_value: Set.new) @id = conf['@id'] @_id_configured = !!@id # plugin id is explicitly configured by users (or not) if @id @id = @id.to_s - if @@configured_ids.include?(@id) && !plugin_id_for_test? + if @_plugin_id_variable_store.include?(@id) && !plugin_id_for_test? raise Fluent::ConfigError, "Duplicated plugin id `#{@id}`. Check whole configuration and fix it." end - @@configured_ids.add(@id) + @_plugin_id_variable_store.add(@id) end super @@ -79,7 +82,7 @@ def plugin_root_dir end def stop - @@configured_ids.delete(@id) + @_plugin_id_variable_store.delete(@id) super end diff --git a/lib/fluent/variable_store.rb b/lib/fluent/variable_store.rb new file mode 100644 index 0000000000..e05b7bb2dc --- /dev/null +++ b/lib/fluent/variable_store.rb @@ -0,0 +1,42 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'forwardable' + +module Fluent + # VariableStore provides all pluigns with the way to shared variable without using class variable + # it's for safe reloading mechanism + class VariableStore + @data = {} + + class << self + def fetch_or_build(namespace, default_value: {}) + @data[namespace] ||= default_value + end + + def try_to_reset + @data, old = {}, @data + + begin + yield + rescue + @data = old + raise + end + end + end + end +end diff --git a/test/test_variable_store.rb b/test/test_variable_store.rb new file mode 100644 index 0000000000..08dcfacde6 --- /dev/null +++ b/test/test_variable_store.rb @@ -0,0 +1,65 @@ +require_relative 'helper' +require 'fluent/variable_store' + +class VariableStoreTest < Test::Unit::TestCase + def setup + end + + def teardown + Fluent::VariableStore.try_to_reset do + # nothing + end + end + + sub_test_case '#fetch_or_build' do + test 'fetch same object when the same key is passed' do + c1 = Fluent::VariableStore.fetch_or_build(:test) + c2 = Fluent::VariableStore.fetch_or_build(:test) + + assert_equal c1, c2 + assert_equal c1.object_id, c2.object_id + + c3 = Fluent::VariableStore.fetch_or_build(:test2) + assert_not_equal c1.object_id, c3.object_id + end + + test 'can be passed a default value' do + c1 = Fluent::VariableStore.fetch_or_build(:test, default_value: Set.new) + c2 = Fluent::VariableStore.fetch_or_build(:test) + + assert_kind_of Set, c1 + assert_equal c1, c2 + assert_equal c1.object_id, c2.object_id + end + end + + sub_test_case '#try_to_reset' do + test 'reset all values' do + c1 = Fluent::VariableStore.fetch_or_build(:test) + c1[:k1] = 1 + assert_equal 1, c1[:k1] + + Fluent::VariableStore.try_to_reset do + # nothing + end + + c1 = Fluent::VariableStore.fetch_or_build(:test) + assert_nil c1[:k1] + end + + test 'rollback resetting if error raised' do + c1 = Fluent::VariableStore.fetch_or_build(:test) + c1[:k1] = 1 + assert_equal 1, c1[:k1] + + assert_raise(RuntimeError.new('pass')) do + Fluent::VariableStore.try_to_reset do + raise 'pass' + end + end + + c1 = Fluent::VariableStore.fetch_or_build(:test) + assert_equal 1, c1[:k1] + end + end +end From c0bbf6673a2c0690f1a1fc1188761e1dda462054 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Mon, 9 Dec 2019 17:45:07 +0900 Subject: [PATCH 04/17] light reload Signed-off-by: Yuta Iwama --- lib/fluent/engine.rb | 44 +++++++++++++++++ lib/fluent/plugin/base.rb | 5 ++ lib/fluent/root_agent.rb | 10 ++++ lib/fluent/supervisor.rb | 100 +++++++++++++++++++++++++++++++++----- 4 files changed, 147 insertions(+), 12 deletions(-) diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index b03ee0fb9d..9275d9819d 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -167,6 +167,50 @@ def run @fluent_log_event_router.stop end + # @param conf [Fluent::Config] + # @param supervisor [Bool] + # @reutrn nil + def reload_config(conf, supervisor: false) + # configure first to reduce down time while restarting + new_agent = RootAgent.new(log: log, system_config: @system_config) + if (plugin = new_agent.find_unreloadable_plugin) + raise Fluent::ConfigError, "Unreloadable plugin: #{plugin.class}" + end + + new_agent.configure(conf) + unless @suppress_config_dump + $log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}" + end + + old_agent, @root_agent = @root_agent, new_agent + + # supervisor doesn't handle actual data. so the following code is unnecessary. + if supervisor + old_agent.shutdown # to close thread created in #configure + return + end + + # Stop phaze + unless @log_event_verbose + $log.enable_event(false) + @fluent_log_event_router.graceful_stop + end + $log.info 'shutting down fluentd worker', worker: worker_id + old_agent.shutdown # Stop first but we can still accept sockets, thanks to serverengine SocketManager, + + @fluent_log_event_router.stop + + # Restart phaze + new_fleunt_log_event_router = FluentLogEventRouter.build(new_agent) + if new_fleunt_log_event_router.emittable? + $log.enable_event(true) + end + @fluent_log_event_router = new_fleunt_log_event_router + + $log.info 'restart fluentd worker', worker: worker_id + @root_agent.start + end + def stop @engine_stopped = true nil diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb index e8e444f33d..64ec5d0967 100644 --- a/lib/fluent/plugin/base.rb +++ b/lib/fluent/plugin/base.rb @@ -187,6 +187,11 @@ def inspect # https://github.com/ruby/ruby/blob/trunk/gc.c#L788 "#<%s:%014x>" % [self.class.name, '0x%014x' % (__id__ << 1)] end + + def reloadable_plugin? + # Engine can't capture all class variables. so it's forbbiden to use class variables in each plugins if enabling reload. + self.class.class_variables.empty? + end end end end diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 0bc00ea862..7a43c94bf2 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -63,6 +63,16 @@ def initialize(log:, system_config: SystemConfig.new) attr_reader :inputs attr_reader :labels + def find_unreloadable_plugin + lifecycle do |instance| + if instance.respond_to?(:reloadable_plugin?) && !instance.reloadable_plugin? + return instance + end + end + + nil + end + def configure(conf) used_worker_ids = [] available_worker_ids = (0..Fluent::Engine.system_config.workers - 1).to_a diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 9e7d6b29b4..488f0948cf 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -26,6 +26,7 @@ require 'fluent/rpc' require 'fluent/system_config' require 'fluent/msgpack_factory' +require 'fluent/variable_store' require 'serverengine' if Fluent.windows? @@ -157,6 +158,11 @@ def install_supervisor_signal_handlers $log.debug "fluentd supervisor process get SIGUSR1" supervisor_sigusr1_handler end unless Fluent.windows? + + trap :USR2 do + $log.debug 'fluentd supervisor process got SIGUSR2' + supervisor_sigusr2_handler + end unless Fluent.windows? end def install_windows_event_handler @@ -180,20 +186,36 @@ def supervisor_sighup_handler end def supervisor_sigusr1_handler - if log = config[:logger_initializer] - # Creating new thread due to mutex can't lock - # in main thread during trap context - Thread.new do - log.reopen! - end - end + reopen_log + send_signal_to_workers(:USR1) + end - if config[:worker_pid] - config[:worker_pid].each_value do |pid| - Process.kill(:USR1, pid) - # don't rescue Errno::ESRCH here (invalid status) + def supervisor_sigusr2_handler + conf = nil + Thread.new do + $log.info 'Reloading new config' + + begin + # Validate that loading config is valid at first + conf = Fluent::Config.build( + config_path: config[:config_path], + encoding: config[:conf_encoding], + additional_config: config[:inline_config], + use_v1_config: config[:use_v1_config], + ) + + Fluent::VariableStore.try_to_reset do + Fluent::Engine.reload_config(conf) + end + rescue => e + $log.error "Failed to reload config file: #{e}" + next end - end + end.join + + reopen_log + send_signal_to_workers(:USR2) + @fluentd_conf = conf.to_s end def kill_worker @@ -217,6 +239,27 @@ def supervisor_dump_config_handler def supervisor_get_dump_config_handler { conf: @fluentd_conf } end + + private + + def reopen_log + if (log = config[:logger_initializer]) + # Creating new thread due to mutex can't lock + # in main thread during trap context + Thread.new do + log.reopen! + end + end + end + + def send_signal_to_workers(signal) + return unless config[:worker_pid] + + config[:worker_pid].each_value do |pid| + # don't rescue Errno::ESRCH here (invalid status) + Process.kill(signal, pid) + end + end end module WorkerModule @@ -303,6 +346,9 @@ def self.load_config(path, params = {}) JSON.dump(params)], command_sender: command_sender, fluentd_conf: params['fluentd_conf'], + conf_encoding: params['conf_encoding'], + inline_config: params['inline_config'], + config_path: path, main_cmd: params['main_cmd'], signame: params['signame'], } @@ -673,6 +719,10 @@ def install_main_process_signal_handlers flush_buffer end unless Fluent.windows? + trap :USR2 do + reload_config + end unless Fluent.windows? + if Fluent.windows? command_pipe = STDIN.dup STDIN.reopen(File::NULL, "rb") @@ -713,6 +763,32 @@ def flush_buffer end end + def reload_config + Thread.new do + $log.debug('worker got SIGUSR2') + + begin + conf = Fluent::Config.build( + config_path: @config_path, + encoding: @conf_encoding, + additional_config: @inline_config, + use_v1_config: @use_v1_config, + ) + + Fluent::VariableStore.try_to_reset do + Fluent::Engine.reload_config(conf, supervisor: true) + end + rescue => e + # it is guranteed that config file is valid by supervisor side. but it's not atomic becuase of using signals to commnicate between worker and super + # So need this rescue code + $log.error("failed to reload config: #{e}") + next + end + + @conf = conf + end + end + def logging_with_console_output yield $log unless @log.stdout? From 2fe2e9dbb33883dd5bbf80f3306d74254bbccf67 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 10 Dec 2019 17:43:00 +0900 Subject: [PATCH 05/17] remove needless require Signed-off-by: Yuta Iwama --- lib/fluent/variable_store.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/fluent/variable_store.rb b/lib/fluent/variable_store.rb index e05b7bb2dc..ec6a6dadff 100644 --- a/lib/fluent/variable_store.rb +++ b/lib/fluent/variable_store.rb @@ -14,8 +14,6 @@ # limitations under the License. # -require 'forwardable' - module Fluent # VariableStore provides all pluigns with the way to shared variable without using class variable # it's for safe reloading mechanism From e6a38ed8a982d22a09e918fc63b8e958f4246861 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 10 Dec 2019 17:59:07 +0900 Subject: [PATCH 06/17] block nil Signed-off-by: Yuta Iwama --- lib/fluent/plugin/buf_file.rb | 4 +++- lib/fluent/plugin/buf_file_single.rb | 4 +++- lib/fluent/plugin/in_tail.rb | 4 +++- lib/fluent/plugin_id.rb | 4 +++- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index beab1913c5..61bb5150ea 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -127,7 +127,9 @@ def start end def stop - @variable_store.delete(@buffer_path) + if @variable_store + @variable_store.delete(@buffer_path) + end super end diff --git a/lib/fluent/plugin/buf_file_single.rb b/lib/fluent/plugin/buf_file_single.rb index fdbcb12175..e15d74a90a 100644 --- a/lib/fluent/plugin/buf_file_single.rb +++ b/lib/fluent/plugin/buf_file_single.rb @@ -147,7 +147,9 @@ def start end def stop - @variable_store.delete(@path) + if @variable_store + @variable_store.delete(@path) + end super end diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 7a9de0fe4c..f8bb655700 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -205,7 +205,9 @@ def start end def stop - @variable_store.delete(@pos_file) + if @variable_store + @variable_store.delete(@pos_file) + end super end diff --git a/lib/fluent/plugin_id.rb b/lib/fluent/plugin_id.rb index cbe4707b98..1a4fe4065d 100644 --- a/lib/fluent/plugin_id.rb +++ b/lib/fluent/plugin_id.rb @@ -82,7 +82,9 @@ def plugin_root_dir end def stop - @_plugin_id_variable_store.delete(@id) + if @_plugin_id_variable_store + @_plugin_id_variable_store.delete(@id) + end super end From ffbf1c58a049087a8de2b273cfaaae76010f93c5 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Wed, 11 Dec 2019 16:08:53 +0900 Subject: [PATCH 07/17] Move reading config test Signed-off-by: Yuta Iwama --- test/test_config.rb | 32 ++++++++++++++--- test/test_supervisor.rb | 77 +---------------------------------------- 2 files changed, 28 insertions(+), 81 deletions(-) diff --git a/test/test_config.rb b/test/test_config.rb index 2382f7d8bc..f6099ba426 100644 --- a/test/test_config.rb +++ b/test/test_config.rb @@ -151,16 +151,16 @@ def test_check_not_fetchd assert_equal before_size, match_conf.unused.size end - def write_config(path, data) + def write_config(path, data, encoding: 'utf-8') FileUtils.mkdir_p(File.dirname(path)) - File.open(path, "w") {|f| f.write data } + File.open(path, "w:#{encoding}:utf-8") {|f| f.write data } end def test_inline prepare_config opts = { - :config_path => "#{TMP_DIR}/config_test_1.conf", - :inline_config => "\n type http\n port 2222\n " + :config_path => "#{TMP_DIR}/config_test_1.conf", + :inline_config => "\n type http\n port 2222\n " } assert_nothing_raised do Fluent::Supervisor.new(opts) @@ -175,5 +175,27 @@ def create_warn_dummy_logger logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) $log = Fluent::Log.new(logger) end -end + sub_test_case '.build'do + test 'read config' do + write_config("#{TMP_DIR}/build/config_build.conf", 'key value') + c = Fluent::Config.build(config_path: "#{TMP_DIR}/build/config_build.conf") + assert_equal('value', c['key']) + end + + test 'read config with encoding' do + write_config("#{TMP_DIR}/build/config_build2.conf", "#てすと\nkey value", encoding: 'shift_jis') + + c = Fluent::Config.build(config_path: "#{TMP_DIR}/build/config_build2.conf", encoding: 'shift_jis') + assert_equal('value', c['key']) + end + + test 'read config with additional_config' do + write_config("#{TMP_DIR}/build/config_build2.conf", "key value") + + c = Fluent::Config.build(config_path: "#{TMP_DIR}/build/config_build2.conf", additional_config: 'key2 value2') + assert_equal('value', c['key']) + assert_equal('value2', c['key2']) + end + end +end diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb index 7808cca6b4..12f5776970 100644 --- a/test/test_supervisor.rb +++ b/test/test_supervisor.rb @@ -30,81 +30,6 @@ def write_config(path, data) File.open(path, "w") {|f| f.write data } end - def test_read_config - create_info_dummy_logger - - tmp_dir = "#{TMP_DIR}/dir/test_read_config.conf" - conf_str = %[ - - @type forward - @id forward_input - - - @type stdout - @id stdout_output - -] - write_config tmp_dir, conf_str - opts = Fluent::Supervisor.default_options - sv = Fluent::Supervisor.new(opts) - - use_v1_config = {} - use_v1_config['use_v1_config'] = true - - sv.instance_variable_set(:@config_path, tmp_dir) - sv.instance_variable_set(:@use_v1_config, use_v1_config) - - conf = sv.__send__(:read_config) - elem = conf.elements.find { |e| e.name == 'source' } - assert_equal "forward", elem['@type'] - assert_equal "forward_input", elem['@id'] - - elem = conf.elements.find { |e| e.name == 'match' } - assert_equal "debug.**", elem.arg - assert_equal "stdout", elem['@type'] - assert_equal "stdout_output", elem['@id'] - - $log.out.reset - end - - def test_read_config_with_multibyte_string - tmp_path = "#{TMP_DIR}/dir/test_multibyte_config.conf" - conf_str = %[ - - @type forward - @id forward_input - @label @INPUT - - -] - FileUtils.mkdir_p(File.dirname(tmp_path)) - File.open(tmp_path, "w:utf-8") {|file| file.write(conf_str) } - - opts = Fluent::Supervisor.default_options - sv = Fluent::Supervisor.new(opts) - - use_v1_config = {} - use_v1_config['use_v1_config'] = true - - sv.instance_variable_set(:@config_path, tmp_path) - sv.instance_variable_set(:@use_v1_config, use_v1_config) - - conf = sv.__send__(:read_config) - label = conf.elements.detect {|e| e.name == "label" } - filter = label.elements.detect {|e| e.name == "filter" } - record_transformer = filter.elements.detect {|e| e.name = "record_transformer" } - assert_equal(Encoding::UTF_8, record_transformer["message"].encoding) - end def test_system_config opts = Fluent::Supervisor.default_options @@ -416,7 +341,7 @@ def test_inline_config inline_config = '\n@type stdout\n' stub(STDIN).read { inline_config } - stub(sv).read_config # to skip + stub(Fluent::Config).build # to skip stub(sv).build_system_config { Fluent::SystemConfig.new } # to skip sv.configure From f1c23e02e085c5427209faa8f043739df7be380c Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Wed, 18 Dec 2019 15:52:01 +0900 Subject: [PATCH 08/17] Merge duplicated logic Signed-off-by: Yuta Iwama --- lib/fluent/engine.rb | 37 +++++++++++++++---------------------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index 9275d9819d..b69cdf36ad 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -157,14 +157,7 @@ def run raise end - unless @log_event_verbose - $log.enable_event(false) - @fluent_log_event_router.graceful_stop - end - $log.info "shutting down fluentd worker", worker: worker_id - shutdown - - @fluent_log_event_router.stop + stop_phase(@root_agent) end # @param conf [Fluent::Config] @@ -190,17 +183,9 @@ def reload_config(conf, supervisor: false) return end - # Stop phaze - unless @log_event_verbose - $log.enable_event(false) - @fluent_log_event_router.graceful_stop - end - $log.info 'shutting down fluentd worker', worker: worker_id - old_agent.shutdown # Stop first but we can still accept sockets, thanks to serverengine SocketManager, - - @fluent_log_event_router.stop + stop_phase(old_agent) - # Restart phaze + # Restart phase new_fleunt_log_event_router = FluentLogEventRouter.build(new_agent) if new_fleunt_log_event_router.emittable? $log.enable_event(true) @@ -233,12 +218,20 @@ def worker_id end private - def start - @root_agent.start + + def stop_phase(root_agent) + unless @log_event_verbose + $log.enable_event(false) + @fluent_log_event_router.graceful_stop + end + $log.info 'shutting down fluentd worker', worker: worker_id + root_agent.shutdown + + @fluent_log_event_router.stop end - def shutdown - @root_agent.shutdown + def start + @root_agent.start end end From 137823ff1f255951beda89bacc1e0764d5a3d00f Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Wed, 18 Dec 2019 15:52:41 +0900 Subject: [PATCH 09/17] Add StaticConfigAnalysis to check if config is valid before invoking configure Signed-off-by: Yuta Iwama --- lib/fluent/static_config_analysis.rb | 194 +++++++++++++++++++++++++++ test/test_static_config_analysis.rb | 177 ++++++++++++++++++++++++ 2 files changed, 371 insertions(+) create mode 100644 lib/fluent/static_config_analysis.rb create mode 100644 test/test_static_config_analysis.rb diff --git a/lib/fluent/static_config_analysis.rb b/lib/fluent/static_config_analysis.rb new file mode 100644 index 0000000000..0c2b373cf0 --- /dev/null +++ b/lib/fluent/static_config_analysis.rb @@ -0,0 +1,194 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/config' +require 'fluent/plugin' + +module Fluent + # Static Analysis means analysing all plugins and Fluent::Element without invokeing Plugin#configure + class StaticConfigAnalysis + module Elem + Input = Struct.new(:plugin, :config) + Output = Struct.new(:plugin, :config) + Format = Struct.new(:plugin, :config) + Label = Struct.new(:name, :config, :nodes) + Worker = Struct.new(:ids, :config, :nodes) + end + + Result = Struct.new(:tree, :outputs, :inputs, :filters, :labels) do + def all_plugins + (outputs + inputs + filters).map(&:plugin) + end + end + + # @param workers [Integer] Number of workers + # @return [Fluent::StaticConfigAnalysis::Result] + def self.call(conf, workers: 1) + new(workers).call(conf) + end + + def initialize(workers) + @workers = workers + + reset + end + + def call(config) + reset + + tree = [ + static_worker_analyse(config), + static_label_analyse(config), + static_filter_and_output_analyse(config), + static_input_analyse(config), + ].flatten + + Result.new(tree, @outputs, @inputs, @filters, @labels.values) + end + + private + + def reset + @outputs = [] + @inputs = [] + @filters = [] + @labels = {} + end + + def static_worker_analyse(conf) + available_worker_ids = [*0...@workers] + + ret = [] + conf.elements(name: 'worker').each do |config| + ids = parse_worker_id(config) + ids.each do |id| + if available_worker_ids.include?(id) + available_worker_ids.delete(id) + else + raise Fluent::ConfigError, "specified worker_id<#{id}> collisions is detected on directive. Available worker id(s): #{available_worker_ids}" + end + end + + config.elements.each do |elem| + unless %w[source match filter label].include?(elem.name) + raise Fluent::ConfigError, " section cannot have <#{elem.name}> directive" + end + end + + nodes = [ + static_label_analyse(config), + static_filter_and_output_analyse(config), + static_input_analyse(config), + ].flatten + ret << Elem::Worker.new(ids, config, nodes) + end + + ret + end + + def parse_worker_id(conf) + worker_id_str = conf.arg + + if worker_id_str.empty? + raise Fluent::ConfigError, 'Missing worker id on directive' + end + + l, r = + begin + worker_id_str.split('-', 2).map { |v| Integer(v) } + rescue TypeError, ArgumentError + raise Fluent::ConfigError, "worker id should be integer: #{worker_id_str}" + end + + if l < 0 || l >= @workers + raise Fluent::ConfigError, "worker id #{l} specified by directive is not allowed. Available worker id is between 0 and #{@workers-1}" + end + + # e.g. specified one worker id like `` + if r.nil? + return [l] + end + + if r < 0 || r >= @workers + raise Fluent::ConfigError, "worker id #{r} specified by directive is not allowed. Available worker id is between 0 and #{@workers-1}" + end + + if l > r + raise Fluent::ConfigError, "greater first_worker_id<#{l}> than last_worker_id<#{r}> specified by directive is not allowed. Available multi worker assign syntax is -" + end + + [l, r] + end + + def static_label_analyse(conf) + ret = [] + conf.elements(name: 'label').each do |e| + name = e.arg + if name.empty? + raise ConfigError, 'Missing symbol argument on