diff --git a/lib/fluent/command/fluentd.rb b/lib/fluent/command/fluentd.rb index da25b74cca..c5c4684089 100644 --- a/lib/fluent/command/fluentd.rb +++ b/lib/fluent/command/fluentd.rb @@ -127,6 +127,12 @@ cmd_opts[:without_source] = b } +unless Fluent.windows? + op.on('--with-source-only', "Invoke a fluentd only with input plugins. The data is stored in a temporary buffer. Send SIGWINCH to cancel this mode and process the data (Not supported on Windows).", TrueClass) {|b| + cmd_opts[:with_source_only] = b + } +end + op.on('--config-file-type VALU', 'guessing file type of fluentd configuration. yaml/yml or guess') { |s| if (s == 'yaml') || (s == 'yml') cmd_opts[:config_file_type] = s.to_sym diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index afac3167ca..b6677c6443 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -43,6 +43,8 @@ def initialize @system_config = SystemConfig.new @supervisor_mode = false + + @root_agent_mutex = Mutex.new end MAINLOOP_SLEEP_INTERVAL = 0.3 @@ -133,7 +135,15 @@ def emit_stream(tag, es) end def flush! - @root_agent.flush! + @root_agent_mutex.synchronize do + @root_agent.flush! + end + end + + def cancel_source_only! + @root_agent_mutex.synchronize do + @root_agent.cancel_source_only! + end end def now @@ -144,7 +154,9 @@ def now def run begin $log.info "starting fluentd worker", pid: Process.pid, ppid: Process.ppid, worker: worker_id - start + @root_agent_mutex.synchronize do + start + end @fluent_log_event_router.start @@ -158,47 +170,51 @@ def run raise end - stop_phase(@root_agent) + @root_agent_mutex.synchronize do + stop_phase(@root_agent) + end end # @param conf [Fluent::Config] # @param supervisor [Bool] # @reutrn nil def reload_config(conf, supervisor: false) - # configure first to reduce down time while restarting - new_agent = RootAgent.new(log: log, system_config: @system_config) - ret = Fluent::StaticConfigAnalysis.call(conf, workers: system_config.workers) - - ret.all_plugins.each do |plugin| - if plugin.respond_to?(:reloadable_plugin?) && !plugin.reloadable_plugin? - raise Fluent::ConfigError, "Unreloadable plugin plugin: #{Fluent::Plugin.lookup_type_from_class(plugin.class)}, plugin_id: #{plugin.plugin_id}, class_name: #{plugin.class})" + @root_agent_mutex.synchronize do + # configure first to reduce down time while restarting + new_agent = RootAgent.new(log: log, system_config: @system_config) + ret = Fluent::StaticConfigAnalysis.call(conf, workers: system_config.workers) + + ret.all_plugins.each do |plugin| + if plugin.respond_to?(:reloadable_plugin?) && !plugin.reloadable_plugin? + raise Fluent::ConfigError, "Unreloadable plugin plugin: #{Fluent::Plugin.lookup_type_from_class(plugin.class)}, plugin_id: #{plugin.plugin_id}, class_name: #{plugin.class})" + end end - end - # Assign @root_agent to new root_agent - # for https://github.com/fluent/fluentd/blob/fcef949ce40472547fde295ddd2cfe297e1eddd6/lib/fluent/plugin_helper/event_emitter.rb#L50 - old_agent, @root_agent = @root_agent, new_agent - begin - @root_agent.configure(conf) - rescue - @root_agent = old_agent - raise - end + # Assign @root_agent to new root_agent + # for https://github.com/fluent/fluentd/blob/fcef949ce40472547fde295ddd2cfe297e1eddd6/lib/fluent/plugin_helper/event_emitter.rb#L50 + old_agent, @root_agent = @root_agent, new_agent + begin + @root_agent.configure(conf) + rescue + @root_agent = old_agent + raise + end - unless @suppress_config_dump - $log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}" - end + unless @suppress_config_dump + $log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}" + end - # supervisor doesn't handle actual data. so the following code is unnecessary. - if supervisor - old_agent.shutdown # to close thread created in #configure - return - end + # supervisor doesn't handle actual data. so the following code is unnecessary. + if supervisor + old_agent.shutdown # to close thread created in #configure + return + end - stop_phase(old_agent) + stop_phase(old_agent) - $log.info 'restart fluentd worker', worker: worker_id - start_phase(new_agent) + $log.info 'restart fluentd worker', worker: worker_id + start_phase(new_agent) + end end def stop diff --git a/lib/fluent/env.rb b/lib/fluent/env.rb index e25711e77a..1b6c38384f 100644 --- a/lib/fluent/env.rb +++ b/lib/fluent/env.rb @@ -14,6 +14,8 @@ # limitations under the License. # +require 'securerandom' + require 'serverengine/utils' require 'fluent/oj_options' @@ -25,6 +27,7 @@ module Fluent DEFAULT_OJ_OPTIONS = Fluent::OjOptions.load_env DEFAULT_DIR_PERMISSION = 0755 DEFAULT_FILE_PERMISSION = 0644 + INSTANCE_ID = ENV['FLUENT_INSTANCE_ID'] || SecureRandom.uuid def self.windows? ServerEngine.windows? diff --git a/lib/fluent/plugin/out_buffer.rb b/lib/fluent/plugin/out_buffer.rb new file mode 100644 index 0000000000..6f3fe8119e --- /dev/null +++ b/lib/fluent/plugin/out_buffer.rb @@ -0,0 +1,40 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/output' + +module Fluent::Plugin + class BufferOutput < Output + Fluent::Plugin.register_output("buffer", self) + helpers :event_emitter + + config_section :buffer do + config_set_default :@type, "file" + config_set_default :chunk_keys, ["tag"] + config_set_default :flush_mode, :interval + config_set_default :flush_interval, 10 + end + + def multi_workers_ready? + true + end + + def write(chunk) + return if chunk.empty? + router.emit_stream(chunk.metadata.tag, Fluent::MessagePackEventStream.new(chunk.read)) + end + end +end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 0aed67db95..c600e35287 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1384,6 +1384,7 @@ def retry_state(randomize) end def submit_flush_once + return unless @buffer_config.flush_thread_count > 0 # Without locks: it is rough but enough to select "next" writer selection @output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count state = @output_flush_threads[@output_flush_thread_current_position] @@ -1406,6 +1407,7 @@ def force_flush end def submit_flush_all + return unless @buffer_config.flush_thread_count > 0 while !@retry && @buffer.queued? submit_flush_once sleep @buffer_config.flush_thread_burst_interval diff --git a/lib/fluent/plugin_helper/event_emitter.rb b/lib/fluent/plugin_helper/event_emitter.rb index ba089e485a..23bc15ce53 100644 --- a/lib/fluent/plugin_helper/event_emitter.rb +++ b/lib/fluent/plugin_helper/event_emitter.rb @@ -26,6 +26,9 @@ module EventEmitter def router @_event_emitter_used_actually = true + + return Engine.root_agent.source_only_router if @_event_emitter_force_source_only_router + if @_event_emitter_lazy_init @router = @primary_instance.router end @@ -48,6 +51,14 @@ def event_emitter_used_actually? @_event_emitter_used_actually end + def event_emitter_apply_source_only + @_event_emitter_force_source_only_router = true + end + + def event_emitter_cancel_source_only + @_event_emitter_force_source_only_router = false + end + def event_emitter_router(label_name) if label_name if label_name == "@ROOT" @@ -72,6 +83,7 @@ def initialize super @_event_emitter_used_actually = false @_event_emitter_lazy_init = false + @_event_emitter_force_source_only_router = false @router = nil end diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index d10f366044..c66ccb489c 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -22,6 +22,7 @@ require 'fluent/plugin' require 'fluent/system_config' require 'fluent/time' +require 'fluent/source_only_buffer_agent' module Fluent # @@ -54,17 +55,22 @@ def initialize(log:, system_config: SystemConfig.new) @inputs = [] @suppress_emit_error_log_interval = 0 @next_emit_error_log_time = nil - @without_source = false - @enable_input_metrics = false + @without_source = system_config.without_source || false + @with_source_only = system_config.with_source_only || false + @source_only_buffer_agent = nil + @enable_input_metrics = system_config.enable_input_metrics || false suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil? - @without_source = system_config.without_source unless system_config.without_source.nil? - @enable_input_metrics = !!system_config.enable_input_metrics end attr_reader :inputs attr_reader :labels + def source_only_router + raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @with_source_only + @source_only_buffer_agent.event_router + end + def configure(conf) used_worker_ids = [] available_worker_ids = (0..Fluent::Engine.system_config.workers - 1).to_a @@ -148,6 +154,8 @@ def configure(conf) super + setup_source_only_buffer_agent if @with_source_only + # initialize elements if @without_source log.info :worker0, "'--without-source' is applied. Ignore sections" @@ -169,16 +177,33 @@ def setup_error_label(e) @error_collector = error_label.event_router end - def lifecycle(desc: false, kind_callback: nil) - kind_or_label_list = if desc - [:output, :filter, @labels.values.reverse, :output_with_router, :input].flatten - else - [:input, :output_with_router, @labels.values, :filter, :output].flatten - end - kind_or_label_list.each do |kind| + def setup_source_only_buffer_agent(flush: false) + @source_only_buffer_agent = SourceOnlyBufferAgent.new(log: log, system_config: Fluent::Engine.system_config) + @source_only_buffer_agent.configure(flush: flush) + end + + def cleanup_source_only_buffer_agent + @source_only_buffer_agent&.cleanup + end + + def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil) + unless kind_or_agent_list + if @with_source_only + kind_or_agent_list = [:input, @source_only_buffer_agent] + elsif @source_only_buffer_agent + # source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router. + kind_or_agent_list = [:input, :output_with_router, @source_only_buffer_agent, @labels.values, :filter, :output].flatten + else + kind_or_agent_list = [:input, :output_with_router, @labels.values, :filter, :output].flatten + end + + kind_or_agent_list.reverse! if desc + end + + kind_or_agent_list.each do |kind| if kind.respond_to?(:lifecycle) - label = kind - label.lifecycle(desc: desc) do |plugin, display_kind| + agent = kind + agent.lifecycle(desc: desc) do |plugin, display_kind| yield plugin, display_kind end else @@ -198,8 +223,8 @@ def lifecycle(desc: false, kind_callback: nil) end end - def start - lifecycle(desc: true) do |i| # instance + def start(kind_or_agent_list: nil) + lifecycle(desc: true, kind_or_agent_list: kind_or_agent_list) do |i| # instance i.start unless i.started? # Input#start sometimes emits lots of events with in_tail/`read_from_head true` case # and it causes deadlock for small buffer/queue output. To avoid such problem, @@ -231,13 +256,46 @@ def flush! flushing_threads.each{|t| t.join } end - def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins + def cancel_source_only! + unless @with_source_only + log.info "do nothing for canceling with-source-only because the current mode is not with-source-only." + return + end + + log.info "cancel with-source-only mode and start the other plugins" + all_plugins = [:input, :output_with_router, @labels.values, :filter, :output].flatten.reverse + start(kind_or_agent_list: all_plugins) + + lifecycle_control_list[:input].each(&:event_emitter_cancel_source_only) + + # Want to make sure that the source_only_router finishes all process before + # shutting down the agent. + # Strictly speaking, it would be necessary to have exclusive lock between + # EventRouter and the shutting down process of this agent. + # However, adding lock to EventRouter would worsen its performance, and + # the entire shutting down process does not care about it either. + # So, sleep here just in case. + sleep 1 + + shutdown(kind_or_agent_list: [@source_only_buffer_agent]) + @source_only_buffer_agent = nil + + # This agent can stop after flushing its all buffer, but it is not implemented for now. + log.info "starts the loading agent for with-source-only" + setup_source_only_buffer_agent(flush: true) + start(kind_or_agent_list: [@source_only_buffer_agent]) + + @with_source_only = false + end + + def shutdown(kind_or_agent_list: nil) + # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins # These method callers does `rescue Exception` to call methods of shutdown sequence as far as possible # if plugin methods does something like infinite recursive call, `exit`, unregistering signal handlers or others. # Plugins should be separated and be in sandbox to protect data in each plugins/buffers. lifecycle_safe_sequence = ->(method, checker) { - lifecycle do |instance, kind| + lifecycle(kind_or_agent_list: kind_or_agent_list) do |instance, kind| begin log.debug "calling #{method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id instance.__send__(method) unless instance.__send__(checker) @@ -260,7 +318,7 @@ def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, a operation_threads.each{|t| t.join } operation_threads.clear } - lifecycle(kind_callback: callback) do |instance, kind| + lifecycle(kind_callback: callback, kind_or_agent_list: kind_or_agent_list) do |instance, kind| t = Thread.new do Thread.current.abort_on_exception = true begin @@ -301,6 +359,8 @@ def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, a lifecycle_unsafe_sequence.call(:close, :closed?) lifecycle_safe_sequence.call(:terminate, :terminated?) + + cleanup_source_only_buffer_agent unless kind_or_agent_list end def suppress_interval(interval_time) @@ -318,6 +378,7 @@ def add_source(type, conf) # See also 'fluentd/plugin/input.rb' input.context_router = @event_router input.configure(conf) + input.event_emitter_apply_source_only if @with_source_only if @enable_input_metrics @event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) }) end diff --git a/lib/fluent/source_only_buffer_agent.rb b/lib/fluent/source_only_buffer_agent.rb new file mode 100644 index 0000000000..5695fcfa3c --- /dev/null +++ b/lib/fluent/source_only_buffer_agent.rb @@ -0,0 +1,102 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/agent' +require 'fluent/system_config' + +module Fluent + class SourceOnlyBufferAgent < Agent + # Use INSTANCE_ID to use the same base dir as the other workers. + # This will make recovery easier. + BUFFER_DIR_NAME = Fluent::INSTANCE_ID + + def initialize(log:, system_config:) + super(log: log) + + @default_buffer_path = File.join(system_config.root_dir || DEFAULT_BACKUP_DIR, 'source-only-buffer', BUFFER_DIR_NAME) + @optional_buffer_config = system_config.source_only_buffer.to_h.transform_keys(&:to_s) + @base_buffer_dir = nil + @actual_buffer_dir = nil + end + + def configure(flush: false) + buffer_config = @optional_buffer_config.compact + buffer_config['flush_at_shutdown'] = flush ? 'true' : 'false' + buffer_config['flush_thread_count'] = 0 unless flush + buffer_config['path'] ||= @default_buffer_path + + super( + Config::Element.new('SOURCE_ONLY_BUFFER', '', {}, [ + Config::Element.new('match', '**', {'@type' => 'buffer', '@label' => '@ROOT'}, [ + Config::Element.new('buffer', '', buffer_config, []) + ]) + ]) + ) + + @base_buffer_dir = buffer_config['path'] + # It can be "#{@base_buffer_dir}/worker#{fluentd_worker_id}/" when using multiple workers + @actual_buffer_dir = File.dirname(outputs[0].buffer.path) + + unless flush + log.info "with-source-only: the emitted data will be stored in the buffer files under" + + " #{@base_buffer_dir}. You can send SIGWINCH to the supervisor process to cancel" + + " with-source-only mode and process data." + end + end + + def cleanup + unless (Dir.empty?(@actual_buffer_dir) rescue true) + log.warn "some buffer files remain in #{@base_buffer_dir}." + + " Please consider recovering or saving the buffer files in the directory." + + " To recover them, you can set the buffer path manually to system config and" + + " retry, i.e., restart Fluentd with with-source-only mode and send SIGWINCH again." + + " Config Example:\n#{config_example_to_recover(@base_buffer_dir)}" + return + end + + begin + FileUtils.remove_dir(@base_buffer_dir) + rescue Errno::ENOENT + # This worker doesn't need to do anything. Another worker may remove the dir first. + rescue => e + log.warn "failed to remove the buffer directory: #{@base_buffer_dir}", error: e + end + end + + def emit_error_event(tag, time, record, error) + error_info = {error: error, location: (error.backtrace ? error.backtrace.first : nil), tag: tag, time: time, record: record} + log.warn "SourceOnlyBufferAgent: dump an error event:", error_info + end + + def handle_emits_error(tag, es, error) + error_info = {error: error, location: (error.backtrace ? error.backtrace.first : nil), tag: tag} + log.warn "SourceOnlyBufferAgent: emit transaction failed:", error_info + log.warn_backtrace + end + + private + + def config_example_to_recover(path) + <<~EOC + + + path #{path} + + + EOC + end + end +end diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index d565abf600..a76bd1d40a 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -189,6 +189,11 @@ def install_supervisor_signal_handlers $log.debug 'fluentd supervisor process got SIGUSR2' supervisor_sigusr2_handler end + + trap :WINCH do + $log.debug 'fluentd supervisor process got SIGWINCH' + cancel_source_only + end end if Fluent.windows? @@ -312,6 +317,10 @@ def supervisor_sigusr2_handler $log.error "Failed to reload config file: #{e}" end + def cancel_source_only + send_signal_to_workers(:WINCH) + end + def supervisor_dump_handler_for_windows # As for UNIX-like, SIGCONT signal to each process makes the process output its dump-file, # and it is implemented before the implementation of the function for Windows. @@ -409,6 +418,7 @@ def spawn(process_manager) main_cmd = config[:main_cmd] env = { 'SERVERENGINE_WORKER_ID' => @worker_id.to_i.to_s, + 'FLUENT_INSTANCE_ID' => Fluent::INSTANCE_ID, } @pm = process_manager.spawn(env, *main_cmd) end @@ -486,6 +496,7 @@ def self.default_options suppress_repeated_stacktrace: true, ignore_repeated_log_interval: nil, without_source: nil, + with_source_only: nil, enable_input_metrics: nil, enable_size_metrics: nil, use_v1_config: true, @@ -550,6 +561,10 @@ def run_supervisor(dry_run: false) raise Fluent::ConfigError, "invalid number of workers (must be > 0):#{@system_config.workers}" end + if Fluent.windows? && @system_config.with_source_only + raise Fluent::ConfigError, "with-source-only is not supported on Windows" + end + root_dir = @system_config.root_dir if root_dir if File.exist?(root_dir) @@ -601,6 +616,10 @@ def run_worker raise Fluent::ConfigError, "invalid number of workers (must be 1 or unspecified) with --no-supervisor: #{@system_config.workers}" end + if Fluent.windows? && @system_config.with_source_only + raise Fluent::ConfigError, "with-source-only is not supported on Windows" + end + install_main_process_signal_handlers # This is the only log messsage for @standalone_worker @@ -840,6 +859,10 @@ def install_main_process_signal_handlers trap :CONT do dump_non_windows end + + trap :WINCH do + cancel_source_only + end end end @@ -893,6 +916,18 @@ def flush_buffer end end + def cancel_source_only + Thread.new do + begin + $log.debug "fluentd main process get SIGWINCH" + $log.info "try to cancel with-source-only mode" + Fluent::Engine.cancel_source_only! + rescue Exception => e + $log.warn "failed to cancel source only", error: e + end + end + end + def reload_config Thread.new do $log.debug('worker got SIGUSR2') diff --git a/lib/fluent/system_config.rb b/lib/fluent/system_config.rb index 917889018d..e6bd487f45 100644 --- a/lib/fluent/system_config.rb +++ b/lib/fluent/system_config.rb @@ -25,7 +25,7 @@ class SystemConfig :workers, :restart_worker_interval, :root_dir, :log_level, :suppress_repeated_stacktrace, :emit_error_log_interval, :suppress_config_dump, :log_event_verbose, :ignore_repeated_log_interval, :ignore_same_log_interval, - :without_source, :rpc_endpoint, :enable_get_dump, :process_name, + :without_source, :with_source_only, :rpc_endpoint, :enable_get_dump, :process_name, :file_permission, :dir_permission, :counter_server, :counter_client, :strict_config_value, :enable_msgpack_time_support, :disable_shared_socket, :metrics, :enable_input_metrics, :enable_size_metrics, :enable_jit @@ -41,7 +41,8 @@ class SystemConfig config_param :emit_error_log_interval, :time, default: nil config_param :suppress_config_dump, :bool, default: nil config_param :log_event_verbose, :bool, default: nil - config_param :without_source, :bool, default: nil + config_param :without_source, :bool, default: nil + config_param :with_source_only, :bool, default: nil config_param :rpc_endpoint, :string, default: nil config_param :enable_get_dump, :bool, default: nil config_param :process_name, :string, default: nil @@ -104,6 +105,16 @@ class SystemConfig config_param :labels, :hash, default: {} end + config_section :source_only_buffer, init: true, multi: false do + config_param :flush_thread_count, :integer, default: 1 + config_param :overflow_action, :enum, list: [:throw_exception, :block, :drop_oldest_chunk], default: :drop_oldest_chunk + config_param :path, :string, default: nil + config_param :flush_interval, :time, default: nil + config_param :chunk_limit_size, :size, default: nil + config_param :total_limit_size, :size, default: nil + config_param :compress, :enum, list: [:text, :gzip], default: nil + end + def self.create(conf, strict_config_value=false) systems = conf.elements(name: 'system') return SystemConfig.new if systems.empty? diff --git a/test/command/test_fluentd.rb b/test/command/test_fluentd.rb index 0b2106f129..0b40b93c2a 100644 --- a/test/command/test_fluentd.rb +++ b/test/command/test_fluentd.rb @@ -1335,4 +1335,19 @@ def multi_workers_ready?; true; end end end end + + sub_test_case "--with-source-only" do + setup do + omit "Not supported on Windows" if Fluent.windows? + end + + test "should work without error" do + conf_path = create_conf_file("empty.conf", "") + assert File.exist?(conf_path) + assert_log_matches(create_cmdline(conf_path, "--with-source-only"), + "with-source-only: the emitted data will be stored in the buffer files under", + "fluentd worker is now running", + patterns_not_match: ["[error]"]) + end + end end diff --git a/test/config/test_system_config.rb b/test/config/test_system_config.rb index 085930559d..9a37a3d272 100644 --- a/test/config/test_system_config.rb +++ b/test/config/test_system_config.rb @@ -21,6 +21,7 @@ def initialize(**opt) log_event_label: nil, log_event_verbose: nil, without_source: nil, + with_source_only: nil, enable_input_metrics: nil, enable_size_metrics: nil, emit_error_log_interval: nil, @@ -73,6 +74,7 @@ def parse_text(text) assert_nil(sc.emit_error_log_interval) assert_nil(sc.suppress_config_dump) assert_nil(sc.without_source) + assert_nil(sc.with_source_only) assert_nil(sc.enable_input_metrics) assert_nil(sc.enable_size_metrics) assert_nil(sc.enable_msgpack_time_support) @@ -92,6 +94,7 @@ def parse_text(text) 'log_event_verbose' => ['log_event_verbose', true], 'suppress_config_dump' => ['suppress_config_dump', true], 'without_source' => ['without_source', true], + 'with_source_only' => ['with_source_only', true], 'strict_config_value' => ['strict_config_value', true], 'enable_msgpack_time_support' => ['enable_msgpack_time_support', true], 'enable_input_metrics' => ['enable_input_metrics', true], @@ -194,5 +197,45 @@ def parse_text(text) assert_equal(size, sc.log.rotate_size) end end + + test "source-only-buffer parameters" do + conf = parse_text(<<~EOS) + + + flush_thread_count 4 + overflow_action throw_exception + path /tmp/source-only-buffer + flush_interval 1 + chunk_limit_size 100 + total_limit_size 1000 + compress gzip + + + EOS + s = FakeSupervisor.new + sc = Fluent::SystemConfig.new(conf) + sc.overwrite_variables(**s.for_system_config) + + assert_equal( + [ + 4, + :throw_exception, + "/tmp/source-only-buffer", + 1, + 100, + 1000, + :gzip, + ], + [ + sc.source_only_buffer.flush_thread_count, + sc.source_only_buffer.overflow_action, + sc.source_only_buffer.path, + sc.source_only_buffer.flush_interval, + sc.source_only_buffer.chunk_limit_size, + sc.source_only_buffer.total_limit_size, + sc.source_only_buffer.compress, + ] + ) + end end end diff --git a/test/plugin/test_out_buffer.rb b/test/plugin/test_out_buffer.rb new file mode 100644 index 0000000000..afaac1bcdd --- /dev/null +++ b/test/plugin/test_out_buffer.rb @@ -0,0 +1,54 @@ +require_relative '../helper' +require 'fluent/test/driver/output' +require 'fluent/plugin/out_buffer' + +class BufferOutputTest < Test::Unit::TestCase + def setup + Fluent::Test.setup + end + + def create_driver(conf = "") + Fluent::Test::Driver::Output.new(Fluent::Plugin::BufferOutput).configure(conf) + end + + test "default setting" do + d = create_driver( + config_element( + "ROOT", "", {}, + [config_element("buffer", "", {"path" => "test"})] + ) + ) + + assert_equal( + [ + "file", + ["tag"], + :interval, + 10, + ], + [ + d.instance.buffer_config["@type"], + d.instance.buffer_config.chunk_keys, + d.instance.buffer_config.flush_mode, + d.instance.buffer_config.flush_interval, + ] + ) + end + + test "#write" do + d = create_driver( + config_element( + "ROOT", "", {}, + [config_element("buffer", "", {"@type" => "memory", "flush_mode" => "immediate"})] + ) + ) + + time = event_time + record = {"message" => "test"} + d.run(default_tag: 'test') do + d.feed(time, record) + end + + assert_equal [["test", time, record]], d.events + end +end diff --git a/test/plugin_helper/test_event_emitter.rb b/test/plugin_helper/test_event_emitter.rb index 409b9800bf..26580f0087 100644 --- a/test/plugin_helper/test_event_emitter.rb +++ b/test/plugin_helper/test_event_emitter.rb @@ -77,4 +77,33 @@ class Dummy < Fluent::Plugin::TestBase router = d.event_emitter_router("@ROOT") assert_equal Fluent::Engine.root_agent.event_router, router end + + test '#router should return the root router by default' do + stub(Fluent::Engine.root_agent).event_router { "root_event_router" } + stub(Fluent::Engine.root_agent).source_only_router { "source_only_router" } + + d = Dummy.new + d.configure(Fluent::Config::Element.new('source', '', {}, [])) + assert_equal "root_event_router", d.router + end + + test '#router should return source_only_router during source-only' do + stub(Fluent::Engine.root_agent).event_router { "root_event_router" } + stub(Fluent::Engine.root_agent).source_only_router { "source_only_router" } + + d = Dummy.new + d.configure(Fluent::Config::Element.new('source', '', {}, [])) + d.event_emitter_apply_source_only + + router_when_source_only = d.router + + d.event_emitter_cancel_source_only + + router_after_canceled = d.router + + assert_equal( + ["source_only_router", "root_event_router"], + [router_when_source_only, router_after_canceled] + ) + end end diff --git a/test/test_plugin_classes.rb b/test/test_plugin_classes.rb index 8af95aa20c..ccd6a1469c 100644 --- a/test/test_plugin_classes.rb +++ b/test/test_plugin_classes.rb @@ -91,9 +91,13 @@ def shutdown class FluentTestGenInput < ::Fluent::Plugin::Input ::Fluent::Plugin.register_input('test_in_gen', self) + helpers :thread + attr_reader :started config_param :num, :integer, default: 10000 + config_param :interval_sec, :float, default: nil + config_param :async, :bool, default: false def initialize super @@ -106,8 +110,18 @@ def start super @started = true + if @async + thread_create(:test_in_gen, &method(:emit)) + else + emit + end + end + + def emit @num.times { |i| - router.emit("test.evet", Fluent::EventTime.now, {'message' => 'Hello!', 'key' => "value#{i}", 'num' => i}) + break if @async and not thread_current_running? + router.emit("test.event", Fluent::EventTime.now, {'message' => 'Hello!', 'key' => "value#{i}", 'num' => i}) + sleep @interval_sec if @interval_sec } end diff --git a/test/test_root_agent.rb b/test/test_root_agent.rb index 17a6b68f36..d10174d0f3 100644 --- a/test/test_root_agent.rb +++ b/test/test_root_agent.rb @@ -948,4 +948,107 @@ def configure_ra(conf_str) assert ra.error_collector end end + + sub_test_case 'start with-source-only' do + def conf + <<~EOC + + @type test_in_gen + @id test_in_gen + num 20 + interval_sec 0.1 + async + + + + @type record_transformer + @id record_transformer + + foo foo + + + + + @type test_out + @id test_out + + EOC + end + + def setup + omit "Not supported on Windows" if Fluent.windows? + system_config = SystemConfig.new( + Config::Element.new('system', '', { + 'with_source_only' => true, + }, [ + Config::Element.new('source_only_buffer', '', { + 'flush_interval' => 1, + }, []), + ]) + ) + @root_agent = RootAgent.new(log: $log, system_config: system_config) + stub(Engine).root_agent { @root_agent } + stub(Engine).system_config { system_config } + @root_agent.configure(Config.parse(conf, "(test)", "(test_dir)")) + end + + test 'only input plugins should start' do + @root_agent.start + + assert_equal( + { + "input started?" => [true], + "filter started?" => [false], + "output started?" => [false], + }, + { + "input started?" => @root_agent.inputs.map { |plugin| plugin.started? }, + "filter started?" => @root_agent.filters.map { |plugin| plugin.started? }, + "output started?" => @root_agent.outputs.map { |plugin| plugin.started? }, + } + ) + ensure + @root_agent.shutdown + # Buffer files remain because not cancelling source-only. + # As a test, they should be clean-up-ed. + buf_dir = @root_agent.instance_variable_get(:@source_only_buffer_agent).instance_variable_get(:@base_buffer_dir) + FileUtils.remove_dir(buf_dir) + end + + test '#cancel_source_only! should start all plugins' do + @root_agent.start + @root_agent.cancel_source_only! + + assert_equal( + { + "input started?" => [true], + "filter started?" => [true], + "output started?" => [true], + }, + { + "input started?" => @root_agent.inputs.map { |plugin| plugin.started? }, + "filter started?" => @root_agent.filters.map { |plugin| plugin.started? }, + "output started?" => @root_agent.outputs.map { |plugin| plugin.started? }, + } + ) + ensure + @root_agent.shutdown + end + + test 'buffer should be loaded after #cancel_source_only!' do + @root_agent.start + sleep 1 + @root_agent.cancel_source_only! + + waiting(3) do + # Wait buffer loaded after source-only cancelled + sleep 1 until @root_agent.outputs[0].events["test.event"].any? { |record| record["num"] == 0 } + end + + # all data should be outputted + assert { @root_agent.outputs[0].events["test.event"].size == 20 } + ensure + @root_agent.shutdown + end + end end diff --git a/test/test_source_only_buffer_agent.rb b/test/test_source_only_buffer_agent.rb new file mode 100644 index 0000000000..a2524e6343 --- /dev/null +++ b/test/test_source_only_buffer_agent.rb @@ -0,0 +1,254 @@ +require_relative 'helper' + +class SourceOnlyBufferAgentTest < ::Test::Unit::TestCase + def log + logger = ServerEngine::DaemonLogger.new( + Fluent::Test::DummyLogDevice.new, + { log_level: ServerEngine::DaemonLogger::INFO } + ) + Fluent::Log.new(logger) + end + + def setup + omit "Not supported on Windows" if Fluent.windows? + @log = log + end + + sub_test_case "#configure" do + test "default" do + system_config = Fluent::SystemConfig.new + root_agent = Fluent::RootAgent.new(log: @log, system_config: system_config) + stub(Fluent::Engine).root_agent { root_agent } + stub(Fluent::Engine).system_config { system_config } + root_agent.configure(config_element) + + agent = Fluent::SourceOnlyBufferAgent.new(log: @log, system_config: system_config) + agent.configure + + assert_equal( + { + "num of filter plugins" => 0, + "num of output plugins" => 1, + "base_buffer_dir" => agent.instance_variable_get(:@default_buffer_path), + "actual_buffer_dir" => agent.instance_variable_get(:@default_buffer_path), + "EventRouter of BufferOutput" => root_agent.event_router.object_id, + "flush_thread_count" => 0, + "flush_at_shutdown" => false, + }, + { + "num of filter plugins" => agent.filters.size, + "num of output plugins" => agent.outputs.size, + "base_buffer_dir" => agent.instance_variable_get(:@base_buffer_dir), + "actual_buffer_dir" => agent.instance_variable_get(:@actual_buffer_dir), + "EventRouter of BufferOutput" => agent.outputs[0].router.object_id, + "flush_thread_count" => agent.outputs[0].buffer_config.flush_thread_count, + "flush_at_shutdown" => agent.outputs[0].buffer_config.flush_at_shutdown, + } + ) + + assert do + @log.out.logs.any? { |log| log.include? "the emitted data will be stored in the buffer files" } + end + end + + test "flush: true" do + system_config = Fluent::SystemConfig.new + root_agent = Fluent::RootAgent.new(log: @log, system_config: system_config) + stub(Fluent::Engine).root_agent { root_agent } + stub(Fluent::Engine).system_config { system_config } + root_agent.configure(config_element) + + agent = Fluent::SourceOnlyBufferAgent.new(log: @log, system_config: system_config) + agent.configure(flush: true) + + assert_equal( + { + "num of filter plugins" => 0, + "num of output plugins" => 1, + "base_buffer_dir" => agent.instance_variable_get(:@default_buffer_path), + "actual_buffer_dir" => agent.instance_variable_get(:@default_buffer_path), + "EventRouter of BufferOutput" => root_agent.event_router.object_id, + "flush_thread_count" => 1, + "flush_at_shutdown" => true, + }, + { + "num of filter plugins" => agent.filters.size, + "num of output plugins" => agent.outputs.size, + "base_buffer_dir" => agent.instance_variable_get(:@base_buffer_dir), + "actual_buffer_dir" => agent.instance_variable_get(:@actual_buffer_dir), + "EventRouter of BufferOutput" => agent.outputs[0].router.object_id, + "flush_thread_count" => agent.outputs[0].buffer_config.flush_thread_count, + "flush_at_shutdown" => agent.outputs[0].buffer_config.flush_at_shutdown, + } + ) + + assert do + not @log.out.logs.any? { |log| log.include? "the emitted data will be stored in the buffer files" } + end + end + + test "multiple workers" do + system_config = Fluent::SystemConfig.new(config_element("system", "", {"workers" => 2})) + root_agent = Fluent::RootAgent.new(log: @log, system_config: system_config) + stub(Fluent::Engine).root_agent { root_agent } + stub(Fluent::Engine).system_config { system_config } + root_agent.configure(config_element) + + agent = Fluent::SourceOnlyBufferAgent.new(log: @log, system_config: system_config) + agent.configure + + assert_equal( + { + "num of filter plugins" => 0, + "num of output plugins" => 1, + "base_buffer_dir" => agent.instance_variable_get(:@default_buffer_path), + "actual_buffer_dir" => "#{agent.instance_variable_get(:@default_buffer_path)}/worker0", + "EventRouter of BufferOutput" => root_agent.event_router.object_id, + "flush_thread_count" => 0, + "flush_at_shutdown" => false, + }, + { + "num of filter plugins" => agent.filters.size, + "num of output plugins" => agent.outputs.size, + "base_buffer_dir" => agent.instance_variable_get(:@base_buffer_dir), + "actual_buffer_dir" => agent.instance_variable_get(:@actual_buffer_dir), + "EventRouter of BufferOutput" => agent.outputs[0].router.object_id, + "flush_thread_count" => agent.outputs[0].buffer_config.flush_thread_count, + "flush_at_shutdown" => agent.outputs[0].buffer_config.flush_at_shutdown, + } + ) + end + + test "full setting with flush:true" do + system_config = Fluent::SystemConfig.new(config_element("system", "", {}, [ + config_element("source_only_buffer", "", { + "flush_thread_count" => 4, + "overflow_action" => :throw_exception, + "path" => "tmp_buffer_path", + "flush_interval" => 1, + "chunk_limit_size" => 100, + "total_limit_size" => 1000, + "compress" => :gzip, + }) + ])) + root_agent = Fluent::RootAgent.new(log: @log, system_config: system_config) + stub(Fluent::Engine).root_agent { root_agent } + stub(Fluent::Engine).system_config { system_config } + root_agent.configure(config_element) + + agent = Fluent::SourceOnlyBufferAgent.new(log: @log, system_config: system_config) + agent.configure(flush: true) + + assert_equal( + { + "num of filter plugins" => 0, + "num of output plugins" => 1, + "base_buffer_dir" => "tmp_buffer_path", + "actual_buffer_dir" => "tmp_buffer_path", + "EventRouter of BufferOutput" => root_agent.event_router.object_id, + "flush_thread_count" => 4, + "flush_at_shutdown" => true, + "overflow_action" => :throw_exception, + "flush_interval" => 1, + "chunk_limit_size" => 100, + "total_limit_size" => 1000, + "compress" => :gzip, + }, + { + "num of filter plugins" => agent.filters.size, + "num of output plugins" => agent.outputs.size, + "base_buffer_dir" => agent.instance_variable_get(:@base_buffer_dir), + "actual_buffer_dir" => agent.instance_variable_get(:@actual_buffer_dir), + "EventRouter of BufferOutput" => agent.outputs[0].router.object_id, + "flush_thread_count" => agent.outputs[0].buffer_config.flush_thread_count, + "flush_at_shutdown" => agent.outputs[0].buffer_config.flush_at_shutdown, + "overflow_action" => agent.outputs[0].buffer_config.overflow_action, + "flush_interval" => agent.outputs[0].buffer_config.flush_interval, + "chunk_limit_size" => agent.outputs[0].buffer.chunk_limit_size, + "total_limit_size" => agent.outputs[0].buffer.total_limit_size, + "compress" => agent.outputs[0].buffer.compress, + } + ) + end + end + + sub_test_case "#cleanup" do + test "do not remove the buffer if it is not empty" do + system_config = Fluent::SystemConfig.new + root_agent = Fluent::RootAgent.new(log: @log, system_config: system_config) + stub(Fluent::Engine).root_agent { root_agent } + stub(Fluent::Engine).system_config { system_config } + root_agent.configure(config_element) + + agent = Fluent::SourceOnlyBufferAgent.new(log: @log, system_config: system_config) + agent.configure + + stub(Dir).empty?(agent.instance_variable_get(:@actual_buffer_dir)) { false } + mock(FileUtils).remove_dir.never + + agent.cleanup + + assert do + @log.out.logs.any? { |log| log.include? "some buffer files remain in" } + end + end + + test "remove the buffer if it is empty" do + system_config = Fluent::SystemConfig.new + root_agent = Fluent::RootAgent.new(log: @log, system_config: system_config) + stub(Fluent::Engine).root_agent { root_agent } + stub(Fluent::Engine).system_config { system_config } + root_agent.configure(config_element) + + agent = Fluent::SourceOnlyBufferAgent.new(log: @log, system_config: system_config) + agent.configure + + stub(Dir).empty?(agent.instance_variable_get(:@actual_buffer_dir)) { true } + mock(FileUtils).remove_dir(agent.instance_variable_get(:@base_buffer_dir)).times(1) + + agent.cleanup + + assert do + not @log.out.logs.any? { |log| log.include? "some buffer files remain in" } + end + end + end + + sub_test_case "error" do + test "#emit_error_event" do + system_config = Fluent::SystemConfig.new + root_agent = Fluent::RootAgent.new(log: @log, system_config: system_config) + stub(Fluent::Engine).root_agent { root_agent } + stub(Fluent::Engine).system_config { system_config } + root_agent.configure(config_element) + + agent = Fluent::SourceOnlyBufferAgent.new(log: @log, system_config: system_config) + agent.configure + + agent.event_router.emit_error_event("tag", 0, "hello", Exception.new) + + assert do + @log.out.logs.any? { |log| log.include? "SourceOnlyBufferAgent: dump an error event" } + end + end + + test "#handle_emits_error" do + system_config = Fluent::SystemConfig.new + root_agent = Fluent::RootAgent.new(log: @log, system_config: system_config) + stub(Fluent::Engine).root_agent { root_agent } + stub(Fluent::Engine).system_config { system_config } + root_agent.configure(config_element) + + agent = Fluent::SourceOnlyBufferAgent.new(log: @log, system_config: system_config) + agent.configure + + stub(agent.outputs[0]).emit_events { raise "test error" } + + agent.event_router.emit("foo", 0, "hello") + + assert do + @log.out.logs.any? { |log| log.include? "SourceOnlyBufferAgent: emit transaction failed" } + end + end + end +end diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb index 3b5b343483..6d3363630e 100644 --- a/test/test_supervisor.rb +++ b/test/test_supervisor.rb @@ -62,6 +62,7 @@ def test_system_config suppress_repeated_stacktrace false suppress_config_dump true without_source true + with_source_only true enable_get_dump true process_name "process_name" log_level info @@ -82,6 +83,15 @@ def test_system_config port 24321 timeout 2 + + flush_thread_count 4 + overflow_action throw_exception + path /tmp/source-only-buffer + flush_interval 1 + chunk_limit_size 100 + total_limit_size 1000 + compress gzip + EOC conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true) @@ -91,6 +101,7 @@ def test_system_config assert_equal false, sys_conf.suppress_repeated_stacktrace assert_equal true, sys_conf.suppress_config_dump assert_equal true, sys_conf.without_source + assert_equal true, sys_conf.with_source_only assert_equal true, sys_conf.enable_get_dump assert_equal "process_name", sys_conf.process_name assert_equal 2, sys_conf.log_level @@ -107,6 +118,14 @@ def test_system_config assert_equal '127.0.0.1', counter_client.host assert_equal 24321, counter_client.port assert_equal 2, counter_client.timeout + source_only_buffer = sys_conf.source_only_buffer + assert_equal 4, source_only_buffer.flush_thread_count + assert_equal :throw_exception, source_only_buffer.overflow_action + assert_equal "/tmp/source-only-buffer", source_only_buffer.path + assert_equal 1, source_only_buffer.flush_interval + assert_equal 100, source_only_buffer.chunk_limit_size + assert_equal 1000, source_only_buffer.total_limit_size + assert_equal :gzip, source_only_buffer.compress end sub_test_case "yaml config" do @@ -131,6 +150,7 @@ def test_system_config suppress_repeated_stacktrace: true suppress_config_dump: true without_source: true + with_source_only: true enable_get_dump: true process_name: "process_name" log_level: info @@ -148,12 +168,21 @@ def test_system_config host: 127.0.0.1 port: 24321 timeout: 2 + source_only_buffer: + flush_thread_count: 4 + overflow_action: throw_exception + path: /tmp/source-only-buffer + flush_interval: 1 + chunk_limit_size: 100 + total_limit_size: 1000 + compress: gzip EOC conf = parse_yaml(conf_data) sys_conf = sv.__send__(:build_system_config, conf) counter_client = sys_conf.counter_client counter_server = sys_conf.counter_server + source_only_buffer = sys_conf.source_only_buffer assert_equal( [ '127.0.0.1:24445', @@ -161,6 +190,7 @@ def test_system_config true, true, true, + true, "process_name", 2, @tmp_root_dir, @@ -174,12 +204,20 @@ def test_system_config '127.0.0.1', 24321, 2, + 4, + :throw_exception, + "/tmp/source-only-buffer", + 1, + 100, + 1000, + :gzip, ], [ sys_conf.rpc_endpoint, sys_conf.suppress_repeated_stacktrace, sys_conf.suppress_config_dump, sys_conf.without_source, + sys_conf.with_source_only, sys_conf.enable_get_dump, sys_conf.process_name, sys_conf.log_level, @@ -194,6 +232,13 @@ def test_system_config counter_client.host, counter_client.port, counter_client.timeout, + source_only_buffer.flush_thread_count, + source_only_buffer.overflow_action, + source_only_buffer.path, + source_only_buffer.flush_interval, + source_only_buffer.chunk_limit_size, + source_only_buffer.total_limit_size, + source_only_buffer.compress, ]) end end @@ -254,6 +299,25 @@ def test_term_cont_in_main_process_signal_handlers File.delete(@sigdump_path) if File.exist?(@sigdump_path) end + def test_winch_in_main_process_signal_handlers + omit "Windows cannot handle signals" if Fluent.windows? + + mock(Fluent::Engine).cancel_source_only! + create_info_dummy_logger + + sv = Fluent::Supervisor.new({}) + sv.send(:install_main_process_signal_handlers) + + Process.kill :WINCH, Process.pid + + sleep 1 + + info_msg = "[info]: try to cancel with-source-only mode\n" + assert{ $log.out.logs.first.end_with?(info_msg) } + ensure + $log.out.reset if $log&.out&.respond_to?(:reset) + end + def test_main_process_command_handlers omit "Only for Windows, alternative to UNIX signals" unless Fluent.windows? @@ -327,6 +391,25 @@ def test_term_cont_in_supervisor_signal_handler File.delete(@sigdump_path) if File.exist?(@sigdump_path) end + def test_winch_in_supervisor_signal_handler + omit "Windows cannot handle signals" if Fluent.windows? + + create_debug_dummy_logger + + server = DummyServer.new + server.install_supervisor_signal_handlers + + Process.kill :WINCH, Process.pid + + sleep 1 + + debug_msg = '[debug]: fluentd supervisor process got SIGWINCH' + logs = $log.out.logs + assert{ logs.any?{|log| log.include?(debug_msg) } } + ensure + $log.out.reset if $log&.out&.respond_to?(:reset) + end + def test_windows_shutdown_event omit "Only for Windows platform" unless Fluent.windows?