From dc5e56748fb125a6239ea48c88dc3e0c215cb1bf Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Thu, 3 Oct 2024 09:54:51 +0900 Subject: [PATCH] Restart without downtime Add a new feature: Update/Reload without downtime. 1. The current supervisor receives a signal. 2. The current supervisor sends signals to its workers, and the workers stop all plugins that cannot run in parallel. 3. The current supervisor starts a new supervisor. * => Old processes and new processes run in parallel. 4. After the new supervisor and its workers start to work, the current supervisor and its workers stop. ref: nginx's feature for upgrading on the fly * http://nginx.org/en/docs/control.html#upgrade Problem to solve: Updating Fluentd or reloading a config causes downtime. Plugins that receive data as a server, such as `in_udp`, `in_tcp`, and `in_syslog`, cannot receive data during this time. This means that the data sent by a client is lost during this time unless the client has a re-sending feature. This makes updating Fluentd or reloading a config difficult in some cases. Specific feature: Run only limited Input plugins in parallel, such as `in_tcp`, `in_udp`, and `in_syslog`. Stop all plugins except those Input plugins, and prepare an agent for forwarding data to the new workers. After the new workers start, they receive events from the old workers. Note: need https://github.com/treasure-data/serverengine/pull/146 Signed-off-by: Daijiro Fukuda --- lib/fluent/engine.rb | 4 + lib/fluent/plugin/in_tcp.rb | 4 + lib/fluent/plugin/in_udp.rb | 4 + lib/fluent/plugin/input.rb | 9 +++ lib/fluent/plugin_helper/event_emitter.rb | 8 ++ lib/fluent/root_agent.rb | 98 ++++++++++++++++++++++- lib/fluent/supervisor.rb | 47 +++++++++-- 7 files changed, 163 insertions(+), 11 deletions(-) diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index afac3167ca..5e45084a39 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -136,6 +136,10 @@ def flush! @root_agent.flush! end + def shift_to_limited_mode! + @root_agent.shift_to_limited_mode! + end + def now # TODO thread update Fluent::EventTime.now diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb index bd2ea83e5b..ccc28209ec 100644 --- a/lib/fluent/plugin/in_tcp.rb +++ b/lib/fluent/plugin/in_tcp.rb @@ -101,6 +101,10 @@ def multi_workers_ready? true end + def limited_mode_ready? + true + end + def start super diff --git a/lib/fluent/plugin/in_udp.rb b/lib/fluent/plugin/in_udp.rb index c2d436115f..dd65526ffa 100644 --- a/lib/fluent/plugin/in_udp.rb +++ b/lib/fluent/plugin/in_udp.rb @@ -65,6 +65,10 @@ def multi_workers_ready? true end + def limited_mode_ready? + true + end + def start super diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb index 7a6909f7a9..347e742f7c 100644 --- a/lib/fluent/plugin/input.rb +++ b/lib/fluent/plugin/input.rb @@ -70,6 +70,15 @@ def metric_callback(es) def multi_workers_ready? false end + + def limited_mode_ready? + false + end + + def shift_to_limited_mode! + raise "BUG: use shift_to_limited_mode although this plugin is not ready for the limited mode" unless limited_mode_ready? + event_emitter_force_limited_router + end end end end diff --git a/lib/fluent/plugin_helper/event_emitter.rb b/lib/fluent/plugin_helper/event_emitter.rb index ba089e485a..5c573d7299 100644 --- a/lib/fluent/plugin_helper/event_emitter.rb +++ b/lib/fluent/plugin_helper/event_emitter.rb @@ -26,6 +26,9 @@ module EventEmitter def router @_event_emitter_used_actually = true + + return Engine.root_agent.limited_router if @_event_emitter_force_limited_router + if @_event_emitter_lazy_init @router = @primary_instance.router end @@ -48,6 +51,10 @@ def event_emitter_used_actually? @_event_emitter_used_actually end + def event_emitter_force_limited_router + @_event_emitter_force_limited_router = true + end + def event_emitter_router(label_name) if label_name if label_name == "@ROOT" @@ -72,6 +79,7 @@ def initialize super @_event_emitter_used_actually = false @_event_emitter_lazy_init = false + @_event_emitter_force_limited_router = false @router = nil end diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 1165dbadcf..c301737fee 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -60,10 +60,16 @@ def initialize(log:, system_config: SystemConfig.new) suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil? @without_source = system_config.without_source unless system_config.without_source.nil? @enable_input_metrics = !!system_config.enable_input_metrics + + @limited_mode_agent = nil + @limited_router = nil + @limited_mode_forwarding_port = "29140" + @limited_mode_forwarding_buf_path = File.join(system_config.root_dir || DEFAULT_BACKUP_DIR, "limited_mode_buffer") end attr_reader :inputs attr_reader :labels + attr_reader :limited_router def configure(conf) used_worker_ids = [] @@ -161,6 +167,9 @@ def configure(conf) add_source(type, e) } end + + # TODO Stop doing this when it is not needed. + add_source_to_receive_from_limited_mode_agent end def setup_error_label(e) @@ -192,10 +201,15 @@ def lifecycle(desc: false, kind_callback: nil) yield instance, display_kind end end - if kind_callback - kind_callback.call - end + + kind_callback&.call + end + + return unless @limited_mode_agent + @limited_mode_agent.lifecycle do |plugin, display_kind| + yield plugin, display_kind end + kind_callback&.call end def start @@ -231,6 +245,84 @@ def flush! flushing_threads.each{|t| t.join } end + def shift_to_limited_mode! + log.info "shifts to the limited mode" + + limited_mode_agent = create_limited_mode_agent + @limited_router = limited_mode_agent.event_router + limited_mode_agent.lifecycle(desc: true) do |plugin| + plugin.start unless plugin.started? + plugin.after_start unless plugin.after_started? + end + + lifecycle_control_list[:input].select do |instance| + instance.limited_mode_ready? + end.each do |instance| + instance.shift_to_limited_mode! + end + + SHUTDOWN_SEQUENCES.each do |sequence| + if sequence.safe? + lifecycle do |instance, kind| + next if kind == :input and instance.limited_mode_ready? + execute_shutdown_sequence(sequence, instance, kind) + end + next + end + + operation_threads = [] + callback = ->(){ + operation_threads.each { |t| t.join } + operation_threads.clear + } + lifecycle(kind_callback: callback) do |instance, kind| + next if kind == :input and instance.limited_mode_ready? + t = Thread.new do + Thread.current.abort_on_exception = true + execute_shutdown_sequence(sequence, instance, kind) + end + operation_threads << t + end + end + + @limited_mode_agent = limited_mode_agent + end + + def create_limited_mode_agent + limited_mode_agent = Agent.new(log: log) + limited_mode_agent.configure( + Config::Element.new('LIMITED_MODE_OUTPUT', '', {}, [ + Config::Element.new('match', '**', {'@type' => 'forward'}, [ + Config::Element.new('server', '', { + 'host' => 'localhost', + 'port' => @limited_mode_forwarding_port, + }, []), + Config::Element.new('buffer', '', { + '@type' => 'file', + 'path' => @limited_mode_forwarding_buf_path, + 'flush_at_shutdown' => 'true', + 'retry_type' => 'periodic', + 'retry_wait' => '10s', + 'retry_randomize' => 'false', + }, []), + ]) + ]) + ) + limited_mode_agent + end + + def add_source_to_receive_from_limited_mode_agent + add_source( + 'forward', + Config::Element.new('source', '', { + '@type' => 'forward', + 'bind' => 'localhost', + 'port' => @limited_mode_forwarding_port, + }, [] + ), + ) + end + class ShutdownSequence attr_reader :method, :checker def initialize(method, checker, is_safe) diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 5c1018c9c7..421939a25f 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -43,6 +43,8 @@ def before_run @rpc_endpoint = nil @rpc_server = nil @counter = nil + @socket_manager_server = nil + @is_limited_mode = false @fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-") ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir @@ -66,8 +68,12 @@ def before_run if config[:disable_shared_socket] $log.info "shared socket for multiple workers is disabled" else - server = ServerEngine::SocketManager::Server.open - ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s + if ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH') + @socket_manager_server = ServerEngine::SocketManager::Server.take_over_another_server(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) + else + @socket_manager_server = ServerEngine::SocketManager::Server.open + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_server.path.to_s + end end end @@ -76,7 +82,7 @@ def after_run stop_rpc_server if @rpc_endpoint stop_counter_server if @counter cleanup_lock_dir - Fluent::Supervisor.cleanup_resources + Fluent::Supervisor.cleanup_resources unless @is_limited_mode end def cleanup_lock_dir @@ -138,7 +144,7 @@ def run_rpc_server @rpc_server.mount_proc('/api/config.gracefulReload') { |req, res| $log.debug "fluentd RPC got /api/config.gracefulReload request" if Fluent.windows? - supervisor_sigusr2_handler + graceful_reload else Process.kill :USR2, Process.pid end @@ -187,7 +193,11 @@ def install_supervisor_signal_handlers trap :USR2 do $log.debug 'fluentd supervisor process got SIGUSR2' - supervisor_sigusr2_handler + if Fluent.windows? + graceful_reload + else + start_new_supervisor + end end end @@ -254,7 +264,7 @@ def install_windows_event_handler when :usr1 supervisor_sigusr1_handler when :usr2 - supervisor_sigusr2_handler + graceful_reload when :cont supervisor_dump_handler_for_windows when :stop_event_thread @@ -284,7 +294,7 @@ def supervisor_sigusr1_handler send_signal_to_workers(:USR1) end - def supervisor_sigusr2_handler + def graceful_reload conf = nil t = Thread.new do $log.info 'Reloading new config' @@ -312,6 +322,17 @@ def supervisor_sigusr2_handler $log.error "Failed to reload config file: #{e}" end + def start_new_supervisor + send_signal_to_workers(:USR2) + sleep 5 # TODO Wait until all workers finish shifting to the limited mode. How? + @is_limited_mode = true + commands = [ServerEngine.ruby_bin_path, $0] + ARGV + env_to_add = {"SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN" => ServerEngine::SocketManager::INTERNAL_TOKEN} + Process.spawn(env_to_add, commands.join(" ")) + rescue => e + $log.error "Failed to start a new supervisor: #{e}" + end + def supervisor_dump_handler_for_windows # As for UNIX-like, SIGCONT signal to each process makes the process output its dump-file, # and it is implemented before the implementation of the function for Windows. @@ -832,7 +853,7 @@ def install_main_process_signal_handlers end trap :USR2 do - reload_config + shift_to_limited_mode end trap :CONT do @@ -891,6 +912,16 @@ def flush_buffer end end + def shift_to_limited_mode + Thread.new do + begin + Fluent::Engine.shift_to_limited_mode! + rescue Exception => e + $log.warn "failed to shift to the limited mode: #{e}" + end + end + end + def reload_config Thread.new do $log.debug('worker got SIGUSR2')