diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index 65db5c7a39..94d09aa334 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -49,7 +49,7 @@ def initialize attr_reader :root_agent, :system_config, :supervisor_mode - def init(system_config, supervisor_mode: false) + def init(system_config, supervisor_mode: false, start_in_parallel: false) @system_config = system_config @supervisor_mode = supervisor_mode @@ -58,7 +58,7 @@ def init(system_config, supervisor_mode: false) @log_event_verbose = system_config.log_event_verbose unless system_config.log_event_verbose.nil? - @root_agent = RootAgent.new(log: log, system_config: @system_config) + @root_agent = RootAgent.new(log: log, system_config: @system_config, start_in_parallel: start_in_parallel) self end diff --git a/lib/fluent/plugin/in_syslog.rb b/lib/fluent/plugin/in_syslog.rb index 28ad1c5dd9..b33a479aee 100644 --- a/lib/fluent/plugin/in_syslog.rb +++ b/lib/fluent/plugin/in_syslog.rb @@ -156,6 +156,10 @@ def multi_workers_ready? true end + def restart_without_downtime_ready? + true + end + def start super diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb index bd2ea83e5b..224eecb043 100644 --- a/lib/fluent/plugin/in_tcp.rb +++ b/lib/fluent/plugin/in_tcp.rb @@ -101,6 +101,10 @@ def multi_workers_ready? true end + def restart_without_downtime_ready? + true + end + def start super diff --git a/lib/fluent/plugin/in_udp.rb b/lib/fluent/plugin/in_udp.rb index c2d436115f..006524a95c 100644 --- a/lib/fluent/plugin/in_udp.rb +++ b/lib/fluent/plugin/in_udp.rb @@ -65,6 +65,10 @@ def multi_workers_ready? true end + def restart_without_downtime_ready? + true + end + def start super diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb index 7a6909f7a9..7fb9b3608a 100644 --- a/lib/fluent/plugin/input.rb +++ b/lib/fluent/plugin/input.rb @@ -70,6 +70,10 @@ def metric_callback(es) def multi_workers_ready? false end + + def restart_without_downtime_ready? + false + end end end end diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 8411152da4..f5870feaa2 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -48,7 +48,35 @@ module Fluent class RootAgent < Agent ERROR_LABEL = "@ERROR".freeze # @ERROR is built-in error label - def initialize(log:, system_config: SystemConfig.new) + class SourceOnlyMode + DISABELD = 0 + NORMAL = 1 + RESTART_WITHOUT_DOWNTIME_READY_ONLY = 2 + + def initialize(with_source_only, start_in_parallel) + if start_in_parallel + @mode = RESTART_WITHOUT_DOWNTIME_READY_ONLY + elsif with_source_only + @mode = NORMAL + else + @mode = DISABELD + end + end + + def source_only? + @mode != DISABELD + end + + def restart_without_downtime_ready_only? + @mode == RESTART_WITHOUT_DOWNTIME_READY_ONLY + end + + def disable! + @mode = DISABELD + end + end + + def initialize(log:, system_config: SystemConfig.new, start_in_parallel: false) super(log: log) @labels = {} @@ -56,7 +84,7 @@ def initialize(log:, system_config: SystemConfig.new) @suppress_emit_error_log_interval = 0 @next_emit_error_log_time = nil @without_source = system_config.without_source || false - @with_source_only = system_config.with_source_only || false + @source_only_mode = SourceOnlyMode.new(system_config.with_source_only, start_in_parallel) @source_only_buffer_agent = nil @enable_input_metrics = system_config.enable_input_metrics || false @@ -67,7 +95,7 @@ def initialize(log:, system_config: SystemConfig.new) attr_reader :labels def source_only_router - raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @with_source_only + raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @source_only_mode.source_only? @source_only_buffer_agent.event_router end @@ -154,7 +182,7 @@ def configure(conf) super - setup_source_only_buffer_agent if @with_source_only + setup_source_only_buffer_agent if @source_only_mode.source_only? # initialize elements if @without_source @@ -184,7 +212,7 @@ def setup_source_only_buffer_agent(flush: false) def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil) unless kind_or_agent_list - if @with_source_only + if @source_only_mode.source_only? kind_or_agent_list = [:input, @source_only_buffer_agent] elsif @source_only_buffer_agent # source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router. @@ -210,6 +238,9 @@ def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil) end display_kind = (kind == :output_with_router ? :output : kind) list.each do |instance| + if @source_only_mode.restart_without_downtime_ready_only? + next unless instance.restart_without_downtime_ready? + end yield instance, display_kind end end @@ -254,9 +285,9 @@ def flush! def cancel_source_only! # TODO exclusive lock - if @with_source_only + if @source_only_mode.source_only? log.info "cancel --with-source-only mode and start the other plugins" - @with_source_only = false + @source_only_mode.disable! start lifecycle_control_list[:input].each(&:event_emitter_cancel_source_only) @@ -371,7 +402,7 @@ def add_source(type, conf) # See also 'fluentd/plugin/input.rb' input.context_router = @event_router input.configure(conf) - input.event_emitter_set_source_only if @with_source_only + input.event_emitter_set_source_only if @source_only_mode.source_only? if @enable_input_metrics @event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) }) end diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index c95ab125fa..851030e0e5 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -43,6 +43,10 @@ def before_run @rpc_endpoint = nil @rpc_server = nil @counter = nil + @socket_manager_server = nil + @starting_new_supervisor_without_downtime = false + @new_supervisor_pid = nil + start_in_parallel = ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") @fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-") ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir @@ -65,10 +69,21 @@ def before_run if config[:disable_shared_socket] $log.info "shared socket for multiple workers is disabled" + elsif start_in_parallel + begin + raise "[BUG] SERVERENGINE_SOCKETMANAGER_PATH env var must exist when starting in parallel" unless ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH') + @socket_manager_server = ServerEngine::SocketManager::Server.take_over_another_server(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) + $log.info "restart-without-downtime: took over the shared sockets", path: ENV['SERVERENGINE_SOCKETMANAGER_PATH'] + rescue => e + $log.error "restart-without-downtime: cancel sequence because failed to take over the shared sockets", error: e + raise + end else - server = ServerEngine::SocketManager::Server.open - ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s + @socket_manager_server = ServerEngine::SocketManager::Server.open + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_server.path.to_s end + + stop_parallel_old_supervisor_after_delay if start_in_parallel end def after_run @@ -76,7 +91,9 @@ def after_run stop_rpc_server if @rpc_endpoint stop_counter_server if @counter cleanup_lock_dir - Fluent::Supervisor.cleanup_resources + Fluent::Supervisor.cleanup_socketmanager_path unless @starting_new_supervisor_without_downtime + + notify_new_supervisor_that_old_one_has_stopped if @starting_new_supervisor_without_downtime end def cleanup_lock_dir @@ -138,7 +155,7 @@ def run_rpc_server @rpc_server.mount_proc('/api/config.gracefulReload') { |req, res| $log.debug "fluentd RPC got /api/config.gracefulReload request" if Fluent.windows? - supervisor_sigusr2_handler + graceful_reload else Process.kill :USR2, Process.pid end @@ -172,6 +189,47 @@ def stop_counter_server @counter.stop end + def stop_parallel_old_supervisor_after_delay + # TODO if the new supervisor fails to start and this is not called, + # it would be necessary to update the pid in the PID file to the old one when daemonized. + + Thread.new do + # Delay to avoid worker downtime as much as possible. + # Even if the downtime occurs, it is no problem because the socket buffer works, + # as long as the capacity is not exceeded. + sleep 10 + old_pid = ENV["FLUENT_RUNNING_IN_PARALLEL_WITH_OLD"]&.to_i + if old_pid + $log.info "restart-without-downtime: stop the old supervisor" + Process.kill :TERM, old_pid + end + rescue => e + $log.warn "restart-without-downtime: failed to stop the old supervisor." + + " If the old one does not exist, please send '34' signal to this new process to start to work fully." + + " If it exists, something went wrong. Please kill the old one manually.", + error: e + end + end + + def notify_new_supervisor_that_old_one_has_stopped + if config[:pid_path] + new_pid = File.read(config[:pid_path]).to_i + else + raise "[BUG] new_supervisor_pid is not saved" unless @new_supervisor_pid + new_pid = @new_supervisor_pid + end + + $log.info "restart-without-downtime: notify the new supervisor (pid: #{new_pid}) that old one has stopped" + Process.kill 34, new_pid + rescue => e + $log.error( + "restart-without-downtime: failed to notify the new supervisor." + + " Please send '34' signal to the new supervisor process manually" + + " if it does not start to work fully.", + error: e + ) + end + def install_supervisor_signal_handlers return if Fluent.windows? @@ -187,7 +245,11 @@ def install_supervisor_signal_handlers trap :USR2 do $log.debug 'fluentd supervisor process got SIGUSR2' - supervisor_sigusr2_handler + if Fluent.windows? + graceful_reload + else + restart_without_downtime + end end trap 34 do @@ -259,7 +321,7 @@ def install_windows_event_handler when :usr1 supervisor_sigusr1_handler when :usr2 - supervisor_sigusr2_handler + graceful_reload when :cont supervisor_dump_handler_for_windows when :stop_event_thread @@ -289,7 +351,7 @@ def supervisor_sigusr1_handler send_signal_to_workers(:USR1) end - def supervisor_sigusr2_handler + def graceful_reload conf = nil t = Thread.new do $log.info 'Reloading new config' @@ -317,7 +379,38 @@ def supervisor_sigusr2_handler $log.error "Failed to reload config file: #{e}" end + def restart_without_downtime + # TODO exclusive lock + + $log.info "start restart-without-downtime sequence" + + if @starting_new_supervisor_without_downtime + $log.warn "restart-without-downtime: canceled because it is already starting" + return + end + if ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + $log.warn "restart-without-downtime: canceled because the previous sequence is still running" + return + end + + @starting_new_supervisor_without_downtime = true + commands = [ServerEngine.ruby_bin_path, $0] + ARGV + env_to_add = { + "SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN" => ServerEngine::SocketManager::INTERNAL_TOKEN, + "FLUENT_RUNNING_IN_PARALLEL_WITH_OLD" => "#{Process.pid}", + } + pid = Process.spawn(env_to_add, commands.join(" ")) + @new_supervisor_pid = pid unless config[:daemonize] + rescue => e + $log.error "restart-without-downtime: failed", error: e + @starting_new_supervisor_without_downtime = false + end + def cancel_source_only + if ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + $log.info "restart-without-downtime: done all sequences, now the new workers starts to work fully" + ENV.delete("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + end send_signal_to_workers(34) end @@ -509,12 +602,11 @@ def self.default_options } end - def self.cleanup_resources - unless Fluent.windows? - if ENV.has_key?('SERVERENGINE_SOCKETMANAGER_PATH') - FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) - end - end + def self.cleanup_socketmanager_path + return if Fluent.windows? + return unless ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH') + + FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) end def initialize(cl_opt) @@ -578,7 +670,7 @@ def run_supervisor(dry_run: false) begin ServerEngine::Privilege.change(@chuser, @chgroup) MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support) - Fluent::Engine.init(@system_config, supervisor_mode: true) + Fluent::Engine.init(@system_config, supervisor_mode: true, start_in_parallel: ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")) Fluent::Engine.run_configure(@conf, dry_run: dry_run) rescue Fluent::ConfigError => e $log.error 'config error', file: @config_path, error: e @@ -623,10 +715,10 @@ def run_worker File.umask(@chumask.to_i(8)) end MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support) - Fluent::Engine.init(@system_config) + Fluent::Engine.init(@system_config, start_in_parallel: ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")) Fluent::Engine.run_configure(@conf) Fluent::Engine.run - self.class.cleanup_resources if @standalone_worker + self.class.cleanup_socketmanager_path if @standalone_worker exit 0 end end @@ -844,7 +936,8 @@ def install_main_process_signal_handlers end trap :USR2 do - reload_config + # Do nothing + # TODO consider suitable code for this end trap :CONT do