Skip to content

Commit

Permalink
root_agent: refactor shutdown process
Browse files Browse the repository at this point in the history
To make it easier to re-use plugin shutdown process.

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Aug 30, 2024
1 parent 7f13c7a commit 6218372
Showing 1 changed file with 60 additions and 48 deletions.
108 changes: 60 additions & 48 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -231,76 +231,88 @@ def flush!
flushing_threads.each{|t| t.join }
end

class ShutdownSequence
attr_reader :method, :checker
def initialize(method, checker, is_safe)
@method = method
@checker = checker
@is_safe = is_safe
end

def safe?
@is_safe
end
end

SHUTDOWN_SEQUENCES = [
ShutdownSequence.new(:stop, :stopped?, true),
# before_shutdown does force_flush for output plugins: it should block, so it's unsafe operation
ShutdownSequence.new(:shutdown, :shutdown?, false),
ShutdownSequence.new(:after_shutdown, :after_shutdown?, true),
ShutdownSequence.new(:close, :closed?, false),
ShutdownSequence.new(:terminate, :terminated?, true),
]

def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins
# These method callers does `rescue Exception` to call methods of shutdown sequence as far as possible
# if plugin methods does something like infinite recursive call, `exit`, unregistering signal handlers or others.
# Plugins should be separated and be in sandbox to protect data in each plugins/buffers.

lifecycle_safe_sequence = ->(method, checker) {
lifecycle do |instance, kind|
begin
log.debug "calling #{method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(method) unless instance.__send__(checker)
rescue Exception => e
log.warn "unexpected error while calling #{method} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
SHUTDOWN_SEQUENCES.each do |sequence|
if sequence.safe?
lifecycle do |instance, kind|
execute_shutdown_sequence(sequence, instance, kind)
end
next
end
}

lifecycle_unsafe_sequence = ->(method, checker) {
operation = case method
when :shutdown then "shutting down"
when :close then "closing"
else
raise "BUG: unknown method name '#{method}'"
end
operation_threads = []
callback = ->(){
operation_threads.each{|t| t.join }
operation_threads.each { |t| t.join }
operation_threads.clear
}
lifecycle(kind_callback: callback) do |instance, kind|
t = Thread.new do
Thread.current.abort_on_exception = true
begin
if method == :shutdown
# To avoid Input#shutdown and Output#before_shutdown mismatch problem, combine before_shutdown and shutdown call in one sequence.
# The problem is in_tail flushes buffered multiline in shutdown but output's flush_at_shutdown is invoked in before_shutdown
operation = "preparing shutdown" # for logging
log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
begin
instance.__send__(:before_shutdown) unless instance.__send__(:before_shutdown?)
rescue Exception => e
log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
end
operation = "shutting down"
log.info "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(:shutdown) unless instance.__send__(:shutdown?)
else
log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(method) unless instance.__send__(checker)
end
rescue Exception => e
log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
end
execute_shutdown_sequence(sequence, instance, kind)
end
operation_threads << t
end
}

lifecycle_safe_sequence.call(:stop, :stopped?)
end
end

# before_shutdown does force_flush for output plugins: it should block, so it's unsafe operation
lifecycle_unsafe_sequence.call(:shutdown, :shutdown?)
def execute_shutdown_sequence(sequence, instance, kind)
unless sequence.method == :shutdown
begin
log.debug "calling #{sequence.method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(sequence.method) unless instance.__send__(sequence.checker)
rescue Exception => e
log.warn "unexpected error while calling #{sequence.method} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
end

lifecycle_safe_sequence.call(:after_shutdown, :after_shutdown?)
return
end

lifecycle_unsafe_sequence.call(:close, :closed?)
# To avoid Input#shutdown and Output#before_shutdown mismatch problem, combine before_shutdown and shutdown call in one sequence.
# The problem is in_tail flushes buffered multiline in shutdown but output's flush_at_shutdown is invoked in before_shutdown
begin
operation = "preparing shutdown" # for logging
log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(:before_shutdown) unless instance.__send__(:before_shutdown?)
rescue Exception => e
log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
end

lifecycle_safe_sequence.call(:terminate, :terminated?)
begin
operation = "shutting down"
log.info "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(:shutdown) unless instance.__send__(sequence.checker)
rescue Exception => e
log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
end
end

def suppress_interval(interval_time)
Expand Down

0 comments on commit 6218372

Please sign in to comment.