From 7973c2a5d8771b03c7d655b2138d7a64fc2c4a30 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Fri, 30 Aug 2024 01:08:29 +0900 Subject: [PATCH] root_agent: refactor shutdown process To make it easier to re-use plugin shutdown process. Signed-off-by: Daijiro Fukuda --- lib/fluent/root_agent.rb | 108 ++++++++++++++++++++++----------------- 1 file changed, 60 insertions(+), 48 deletions(-) diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index d10f366044..1165dbadcf 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -231,76 +231,88 @@ def flush! flushing_threads.each{|t| t.join } end + class ShutdownSequence + attr_reader :method, :checker + def initialize(method, checker, is_safe) + @method = method + @checker = checker + @is_safe = is_safe + end + + def safe? + @is_safe + end + end + + SHUTDOWN_SEQUENCES = [ + ShutdownSequence.new(:stop, :stopped?, true), + # before_shutdown does force_flush for output plugins: it should block, so it's unsafe operation + ShutdownSequence.new(:shutdown, :shutdown?, false), + ShutdownSequence.new(:after_shutdown, :after_shutdown?, true), + ShutdownSequence.new(:close, :closed?, false), + ShutdownSequence.new(:terminate, :terminated?, true), + ] + def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins # These method callers does `rescue Exception` to call methods of shutdown sequence as far as possible # if plugin methods does something like infinite recursive call, `exit`, unregistering signal handlers or others. # Plugins should be separated and be in sandbox to protect data in each plugins/buffers. - lifecycle_safe_sequence = ->(method, checker) { - lifecycle do |instance, kind| - begin - log.debug "calling #{method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id - instance.__send__(method) unless instance.__send__(checker) - rescue Exception => e - log.warn "unexpected error while calling #{method} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e - log.warn_backtrace + SHUTDOWN_SEQUENCES.each do |sequence| + if sequence.safe? + lifecycle do |instance, kind| + execute_shutdown_sequence(sequence, instance, kind) end + next end - } - lifecycle_unsafe_sequence = ->(method, checker) { - operation = case method - when :shutdown then "shutting down" - when :close then "closing" - else - raise "BUG: unknown method name '#{method}'" - end operation_threads = [] callback = ->(){ - operation_threads.each{|t| t.join } + operation_threads.each { |t| t.join } operation_threads.clear } lifecycle(kind_callback: callback) do |instance, kind| t = Thread.new do Thread.current.abort_on_exception = true - begin - if method == :shutdown - # To avoid Input#shutdown and Output#before_shutdown mismatch problem, combine before_shutdown and shutdown call in one sequence. - # The problem is in_tail flushes buffered multiline in shutdown but output's flush_at_shutdown is invoked in before_shutdown - operation = "preparing shutdown" # for logging - log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id - begin - instance.__send__(:before_shutdown) unless instance.__send__(:before_shutdown?) - rescue Exception => e - log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e - log.warn_backtrace - end - operation = "shutting down" - log.info "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id - instance.__send__(:shutdown) unless instance.__send__(:shutdown?) - else - log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id - instance.__send__(method) unless instance.__send__(checker) - end - rescue Exception => e - log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e - log.warn_backtrace - end + execute_shutdown_sequence(sequence, instance, kind) end operation_threads << t end - } - - lifecycle_safe_sequence.call(:stop, :stopped?) + end + end - # before_shutdown does force_flush for output plugins: it should block, so it's unsafe operation - lifecycle_unsafe_sequence.call(:shutdown, :shutdown?) + def execute_shutdown_sequence(sequence, instance, kind) + unless sequence.method == :shutdown + begin + log.debug "calling #{sequence.method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id + instance.__send__(sequence.method) unless instance.__send__(sequence.checker) + rescue Exception => e + log.warn "unexpected error while calling #{sequence.method} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e + log.warn_backtrace + end - lifecycle_safe_sequence.call(:after_shutdown, :after_shutdown?) + return + end - lifecycle_unsafe_sequence.call(:close, :closed?) + # To avoid Input#shutdown and Output#before_shutdown mismatch problem, combine before_shutdown and shutdown call in one sequence. + # The problem is in_tail flushes buffered multiline in shutdown but output's flush_at_shutdown is invoked in before_shutdown + begin + operation = "preparing shutdown" # for logging + log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id + instance.__send__(:before_shutdown) unless instance.__send__(:before_shutdown?) + rescue Exception => e + log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e + log.warn_backtrace + end - lifecycle_safe_sequence.call(:terminate, :terminated?) + begin + operation = "shutting down" + log.info "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id + instance.__send__(:shutdown) unless instance.__send__(sequence.checker) + rescue Exception => e + log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e + log.warn_backtrace + end end def suppress_interval(interval_time)