diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index b6677c6443..263d1cfc5b 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -51,7 +51,7 @@ def initialize attr_reader :root_agent, :system_config, :supervisor_mode - def init(system_config, supervisor_mode: false) + def init(system_config, supervisor_mode: false, start_in_parallel: false) @system_config = system_config @supervisor_mode = supervisor_mode @@ -60,7 +60,7 @@ def init(system_config, supervisor_mode: false) @log_event_verbose = system_config.log_event_verbose unless system_config.log_event_verbose.nil? - @root_agent = RootAgent.new(log: log, system_config: @system_config) + @root_agent = RootAgent.new(log: log, system_config: @system_config, start_in_parallel: start_in_parallel) self end diff --git a/lib/fluent/plugin/in_syslog.rb b/lib/fluent/plugin/in_syslog.rb index 28ad1c5dd9..3d4b205c98 100644 --- a/lib/fluent/plugin/in_syslog.rb +++ b/lib/fluent/plugin/in_syslog.rb @@ -156,6 +156,10 @@ def multi_workers_ready? true end + def zero_downtime_restart_ready? + true + end + def start super diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb index bd2ea83e5b..9ecb6b793d 100644 --- a/lib/fluent/plugin/in_tcp.rb +++ b/lib/fluent/plugin/in_tcp.rb @@ -101,6 +101,10 @@ def multi_workers_ready? true end + def zero_downtime_restart_ready? + true + end + def start super diff --git a/lib/fluent/plugin/in_udp.rb b/lib/fluent/plugin/in_udp.rb index c2d436115f..645adf8f08 100644 --- a/lib/fluent/plugin/in_udp.rb +++ b/lib/fluent/plugin/in_udp.rb @@ -65,6 +65,10 @@ def multi_workers_ready? true end + def zero_downtime_restart_ready? + true + end + def start super diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb index 7a6909f7a9..d465ed3a22 100644 --- a/lib/fluent/plugin/input.rb +++ b/lib/fluent/plugin/input.rb @@ -70,6 +70,10 @@ def metric_callback(es) def multi_workers_ready? false end + + def zero_downtime_restart_ready? 
# Tracks whether, and in which flavour, source-only operation is active.
#
# The mode is decided once at construction time; afterwards the only
# possible transition is to the disabled state via #disable!.
class SourceOnlyMode
  # Source-only operation is off.
  DISABLED = 0
  # Source-only operation was requested through the with_source_only setting.
  NORMAL = 1
  # The process was started in parallel with an old one: source-only
  # operation is on, and #only_zero_downtime_restart_ready? reports true.
  ONLY_ZERO_DOWNTIME_RESTART_READY = 2

  # Original, misspelled name of DISABLED. Kept as an alias so that any
  # existing reference to it still resolves.
  DISABELD = DISABLED

  # @param with_source_only [Boolean, nil] truthy to enable the normal mode
  # @param start_in_parallel [Boolean, nil] truthy to enable the
  #   zero-downtime-restart mode; takes precedence over +with_source_only+
  def initialize(with_source_only, start_in_parallel)
    @mode =
      if start_in_parallel
        ONLY_ZERO_DOWNTIME_RESTART_READY
      elsif with_source_only
        NORMAL
      else
        DISABLED
      end
  end

  # @return [Boolean] true unless the mode is DISABLED
  def enabled?
    @mode != DISABLED
  end

  # @return [Boolean] true only in the zero-downtime-restart mode
  def only_zero_downtime_restart_ready?
    @mode == ONLY_ZERO_DOWNTIME_RESTART_READY
  end

  # Switches the mode to DISABLED, whatever it was before.
  def disable!
    @mode = DISABLED
  end
end
# initialize elements if @without_source @@ -187,9 +215,12 @@ def cleanup_source_only_buffer_agent end def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil) + only_zero_downtime_restart_ready = false + unless kind_or_agent_list - if @with_source_only + if @source_only_mode.enabled? kind_or_agent_list = [:input, @source_only_buffer_agent] + only_zero_downtime_restart_ready = @source_only_mode.only_zero_downtime_restart_ready? elsif @source_only_buffer_agent # source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router. kind_or_agent_list = [:input, :output_with_router, @source_only_buffer_agent, @labels.values, :filter, :output].flatten @@ -214,6 +245,9 @@ def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil) end display_kind = (kind == :output_with_router ? :output : kind) list.each do |instance| + if only_zero_downtime_restart_ready + next unless instance.respond_to?(:zero_downtime_restart_ready?) and instance.zero_downtime_restart_ready? + end yield instance, display_kind end end @@ -257,7 +291,7 @@ def flush! end def cancel_source_only! - unless @with_source_only + unless @source_only_mode.enabled? log.info "do nothing for canceling with-source-only because the current mode is not with-source-only." return end @@ -285,7 +319,7 @@ def cancel_source_only! setup_source_only_buffer_agent(flush: true) start(kind_or_agent_list: [@source_only_buffer_agent]) - @with_source_only = false + @source_only_mode.disable! end def shutdown(kind_or_agent_list: nil) @@ -378,7 +412,7 @@ def add_source(type, conf) # See also 'fluentd/plugin/input.rb' input.context_router = @event_router input.configure(conf) - input.event_emitter_apply_source_only if @with_source_only + input.event_emitter_apply_source_only if @source_only_mode.enabled? 
if @enable_input_metrics @event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) }) end diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index a76bd1d40a..a16ab4f0c1 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -43,6 +43,10 @@ def before_run @rpc_endpoint = nil @rpc_server = nil @counter = nil + @socket_manager_server = nil + @starting_new_supervisor_without_downtime = false + @new_supervisor_pid = nil + start_in_parallel = ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") @fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-") ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir @@ -65,10 +69,21 @@ def before_run if config[:disable_shared_socket] $log.info "shared socket for multiple workers is disabled" + elsif start_in_parallel + begin + raise "[BUG] SERVERENGINE_SOCKETMANAGER_PATH env var must exist when starting in parallel" unless ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH') + @socket_manager_server = ServerEngine::SocketManager::Server.share_sockets_with_another_server(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) + $log.info "restart-without-downtime: took over the shared sockets", path: ENV['SERVERENGINE_SOCKETMANAGER_PATH'] + rescue => e + $log.error "restart-without-downtime: cancel sequence because failed to take over the shared sockets", error: e + raise + end else - server = ServerEngine::SocketManager::Server.open - ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s + @socket_manager_server = ServerEngine::SocketManager::Server.open + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_server.path.to_s end + + stop_parallel_old_supervisor_after_delay if start_in_parallel end def after_run @@ -76,7 +91,9 @@ def after_run stop_rpc_server if @rpc_endpoint stop_counter_server if @counter cleanup_lock_dir - Fluent::Supervisor.cleanup_resources + Fluent::Supervisor.cleanup_socketmanager_path unless @starting_new_supervisor_without_downtime + + notify_new_supervisor_that_old_one_has_stopped 
# Starts a background thread that waits +delay+ seconds and then sends
# SIGTERM to the old supervisor, whose pid is read from the
# FLUENT_RUNNING_IN_PARALLEL_WITH_OLD environment variable.
#
# Nothing is sent when the variable is no longer set after the delay.
# A value that is not a positive decimal integer is treated as a failure and
# reported through the same warning as a failed kill; it is never passed to
# Process.kill, because pid 0 or a negative pid addresses a whole process
# group (including this process) instead of a single process.
#
# @param delay [Numeric] seconds to wait before stopping the old supervisor
# @return [Thread] the background thread
def stop_parallel_old_supervisor_after_delay(delay: 10)
  # TODO if the new supervisor fails to start and this is not called,
  # it would be necessary to update the pid in the PID file to the old one when daemonized.

  Thread.new do
    # Delay to wait the new workers to start up.
    # Even if it takes a long time to start the new workers and stop the old Fluentd first,
    # it is no problem because the socket buffer works, as long as the capacity is not exceeded.
    sleep delay
    raw_pid = ENV["FLUENT_RUNNING_IN_PARALLEL_WITH_OLD"]
    if raw_pid
      # Strict base-10 parse: String#to_i would silently turn garbage into 0.
      old_pid = Integer(raw_pid, 10, exception: false)
      raise ArgumentError, "invalid old supervisor pid: #{raw_pid.inspect}" unless old_pid&.positive?

      $log.info "restart-without-downtime: stop the old supervisor"
      Process.kill :TERM, old_pid
    end
  rescue => e
    $log.warn "restart-without-downtime: failed to stop the old supervisor." +
              " If the old one does not exist, please send SIGWINCH to this new process to start to work fully." +
              " If it exists, something went wrong. Please kill the old one manually.",
              error: e
  end
end
# Starts a new supervisor process that runs in parallel with this one.
#
# The new process is launched with the same interpreter, script and
# command-line arguments as the current one. The socket manager's internal
# token and this process's pid are handed over through the environment.
#
# The request is ignored (with a warning) when a new supervisor is already
# being started by this process, or when FLUENT_RUNNING_IN_PARALLEL_WITH_OLD
# is still set in this process's environment.
#
# The pid of the spawned process is remembered in @new_supervisor_pid unless
# config[:daemonize] is set. Any error is logged and the "starting" flag is
# reset so that the sequence can be tried again.
def restart_without_downtime
  # TODO exclusive lock

  $log.info "start restart-without-downtime sequence"

  if @starting_new_supervisor_without_downtime
    $log.warn "restart-without-downtime: canceled because it is already starting"
    return
  end
  if ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")
    $log.warn "restart-without-downtime: canceled because the previous sequence is still running"
    return
  end

  @starting_new_supervisor_without_downtime = true
  commands = [ServerEngine.ruby_bin_path, $0] + ARGV
  env_to_add = {
    "SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN" => ServerEngine::SocketManager::INTERNAL_TOKEN,
    "FLUENT_RUNNING_IN_PARALLEL_WITH_OLD" => "#{Process.pid}",
  }
  # Pass the argv as separate arguments: a single joined string would go
  # through the shell, which re-splits arguments containing spaces and
  # interprets metacharacters.
  pid = Process.spawn(env_to_add, *commands)
  @new_supervisor_pid = pid unless config[:daemonize]
rescue => e
  $log.error "restart-without-downtime: failed", error: e
  @starting_new_supervisor_without_downtime = false
end
# Deletes the file named by the SERVERENGINE_SOCKETMANAGER_PATH environment
# variable. Does nothing on Windows or when the variable is not set.
def self.cleanup_socketmanager_path
  unless Fluent.windows?
    socket_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
    # ENV yields nil only for a missing key, so this mirrors an ENV.key? check.
    FileUtils.rm_f(socket_path) unless socket_path.nil?
  end
end