Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PoC] Update/Reload without downtime #4654

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 62 additions & 48 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -231,76 +231,90 @@ def flush!
flushing_threads.each{|t| t.join }
end

# One step of the plugin shutdown sequence.
#
# Holds the name of the lifecycle method to invoke (+method+), the name of
# the predicate that tells whether that step was already performed
# (+checker+), and a flag saying whether the step is "safe".
#
# Note: the +method+ reader intentionally keeps its name for callers, which
# means it replaces Object#method on instances of this class.
class ShutdownSequence
  attr_reader :method, :checker

  # method  - Symbol name of the lifecycle method (e.g. :stop)
  # checker - Symbol name of the matching predicate (e.g. :stopped?)
  # is_safe - value returned as-is by #safe?
  def initialize(method, checker, is_safe)
    @method, @checker, @is_safe = method, checker, is_safe
  end

  # Returns the is_safe value given to the constructor.
  def safe?
    @is_safe
  end
end

# Ordered steps applied to plugins on shutdown. Each entry is
# ShutdownSequence.new(method, checker, is_safe). The array is frozen so the
# sequence cannot be modified by accident at runtime.
SHUTDOWN_SEQUENCES = [
  ShutdownSequence.new(:stop, :stopped?, true),
  # before_shutdown does force_flush for output plugins: it should block, so it's unsafe operation
  ShutdownSequence.new(:shutdown, :shutdown?, false),
  ShutdownSequence.new(:after_shutdown, :after_shutdown?, true),
  ShutdownSequence.new(:close, :closed?, false),
  ShutdownSequence.new(:terminate, :terminated?, true),
].freeze

def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins
# These method callers `rescue Exception` so that the methods of the shutdown sequence are called as far as possible,
# even if plugin methods do something like infinite recursive calls, `exit`, unregistering signal handlers or others.
# Plugins should be separated and be in sandbox to protect data in each plugins/buffers.

lifecycle_safe_sequence = ->(method, checker) {
lifecycle do |instance, kind|
begin
log.debug "calling #{method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(method) unless instance.__send__(checker)
rescue Exception => e
log.warn "unexpected error while calling #{method} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
SHUTDOWN_SEQUENCES.each do |sequence|
if sequence.safe?
lifecycle do |instance, kind|
execute_shutdown_sequence(sequence, instance, kind)
end
next
end
}

lifecycle_unsafe_sequence = ->(method, checker) {
operation = case method
when :shutdown then "shutting down"
when :close then "closing"
else
raise "BUG: unknown method name '#{method}'"
end
operation_threads = []
callback = ->(){
operation_threads.each{|t| t.join }
operation_threads.each { |t| t.join }
operation_threads.clear
# TODO: Send a signal to the server process to notify it that the worker has stopped. Isn't there a better way to do this?
Process.kill 34, Process.ppid
}
lifecycle(kind_callback: callback) do |instance, kind|
t = Thread.new do
Thread.current.abort_on_exception = true
begin
if method == :shutdown
# To avoid Input#shutdown and Output#before_shutdown mismatch problem, combine before_shutdown and shutdown call in one sequence.
# The problem is in_tail flushes buffered multiline in shutdown but output's flush_at_shutdown is invoked in before_shutdown
operation = "preparing shutdown" # for logging
log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
begin
instance.__send__(:before_shutdown) unless instance.__send__(:before_shutdown?)
rescue Exception => e
log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
end
operation = "shutting down"
log.info "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(:shutdown) unless instance.__send__(:shutdown?)
else
log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(method) unless instance.__send__(checker)
end
rescue Exception => e
log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
end
execute_shutdown_sequence(sequence, instance, kind)
end
operation_threads << t
end
}

lifecycle_safe_sequence.call(:stop, :stopped?)
end
end

# before_shutdown does force_flush for output plugins: it should block, so it's unsafe operation
lifecycle_unsafe_sequence.call(:shutdown, :shutdown?)
# Runs one step of the shutdown sequence against a single plugin instance.
#
# sequence - object providing `method` (name of the lifecycle method to call)
#            and `checker` (name of the predicate; the method is skipped
#            when the predicate returns a truthy value)
# instance - the plugin instance the methods are sent to
# kind     - label interpolated into the log messages
#
# For every step except :shutdown the method is called once and the function
# returns. For :shutdown, before_shutdown is called first and then shutdown.
# Each call is wrapped in `rescue Exception`; a failure is logged at warn
# level together with the backtrace instead of being re-raised.
def execute_shutdown_sequence(sequence, instance, kind)
unless sequence.method == :shutdown
begin
log.debug "calling #{sequence.method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(sequence.method) unless instance.__send__(sequence.checker)
rescue Exception => e
log.warn "unexpected error while calling #{sequence.method} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
end

# NOTE(review): the `lifecycle_*_sequence.call` lines in this method reference
# locals that are not defined here; they look like removed-side diff lines
# interleaved into this view -- confirm against the actual file.
lifecycle_safe_sequence.call(:after_shutdown, :after_shutdown?)
return
end

lifecycle_unsafe_sequence.call(:close, :closed?)
# To avoid Input#shutdown and Output#before_shutdown mismatch problem, combine before_shutdown and shutdown call in one sequence.
# The problem is in_tail flushes buffered multiline in shutdown but output's flush_at_shutdown is invoked in before_shutdown
begin
operation = "preparing shutdown" # for logging
log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(:before_shutdown) unless instance.__send__(:before_shutdown?)
rescue Exception => e
log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
end

lifecycle_safe_sequence.call(:terminate, :terminated?)
# A failure in before_shutdown above is only logged, so shutdown is still attempted.
begin
operation = "shutting down"
log.info "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(:shutdown) unless instance.__send__(sequence.checker)
rescue Exception => e
log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e
log.warn_backtrace
end
end

def suppress_interval(interval_time)
Expand Down
69 changes: 58 additions & 11 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def before_run
@rpc_endpoint = nil
@rpc_server = nil
@counter = nil
@socket_manager_server = nil

@fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-")
ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir
Expand All @@ -66,8 +67,12 @@ def before_run
if config[:disable_shared_socket]
$log.info "shared socket for multiple workers is disabled"
else
server = ServerEngine::SocketManager::Server.open
ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s
if ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH')
@socket_manager_server = ServerEngine::SocketManager::Server.take_over_another_server(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
else
@socket_manager_server = ServerEngine::SocketManager::Server.open
ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_server.path.to_s
end
end
end

Expand Down Expand Up @@ -138,7 +143,7 @@ def run_rpc_server
@rpc_server.mount_proc('/api/config.gracefulReload') { |req, res|
$log.debug "fluentd RPC got /api/config.gracefulReload request"
if Fluent.windows?
supervisor_sigusr2_handler
graceful_reload
else
Process.kill :USR2, Process.pid
end
Expand Down Expand Up @@ -186,8 +191,34 @@ def install_supervisor_signal_handlers
end

trap :USR2 do
# Zero-downtime update
$log.debug 'fluentd supervisor process got SIGUSR2'
supervisor_sigusr2_handler

# TODO: Make sure ServerEngine does not automatically restart the workers when they are terminated
scale_down

# Zero-downtime update: stop all workers
send_signal_to_workers(:TERM)

@prepared_zero_downtime_updating = true
end

trap 34 do
# Signal 34 is received once the worker has completely stopped
if @prepared_zero_downtime_updating
# If a zero-downtime update is in progress, carry out the remaining steps

@prepared_zero_downtime_updating = false

# Start a new process so that the update takes effect
start_new_supervisor

# TODO: This process did not die unless it was restored to a state where at least one worker could be started
scale_up
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

タイミングによっては、scale_up によってワーカーが起動してしまうかもしれない。
scale_up しなくても「古いプロセス(自分自身)を止める」ことができるようにしないと危ないだろう。


# Stop the old process (this process itself)
Process.kill :TERM, Process.pid
end
end
end

Expand Down Expand Up @@ -254,7 +285,7 @@ def install_windows_event_handler
when :usr1
supervisor_sigusr1_handler
when :usr2
supervisor_sigusr2_handler
graceful_reload
when :cont
supervisor_dump_handler_for_windows
when :stop_event_thread
Expand Down Expand Up @@ -284,7 +315,7 @@ def supervisor_sigusr1_handler
send_signal_to_workers(:USR1)
end

def supervisor_sigusr2_handler
def graceful_reload
conf = nil
t = Thread.new do
$log.info 'Reloading new config'
Expand Down Expand Up @@ -312,6 +343,14 @@ def supervisor_sigusr2_handler
$log.error "Failed to reload config file: #{e}"
end

# Spawns a new supervisor process with the same Ruby binary, script ($0) and
# command-line arguments (ARGV) as the current process, adding
# SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN to the child's environment.
#
# Returns the pid returned by Process.spawn. A StandardError raised while
# spawning is logged at error level and not re-raised.
def start_new_supervisor
  env_to_add = {"SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN" => ServerEngine::SocketManager::INTERNAL_TOKEN}
  # Pass the command as separate argv elements rather than one joined string:
  # a single string is handed to the shell, so paths or arguments containing
  # spaces or shell metacharacters would be split or interpreted.
  Process.spawn(env_to_add, ServerEngine.ruby_bin_path, $0, *ARGV)
rescue => e
  $log.error "Failed to start a new supervisor: #{e}"
end

def supervisor_dump_handler_for_windows
# As for UNIX-like, SIGCONT signal to each process makes the process output its dump-file,
# and it is implemented before the implementation of the function for Windows.
Expand All @@ -332,6 +371,13 @@ def supervisor_dump_handler_for_windows
$log.error "failed to dump: #{e}"
end

# Requests zero workers by calling scale_workers(0); returns whatever
# scale_workers returns.
def scale_down
  no_workers = 0
  self.scale_workers(no_workers)
end
# Requests a single worker by calling scale_workers(1); returns whatever
# scale_workers returns.
def scale_up
  single_worker = 1
  self.scale_workers(single_worker)
end

def kill_worker
if config[:worker_pid]
pids = config[:worker_pid].clone
Expand Down Expand Up @@ -500,11 +546,12 @@ def self.default_options
end

# On non-Windows platforms, removes the file named by
# ENV['SERVERENGINE_SOCKETMANAGER_PATH'] when that variable is set.
#
# NOTE(review): this view shows both a live and a commented-out copy of the
# same removal logic, which looks like the old and new sides of the diff
# displayed together -- confirm which one the actual file contains.
def self.cleanup_resources
unless Fluent.windows?
if ENV.has_key?('SERVERENGINE_SOCKETMANAGER_PATH')
FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
end
end
# TODO: Commented out because, when the USR2 signal fires a second time, startup fails since the SERVERENGINE_SOCKETMANAGER_PATH file cannot be found
# unless Fluent.windows?
# if ENV.has_key?('SERVERENGINE_SOCKETMANAGER_PATH')
# FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
# end
# end
end

def initialize(cl_opt)
Expand Down
Loading