[PoC] Update/Reload without downtime #4654

Closed
wants to merge 2 commits into from
Changes from 1 commit
2 changes: 2 additions & 0 deletions lib/fluent/root_agent.rb
@@ -270,6 +270,8 @@ def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, a
callback = ->(){
operation_threads.each { |t| t.join }
operation_threads.clear
+ # TODO: Signal the server process that the worker has stopped. Is there a better way to do this?
+ Process.kill 34, Process.ppid
}
lifecycle(kind_callback: callback) do |instance, kind|
t = Thread.new do
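
The TODO in this hunk asks whether there is a better way than a raw signal number to tell the parent that a worker has stopped. One possibility, sketched here under stated assumptions, is a pipe whose read end stays with the parent. `notify_when_stopped` is a hypothetical helper, not part of Fluentd or ServerEngine. The sketch uses `Process.fork`, so it assumes a Unix-like platform and does not reflect how ServerEngine actually starts workers.

```ruby
# Sketch only: a pipe-based "worker has stopped" notification, as one
# possible alternative to sending a numeric signal to the parent process.
# `notify_when_stopped` is a hypothetical helper, not a Fluentd API.
# Assumes a Unix-like platform where Process.fork is available.
def notify_when_stopped
  reader, writer = IO.pipe

  pid = Process.fork do
    reader.close
    yield                      # the worker's shutdown work
    writer.write("stopped\n")  # tell the parent that shutdown finished
    writer.close
    exit!(0)                   # leave the child without running at_exit hooks
  end

  writer.close                 # the parent keeps only the read end
  message = reader.gets        # blocks until the child writes or exits
  reader.close
  Process.wait(pid)
  message&.chomp               # nil if the child died before reporting
end

result = notify_when_stopped { sleep 0.1 }
puts result
```

Unlike a bare signal, the pipe also reports the failure case: if the child exits without writing, the parent reads end-of-file and gets `nil` instead of waiting forever.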
69 changes: 58 additions & 11 deletions lib/fluent/supervisor.rb
@@ -43,6 +43,7 @@ def before_run
@rpc_endpoint = nil
@rpc_server = nil
@counter = nil
+ @socket_manager_server = nil

@fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-")
ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir
@@ -66,8 +67,12 @@ def before_run
if config[:disable_shared_socket]
$log.info "shared socket for multiple workers is disabled"
else
- server = ServerEngine::SocketManager::Server.open
- ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s
+ if ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH')
+ @socket_manager_server = ServerEngine::SocketManager::Server.take_over_another_server(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
+ else
+ @socket_manager_server = ServerEngine::SocketManager::Server.open
+ ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_server.path.to_s
+ end
end
end
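
This hunk only shows the call to `take_over_another_server`; how the takeover works internally is not part of this diff. Assuming (and this is an assumption, not something the diff confirms) that it depends on the new process receiving sockets that are already listening, the following sketch illustrates why such a handover keeps a port open. It passes a listening socket's file descriptor over a Unix socket pair with Ruby's standard `send_io` and `recv_io`. Both ends live in one process here to keep the example short, and it assumes a Unix-like platform.

```ruby
require "socket"

# Sketch only: hand an already-listening socket to another holder without
# closing the port. In a real takeover the receiver would be another process.
listener = TCPServer.new("127.0.0.1", 0)   # port 0: let the OS pick a free port
port = listener.addr[1]

sender, receiver = UNIXSocket.pair
sender.send_io(listener)                    # pass the file descriptor
inherited = receiver.recv_io(TCPServer)     # new TCPServer object, same socket

listener.close                              # the original holder goes away

client = TCPSocket.new("127.0.0.1", port)   # the port still accepts connections
accepted = inherited.accept
accepted_ok = !accepted.nil?
inherited_port = inherited.addr[1]

[client, accepted, inherited, sender, receiver].each(&:close)
puts "port #{inherited_port} stayed open: #{accepted_ok}"
```

Because the receiver holds its own descriptor for the same socket, closing the original `listener` does not close the port, which is the property a zero-downtime restart needs.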

@@ -138,7 +143,7 @@ def run_rpc_server
@rpc_server.mount_proc('/api/config.gracefulReload') { |req, res|
$log.debug "fluentd RPC got /api/config.gracefulReload request"
if Fluent.windows?
- supervisor_sigusr2_handler
+ graceful_reload
else
Process.kill :USR2, Process.pid
end
@@ -186,8 +191,34 @@ def install_supervisor_signal_handlers
end

trap :USR2 do
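+ # Zero-downtime update
$log.debug 'fluentd supervisor process got SIGUSR2'
- supervisor_sigusr2_handler
+
+ # TODO: Prevent ServerEngine from automatically restarting workers after they are terminated
+ scale_down
+
+ # Zero-downtime update: stop all workers
+ send_signal_to_workers(:TERM)
+
+ @prepared_zero_downtime_updating = true
+ end
+
+ trap 34 do
+ # Signal 34 is received once the workers have stopped completely
+ if @prepared_zero_downtime_updating
+ # A zero-downtime update is in progress, so carry out the remaining steps
+
+ @prepared_zero_downtime_updating = false
+
+ # Start a new process so that the update takes effect
+ start_new_supervisor
+
+ # TODO: The supervisor did not terminate unless scaling was restored so that at least one worker could start
+ scale_up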
Review comment (Contributor, Author):

Depending on timing, scale_up may end up starting a worker.
This is probably unsafe unless the old process (this process itself) can be stopped without calling scale_up.


+ # Stop the old process (this process itself)
+ Process.kill :TERM, Process.pid
+ end
end
end
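
The two trap handlers in this hunk form a two-phase handshake: USR2 prepares the update, and signal 34 completes it only if the preparation happened first. The sketch below models that ordering as a plain object so it can be checked without sending signals. `UpdateSequencer` and the action symbols are hypothetical names for illustration. The sketch records `:scale_up` where the diff calls it and does not resolve the race that the review comment describes.

```ruby
# Sketch only: the ordering of the USR2 / signal-34 handlers, modelled
# without real signals. All names here are hypothetical.
class UpdateSequencer
  attr_reader :actions

  def initialize
    @prepared = false
    @actions = []
  end

  # Phase 1 (USR2): stop spawning workers, stop running ones, set the flag.
  def on_update_requested
    @actions << :scale_down << :stop_workers
    @prepared = true
  end

  # Phase 2 (signal 34): finishes the update only if phase 1 ran first.
  def on_workers_stopped
    return false unless @prepared

    @prepared = false
    @actions << :start_new_supervisor << :scale_up << :stop_self
    true
  end
end

seq = UpdateSequencer.new
early = seq.on_workers_stopped    # ignored: no update was requested yet
seq.on_update_requested
finished = seq.on_workers_stopped # completes the update
repeat = seq.on_workers_stopped   # ignored: the flag was already cleared
puts seq.actions.inspect
```

Clearing the flag before running the remaining steps means a stray second signal 34 cannot start a second supervisor.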

@@ -254,7 +285,7 @@ def install_windows_event_handler
when :usr1
supervisor_sigusr1_handler
when :usr2
- supervisor_sigusr2_handler
+ graceful_reload
when :cont
supervisor_dump_handler_for_windows
when :stop_event_thread
@@ -284,7 +315,7 @@ def supervisor_sigusr1_handler
send_signal_to_workers(:USR1)
end

- def supervisor_sigusr2_handler
+ def graceful_reload
conf = nil
t = Thread.new do
$log.info 'Reloading new config'
@@ -312,6 +343,14 @@ def supervisor_sigusr2_handler
$log.error "Failed to reload config file: #{e}"
end

+ def start_new_supervisor
+ commands = [ServerEngine.ruby_bin_path, $0] + ARGV
+ env_to_add = {"SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN" => ServerEngine::SocketManager::INTERNAL_TOKEN}
+ Process.spawn(env_to_add, commands.join(" "))
+ rescue => e
+ $log.error "Failed to start a new supervisor: #{e}"
+ end

def supervisor_dump_handler_for_windows
# As for UNIX-like, SIGCONT signal to each process makes the process output its dump-file,
# and it is implemented before the implementation of the function for Windows.
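
`start_new_supervisor` in this hunk joins the command into one string before calling `Process.spawn`, so any argument or path that contains a space would be split. A hedged alternative is to pass the command as an argument list. In the sketch below, `respawn_command` and `EXAMPLE_TOKEN` are hypothetical, `RbConfig.ruby` stands in for `ServerEngine.ruby_bin_path`, and a temporary script that prints its arguments stands in for `$0`.

```ruby
require "rbconfig"
require "tempfile"

# Sketch only: build the re-exec command as an argument list instead of a
# single joined string. `respawn_command` is a hypothetical helper.
def respawn_command(script, argv, ruby: RbConfig.ruby)
  [ruby, script, *argv]
end

# Stand-in for the supervisor script: it prints the arguments it received.
script = Tempfile.new(["echo_argv", ".rb"])
script.write("print ARGV.inspect")
script.close

reader, writer = IO.pipe
pid = Process.spawn({ "EXAMPLE_TOKEN" => "dummy" },   # extra environment
                    *respawn_command(script.path, ["a b", "c"]),
                    out: writer)
writer.close
Process.wait(pid)
output = reader.read
reader.close
script.unlink
puts output
```

With the list form, the child sees `"a b"` as one argument. The same call with `commands.join(" ")` would hand it over as two.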
@@ -332,6 +371,13 @@ def supervisor_dump_handler_for_windows
$log.error "failed to dump: #{e}"
end

+ def scale_down
+ self.scale_workers(0)
+ end
+ def scale_up
+ self.scale_workers(1)
+ end

def kill_worker
if config[:worker_pid]
pids = config[:worker_pid].clone
@@ -500,11 +546,12 @@ def self.default_options
end

def self.cleanup_resources
- unless Fluent.windows?
- if ENV.has_key?('SERVERENGINE_SOCKETMANAGER_PATH')
- FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
- end
- end
+ # TODO: Commented out because, when USR2 is sent a second time, the file at SERVERENGINE_SOCKETMANAGER_PATH cannot be found and startup fails
+ # unless Fluent.windows?
+ # if ENV.has_key?('SERVERENGINE_SOCKETMANAGER_PATH')
+ # FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
+ # end
+ # end
end

def initialize(cl_opt)