diff --git a/lib/fluent/config.rb b/lib/fluent/config.rb index 4c321ff9bd..b2411853e1 100644 --- a/lib/fluent/config.rb +++ b/lib/fluent/config.rb @@ -20,6 +20,25 @@ module Fluent module Config + # @param config_path [String] config file path + # @param encoding [String] encoding of config file + # @param additional_config [String] config which is added to last of config body + # @param use_v1_config [Bool] config is formatted with v1 or not + # @return [Fluent::Config] + def self.build(config_path:, encoding: 'utf-8', additional_config: nil, use_v1_config: true) + config_fname = File.basename(config_path) + config_basedir = File.dirname(config_path) + config_data = File.open(config_path, "r:#{encoding}:utf-8") do |f| + s = f.read + if additional_config + c = additional_config.gsub("\\n", "\n") + s += "\n#{c}" + end + s + end + Fluent::Config.parse(config_data, config_fname, config_basedir, use_v1_config) + end + def self.parse(str, fname, basepath = Dir.pwd, v1_config = nil, syntax: :v1) parser = if fname =~ /\.rb$/ || syntax == :ruby :ruby diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index b03ee0fb9d..c4202c41d8 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -23,6 +23,7 @@ require 'fluent/system_config' require 'fluent/plugin' require 'fluent/fluent_log_event_router' +require 'fluent/static_config_analysis' module Fluent class EngineClass @@ -157,14 +158,47 @@ def run raise end - unless @log_event_verbose - $log.enable_event(false) - @fluent_log_event_router.graceful_stop + stop_phase(@root_agent) + end + + # @param conf [Fluent::Config] + # @param supervisor [Bool] + # @reutrn nil + def reload_config(conf, supervisor: false) + # configure first to reduce down time while restarting + new_agent = RootAgent.new(log: log, system_config: @system_config) + ret = Fluent::StaticConfigAnalysis.call(conf, workers: system_config.workers) + + ret.all_plugins.each do |plugin| + if plugin.respond_to?(:reloadable_plugin?) && !plugin.reloadable_plugin? + raise Fluent::ConfigError, "Unreloadable plugin plugin: #{Fluent::Plugin.lookup_type_from_class(plugin.class)}, plugin_id: #{plugin.plugin_id}, class_name: #{plugin.class})" + end end - $log.info "shutting down fluentd worker", worker: worker_id - shutdown - @fluent_log_event_router.stop + # Assign @root_agent to new root_agent + # for https://github.com/fluent/fluentd/blob/fcef949ce40472547fde295ddd2cfe297e1eddd6/lib/fluent/plugin_helper/event_emitter.rb#L50 + old_agent, @root_agent = @root_agent, new_agent + begin + @root_agent.configure(conf) + rescue + @root_agent = old_agent + raise + end + + unless @suppress_config_dump + $log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}" + end + + # supervisor doesn't handle actual data. so the following code is unnecessary. + if supervisor + old_agent.shutdown # to close thread created in #configure + return + end + + stop_phase(old_agent) + + $log.info 'restart fluentd worker', worker: worker_id + start_phase(new_agent) end def stop @@ -189,12 +223,29 @@ def worker_id end private - def start + + def stop_phase(root_agent) + unless @log_event_verbose + $log.enable_event(false) + @fluent_log_event_router.graceful_stop + end + $log.info 'shutting down fluentd worker', worker: worker_id + root_agent.shutdown + + @fluent_log_event_router.stop + end + + def start_phase(root_agent) + @fluent_log_event_router = FluentLogEventRouter.build(root_agent) + if @fluent_log_event_router.emittable? + $log.enable_event(true) + end + @root_agent.start end - def shutdown - @root_agent.shutdown + def start + @root_agent.start end end diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb index e8e444f33d..64ec5d0967 100644 --- a/lib/fluent/plugin/base.rb +++ b/lib/fluent/plugin/base.rb @@ -187,6 +187,11 @@ def inspect # https://github.com/ruby/ruby/blob/trunk/gc.c#L788 "#<%s:%014x>" % [self.class.name, '0x%014x' % (__id__ << 1)] end + + def reloadable_plugin? + # Engine can't capture all class variables. so it's forbbiden to use class variables in each plugins if enabling reload. + self.class.class_variables.empty? + end end end end diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index 459b183ce2..75e93dc8ec 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -19,6 +19,7 @@ require 'fluent/plugin/buffer' require 'fluent/plugin/buffer/file_chunk' require 'fluent/system_config' +require 'fluent/variable_store' module Fluent module Plugin @@ -43,19 +44,20 @@ class FileBuffer < Fluent::Plugin::Buffer config_param :file_permission, :string, default: nil # '0644' config_param :dir_permission, :string, default: nil # '0755' - @@buffer_paths = {} - def initialize super @symlink_path = nil @multi_workers_available = false @additional_resume_path = nil @buffer_path = nil + @variable_store = nil end def configure(conf) super + @variable_store = Fluent::VariableStore.fetch_or_build(:buf_file) + multi_workers_configured = owner.system_config.workers > 1 ? true : false using_plugin_root_dir = false @@ -69,13 +71,13 @@ def configure(conf) end type_of_owner = Plugin.lookup_type_from_class(@_owner.class) - if @@buffer_paths.has_key?(@path) && !called_in_test? - type_using_this_path = @@buffer_paths[@path] + if @variable_store.has_key?(@path) && !called_in_test? + type_using_this_path = @variable_store[@path] raise ConfigError, "Other '#{type_using_this_path}' plugin already use same buffer path: type = #{type_of_owner}, buffer path = #{@path}" end @buffer_path = @path - @@buffer_paths[@buffer_path] = type_of_owner + @variable_store[@buffer_path] = type_of_owner specified_directory_exists = File.exist?(@path) && File.directory?(@path) unexisting_path_for_directory = !File.exist?(@path) && !@path.include?('.*') @@ -125,7 +127,9 @@ def start end def stop - @@buffer_paths.delete(@buffer_path) + if @variable_store + @variable_store.delete(@buffer_path) + end super end diff --git a/lib/fluent/plugin/buf_file_single.rb b/lib/fluent/plugin/buf_file_single.rb index 7116c951d8..e15d74a90a 100644 --- a/lib/fluent/plugin/buf_file_single.rb +++ b/lib/fluent/plugin/buf_file_single.rb @@ -19,6 +19,7 @@ require 'fluent/plugin/buffer' require 'fluent/plugin/buffer/file_single_chunk' require 'fluent/system_config' +require 'fluent/variable_store' module Fluent module Plugin @@ -48,18 +49,19 @@ class FileSingleBuffer < Fluent::Plugin::Buffer desc 'The permission of chunk directory. If no specified, setting or 0755 is used' config_param :dir_permission, :string, default: nil - @@buffer_paths = {} - def initialize super @multi_workers_available = false @additional_resume_path = nil + @variable_store = nil end def configure(conf) super + @variable_store = Fluent::VariableStore.fetch_or_build(:buf_file_single) + if @chunk_format == :auto @chunk_format = owner.formatted_to_msgpack_binary? ? :msgpack : :text end @@ -117,12 +119,12 @@ def configure(conf) end type_of_owner = Plugin.lookup_type_from_class(@_owner.class) - if @@buffer_paths.has_key?(@path) && !called_in_test? - type_using_this_path = @@buffer_paths[@path] + if @variable_store.has_key?(@path) && !called_in_test? + type_using_this_path = @variable_store[@path] raise Fluent::ConfigError, "Other '#{type_using_this_path}' plugin already uses same buffer path: type = #{type_of_owner}, buffer path = #{@path}" end - @@buffer_paths[@path] = type_of_owner + @variable_store[@path] = type_of_owner @dir_permission = if @dir_permission @dir_permission.to_i(8) else @@ -145,7 +147,9 @@ def start end def stop - @@buffer_paths.delete(@path) + if @variable_store + @variable_store.delete(@path) + end super end diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 6a226a544d..f8bb655700 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -21,6 +21,7 @@ require 'fluent/event' require 'fluent/plugin/buffer' require 'fluent/plugin/parser_multiline' +require 'fluent/variable_store' if Fluent.windows? require_relative 'file_wrapper' @@ -103,9 +104,8 @@ def initialize attr_reader :paths - @@pos_file_paths = {} - def configure(conf) + @variable_store = Fluent::VariableStore.fetch_or_build(:in_tail) compat_parameters_convert(conf, :parser) parser_config = conf.elements('parse').first unless parser_config @@ -132,11 +132,11 @@ def configure(conf) # TODO: Use plugin_root_dir and storage plugin to store positions if available if @pos_file - if @@pos_file_paths.has_key?(@pos_file) && !called_in_test? - plugin_id_using_this_path = @@pos_file_paths[@pos_file] + if @variable_store.key?(@pos_file) && !called_in_test? + plugin_id_using_this_path = @variable_store[@pos_file] raise Fluent::ConfigError, "Other 'in_tail' plugin already use same pos_file path: plugin_id = #{plugin_id_using_this_path}, pos_file path = #{@pos_file}" end - @@pos_file_paths[@pos_file] = self.plugin_id + @variable_store[@pos_file] = self.plugin_id else $log.warn "'pos_file PATH' parameter is not set to a 'tail' source." $log.warn "this parameter is highly recommended to save the position to resume tailing." @@ -205,7 +205,9 @@ def start end def stop - @@pos_file_paths.delete(@pos_file) + if @variable_store + @variable_store.delete(@pos_file) + end super end diff --git a/lib/fluent/plugin_id.rb b/lib/fluent/plugin_id.rb index 0441387ad5..1a4fe4065d 100644 --- a/lib/fluent/plugin_id.rb +++ b/lib/fluent/plugin_id.rb @@ -15,26 +15,29 @@ # require 'set' +require 'fluent/variable_store' module Fluent module PluginId - @@configured_ids = Set.new def initialize super + + @_plugin_id_variable_store = nil @_plugin_root_dir = nil @id = nil end def configure(conf) + @_plugin_id_variable_store = Fluent::VariableStore.fetch_or_build(:pluing_id, default_value: Set.new) @id = conf['@id'] @_id_configured = !!@id # plugin id is explicitly configured by users (or not) if @id @id = @id.to_s - if @@configured_ids.include?(@id) && !plugin_id_for_test? + if @_plugin_id_variable_store.include?(@id) && !plugin_id_for_test? raise Fluent::ConfigError, "Duplicated plugin id `#{@id}`. Check whole configuration and fix it." end - @@configured_ids.add(@id) + @_plugin_id_variable_store.add(@id) end super @@ -79,7 +82,9 @@ def plugin_root_dir end def stop - @@configured_ids.delete(@id) + if @_plugin_id_variable_store + @_plugin_id_variable_store.delete(@id) + end super end diff --git a/lib/fluent/static_config_analysis.rb b/lib/fluent/static_config_analysis.rb new file mode 100644 index 0000000000..5aabdba970 --- /dev/null +++ b/lib/fluent/static_config_analysis.rb @@ -0,0 +1,194 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/config' +require 'fluent/plugin' + +module Fluent + # Static Analysis means analysing all plugins and Fluent::Element without invokeing Plugin#configure + class StaticConfigAnalysis + module Elem + Input = Struct.new(:plugin, :config) + Output = Struct.new(:plugin, :config) + Filter = Struct.new(:plugin, :config) + Label = Struct.new(:name, :config, :nodes) + Worker = Struct.new(:ids, :config, :nodes) + end + + Result = Struct.new(:tree, :outputs, :inputs, :filters, :labels) do + def all_plugins + (outputs + inputs + filters).map(&:plugin) + end + end + + # @param workers [Integer] Number of workers + # @return [Fluent::StaticConfigAnalysis::Result] + def self.call(conf, workers: 1) + new(workers).call(conf) + end + + def initialize(workers) + @workers = workers + + reset + end + + def call(config) + reset + + tree = [ + static_worker_analyse(config), + static_label_analyse(config), + static_filter_and_output_analyse(config), + static_input_analyse(config), + ].flatten + + Result.new(tree, @outputs, @inputs, @filters, @labels.values) + end + + private + + def reset + @outputs = [] + @inputs = [] + @filters = [] + @labels = {} + end + + def static_worker_analyse(conf) + available_worker_ids = [*0...@workers] + + ret = [] + conf.elements(name: 'worker').each do |config| + ids = parse_worker_id(config) + ids.each do |id| + if available_worker_ids.include?(id) + available_worker_ids.delete(id) + else + raise Fluent::ConfigError, "specified worker_id<#{id}> collisions is detected on directive. Available worker id(s): #{available_worker_ids}" + end + end + + config.elements.each do |elem| + unless %w[source match filter label].include?(elem.name) + raise Fluent::ConfigError, " section cannot have <#{elem.name}> directive" + end + end + + nodes = [ + static_label_analyse(config), + static_filter_and_output_analyse(config), + static_input_analyse(config), + ].flatten + ret << Elem::Worker.new(ids, config, nodes) + end + + ret + end + + def parse_worker_id(conf) + worker_id_str = conf.arg + + if worker_id_str.empty? + raise Fluent::ConfigError, 'Missing worker id on directive' + end + + l, r = + begin + worker_id_str.split('-', 2).map { |v| Integer(v) } + rescue TypeError, ArgumentError + raise Fluent::ConfigError, "worker id should be integer: #{worker_id_str}" + end + + if l < 0 || l >= @workers + raise Fluent::ConfigError, "worker id #{l} specified by directive is not allowed. Available worker id is between 0 and #{@workers-1}" + end + + # e.g. specified one worker id like `` + if r.nil? + return [l] + end + + if r < 0 || r >= @workers + raise Fluent::ConfigError, "worker id #{r} specified by directive is not allowed. Available worker id is between 0 and #{@workers-1}" + end + + if l > r + raise Fluent::ConfigError, "greater first_worker_id<#{l}> than last_worker_id<#{r}> specified by directive is not allowed. Available multi worker assign syntax is -" + end + + [l, r] + end + + def static_label_analyse(conf) + ret = [] + conf.elements(name: 'label').each do |e| + name = e.arg + if name.empty? + raise ConfigError, 'Missing symbol argument on