Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

root_agent: refactor shutdown process #4623

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 60 additions & 48 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -231,76 +231,88 @@ def flush!
flushing_threads.each{|t| t.join }
end

class ShutdownSequence
attr_reader :method, :checker
def initialize(method, checker, is_safe)
@method = method
@checker = checker
@is_safe = is_safe
end

def safe?
@is_safe
end
end

SHUTDOWN_SEQUENCES = [
ShutdownSequence.new(:stop, :stopped?, true),
# before_shutdown does force_flush for output plugins: it should block, so it's unsafe operation
ShutdownSequence.new(:shutdown, :shutdown?, false),
ShutdownSequence.new(:after_shutdown, :after_shutdown?, true),
ShutdownSequence.new(:close, :closed?, false),
ShutdownSequence.new(:terminate, :terminated?, true),
]

def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins
# These method callers does `rescue Exception` to call methods of shutdown sequence as far as possible
# if plugin methods does something like infinite recursive call, `exit`, unregistering signal handlers or others.
# Plugins should be separated and be in sandbox to protect data in each plugins/buffers.

lifecycle_safe_sequence = ->(method, checker) {
lifecycle do |instance, kind|
begin
log.debug "calling #{method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(method) unless instance.__send__(checker)
rescue Exception => e
log.warn "unexpected error while calling #{method} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
SHUTDOWN_SEQUENCES.each do |sequence|
if sequence.safe?
lifecycle do |instance, kind|
execute_shutdown_sequence(sequence, instance, kind)
end
next
end
}

lifecycle_unsafe_sequence = ->(method, checker) {
operation = case method
when :shutdown then "shutting down"
when :close then "closing"
else
raise "BUG: unknown method name '#{method}'"
end
operation_threads = []
callback = ->(){
operation_threads.each{|t| t.join }
operation_threads.each { |t| t.join }
operation_threads.clear
}
lifecycle(kind_callback: callback) do |instance, kind|
t = Thread.new do
Thread.current.abort_on_exception = true
begin
if method == :shutdown
# To avoid Input#shutdown and Output#before_shutdown mismatch problem, combine before_shutdown and shutdown call in one sequence.
# The problem is in_tail flushes buffered multiline in shutdown but output's flush_at_shutdown is invoked in before_shutdown
operation = "preparing shutdown" # for logging
log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
begin
instance.__send__(:before_shutdown) unless instance.__send__(:before_shutdown?)
rescue Exception => e
log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
end
operation = "shutting down"
log.info "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(:shutdown) unless instance.__send__(:shutdown?)
else
log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(method) unless instance.__send__(checker)
end
rescue Exception => e
log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
end
execute_shutdown_sequence(sequence, instance, kind)
end
operation_threads << t
end
}

lifecycle_safe_sequence.call(:stop, :stopped?)
end
end

# before_shutdown does force_flush for output plugins: it should block, so it's unsafe operation
lifecycle_unsafe_sequence.call(:shutdown, :shutdown?)
def execute_shutdown_sequence(sequence, instance, kind)
unless sequence.method == :shutdown
begin
log.debug "calling #{sequence.method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(sequence.method) unless instance.__send__(sequence.checker)
rescue Exception => e
log.warn "unexpected error while calling #{sequence.method} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
end

lifecycle_safe_sequence.call(:after_shutdown, :after_shutdown?)
return
end

lifecycle_unsafe_sequence.call(:close, :closed?)
# To avoid Input#shutdown and Output#before_shutdown mismatch problem, combine before_shutdown and shutdown call in one sequence.
# The problem is in_tail flushes buffered multiline in shutdown but output's flush_at_shutdown is invoked in before_shutdown
begin
operation = "preparing shutdown" # for logging
log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(:before_shutdown) unless instance.__send__(:before_shutdown?)
rescue Exception => e
log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
end

lifecycle_safe_sequence.call(:terminate, :terminated?)
begin
operation = "shutting down"
log.info "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(:shutdown) unless instance.__send__(sequence.checker)
rescue Exception => e
log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
end
end

def suppress_interval(interval_time)
Expand Down
Loading