From dce0177dce3df7d809ff77c0fca4e3485122ac6c Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Mon, 25 Nov 2024 12:26:00 +0900 Subject: [PATCH] Zero downtime restart Add a new feature: Zero downtime update/reload 1. The supervisor receives SIGUSR2. 2. Spawn a new supervisor. 3. Take over shared sockets. 4. Launch new workers, and stop old processes in parallel. * Launch new workers with source-only mode * Limit to input plugins that are zero_downtime_restart_ready? * Send SIGTERM to the old supervisor after a 10s delay from step 3. 5. The old supervisor stops and sends SIGWINCH to the new one. 6. The new workers run fully. Problem to solve: Updating Fluentd or reloading a config causes downtime. Plugins that receive data as a server, such as `in_udp`, `in_tcp`, and `in_syslog`, cannot receive data during this time. This means that the data sent by a client is lost during this time unless the client has a re-sending feature. This makes updating Fluentd or reloading a config difficult in some cases. Note: this needs the following features: * https://github.com/fluent/fluentd/pull/4661 * https://github.com/treasure-data/serverengine/pull/146 Co-authored-by: Shizuo Fujita Signed-off-by: Daijiro Fukuda --- lib/fluent/engine.rb | 4 +- lib/fluent/plugin/in_syslog.rb | 4 + lib/fluent/plugin/in_tcp.rb | 4 + lib/fluent/plugin/in_udp.rb | 4 + lib/fluent/plugin/input.rb | 4 + lib/fluent/root_agent.rb | 50 ++++++++-- lib/fluent/supervisor.rb | 170 +++++++++++++++++++++++++++++---- test/test_plugin_classes.rb | 8 ++ test/test_root_agent.rb | 106 ++++++++++++++++++++ test/test_supervisor.rb | 123 +++++++++++++++++++++++- 10 files changed, 447 insertions(+), 30 deletions(-) diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index b6677c6443..263d1cfc5b 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -51,7 +51,7 @@ def initialize attr_reader :root_agent, :system_config, :supervisor_mode - def init(system_config, supervisor_mode: false) + def init(system_config, supervisor_mode: false, 
start_in_parallel: false) @system_config = system_config @supervisor_mode = supervisor_mode @@ -60,7 +60,7 @@ def init(system_config, supervisor_mode: false) @log_event_verbose = system_config.log_event_verbose unless system_config.log_event_verbose.nil? - @root_agent = RootAgent.new(log: log, system_config: @system_config) + @root_agent = RootAgent.new(log: log, system_config: @system_config, start_in_parallel: start_in_parallel) self end diff --git a/lib/fluent/plugin/in_syslog.rb b/lib/fluent/plugin/in_syslog.rb index 28ad1c5dd9..3d4b205c98 100644 --- a/lib/fluent/plugin/in_syslog.rb +++ b/lib/fluent/plugin/in_syslog.rb @@ -156,6 +156,10 @@ def multi_workers_ready? true end + def zero_downtime_restart_ready? + true + end + def start super diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb index bd2ea83e5b..9ecb6b793d 100644 --- a/lib/fluent/plugin/in_tcp.rb +++ b/lib/fluent/plugin/in_tcp.rb @@ -101,6 +101,10 @@ def multi_workers_ready? true end + def zero_downtime_restart_ready? + true + end + def start super diff --git a/lib/fluent/plugin/in_udp.rb b/lib/fluent/plugin/in_udp.rb index c2d436115f..645adf8f08 100644 --- a/lib/fluent/plugin/in_udp.rb +++ b/lib/fluent/plugin/in_udp.rb @@ -65,6 +65,10 @@ def multi_workers_ready? true end + def zero_downtime_restart_ready? + true + end + def start super diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb index 7a6909f7a9..d465ed3a22 100644 --- a/lib/fluent/plugin/input.rb +++ b/lib/fluent/plugin/input.rb @@ -70,6 +70,10 @@ def metric_callback(es) def multi_workers_ready? false end + + def zero_downtime_restart_ready? 
+ false + end end end end diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index c66ccb489c..0a7a34b1cb 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -48,7 +48,35 @@ module Fluent class RootAgent < Agent ERROR_LABEL = "@ERROR".freeze # @ERROR is built-in error label - def initialize(log:, system_config: SystemConfig.new) + class SourceOnlyMode + DISABELD = 0 + NORMAL = 1 + ONLY_ZERO_DOWNTIME_RESTART_READY = 2 + + def initialize(with_source_only, start_in_parallel) + if start_in_parallel + @mode = ONLY_ZERO_DOWNTIME_RESTART_READY + elsif with_source_only + @mode = NORMAL + else + @mode = DISABELD + end + end + + def enabled? + @mode != DISABELD + end + + def only_zero_downtime_restart_ready? + @mode == ONLY_ZERO_DOWNTIME_RESTART_READY + end + + def disable! + @mode = DISABELD + end + end + + def initialize(log:, system_config: SystemConfig.new, start_in_parallel: false) super(log: log) @labels = {} @@ -56,7 +84,7 @@ def initialize(log:, system_config: SystemConfig.new) @suppress_emit_error_log_interval = 0 @next_emit_error_log_time = nil @without_source = system_config.without_source || false - @with_source_only = system_config.with_source_only || false + @source_only_mode = SourceOnlyMode.new(system_config.with_source_only, start_in_parallel) @source_only_buffer_agent = nil @enable_input_metrics = system_config.enable_input_metrics || false @@ -67,7 +95,7 @@ def initialize(log:, system_config: SystemConfig.new) attr_reader :labels def source_only_router - raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @with_source_only + raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @source_only_mode.enabled? @source_only_buffer_agent.event_router end @@ -154,7 +182,7 @@ def configure(conf) super - setup_source_only_buffer_agent if @with_source_only + setup_source_only_buffer_agent if @source_only_mode.enabled? 
# initialize elements if @without_source @@ -187,9 +215,12 @@ def cleanup_source_only_buffer_agent end def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil) + only_zero_downtime_restart_ready = false + unless kind_or_agent_list - if @with_source_only + if @source_only_mode.enabled? kind_or_agent_list = [:input, @source_only_buffer_agent] + only_zero_downtime_restart_ready = @source_only_mode.only_zero_downtime_restart_ready? elsif @source_only_buffer_agent # source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router. kind_or_agent_list = [:input, :output_with_router, @source_only_buffer_agent, @labels.values, :filter, :output].flatten @@ -214,6 +245,9 @@ def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil) end display_kind = (kind == :output_with_router ? :output : kind) list.each do |instance| + if only_zero_downtime_restart_ready + next unless instance.respond_to?(:zero_downtime_restart_ready?) and instance.zero_downtime_restart_ready? + end yield instance, display_kind end end @@ -257,7 +291,7 @@ def flush! end def cancel_source_only! - unless @with_source_only + unless @source_only_mode.enabled? log.info "do nothing for canceling with-source-only because the current mode is not with-source-only." return end @@ -285,7 +319,7 @@ def cancel_source_only! setup_source_only_buffer_agent(flush: true) start(kind_or_agent_list: [@source_only_buffer_agent]) - @with_source_only = false + @source_only_mode.disable! end def shutdown(kind_or_agent_list: nil) @@ -378,7 +412,7 @@ def add_source(type, conf) # See also 'fluentd/plugin/input.rb' input.context_router = @event_router input.configure(conf) - input.event_emitter_apply_source_only if @with_source_only + input.event_emitter_apply_source_only if @source_only_mode.enabled? 
if @enable_input_metrics @event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) }) end diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index a76bd1d40a..c14bd21f02 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -43,11 +43,16 @@ def before_run @rpc_endpoint = nil @rpc_server = nil @counter = nil + @socket_manager_server = nil + @starting_new_supervisor_with_zero_downtime = false + @new_supervisor_pid = nil + start_in_parallel = ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + @zero_downtime_restart_mutex = Mutex.new @fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-") ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir - if config[:rpc_endpoint] + if config[:rpc_endpoint] and not start_in_parallel @rpc_endpoint = config[:rpc_endpoint] @enable_get_dump = config[:enable_get_dump] run_rpc_server @@ -59,16 +64,27 @@ def before_run install_supervisor_signal_handlers end - if counter = config[:counter_server] + if counter = config[:counter_server] and not start_in_parallel run_counter_server(counter) end if config[:disable_shared_socket] $log.info "shared socket for multiple workers is disabled" + elsif start_in_parallel + begin + raise "[BUG] SERVERENGINE_SOCKETMANAGER_PATH env var must exist when starting in parallel" unless ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH') + @socket_manager_server = ServerEngine::SocketManager::Server.share_sockets_with_another_server(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) + $log.info "zero-downtime-restart: took over the shared sockets", path: ENV['SERVERENGINE_SOCKETMANAGER_PATH'] + rescue => e + $log.error "zero-downtime-restart: cancel sequence because failed to take over the shared sockets", error: e + raise + end else - server = ServerEngine::SocketManager::Server.open - ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s + @socket_manager_server = ServerEngine::SocketManager::Server.open + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = 
@socket_manager_server.path.to_s end + + stop_parallel_old_supervisor_after_delay if start_in_parallel end def after_run @@ -76,7 +92,9 @@ def after_run stop_rpc_server if @rpc_endpoint stop_counter_server if @counter cleanup_lock_dir - Fluent::Supervisor.cleanup_resources + Fluent::Supervisor.cleanup_socketmanager_path unless @starting_new_supervisor_with_zero_downtime + + notify_new_supervisor_that_old_one_has_stopped if @starting_new_supervisor_with_zero_downtime end def cleanup_lock_dir @@ -138,7 +156,7 @@ def run_rpc_server @rpc_server.mount_proc('/api/config.gracefulReload') { |req, res| $log.debug "fluentd RPC got /api/config.gracefulReload request" if Fluent.windows? - supervisor_sigusr2_handler + graceful_reload else Process.kill :USR2, Process.pid end @@ -157,7 +175,7 @@ def run_rpc_server end def stop_rpc_server - @rpc_server.shutdown + @rpc_server&.shutdown end def run_counter_server(counter_conf) @@ -172,6 +190,44 @@ def stop_counter_server @counter.stop end + def stop_parallel_old_supervisor_after_delay + Thread.new do + # Delay to wait the new workers to start up. + # Even if it takes a long time to start the new workers and stop the old Fluentd first, + # it is no problem because the socket buffer works, as long as the capacity is not exceeded. + sleep 10 + old_pid = ENV["FLUENT_RUNNING_IN_PARALLEL_WITH_OLD"]&.to_i + if old_pid + $log.info "zero-downtime-restart: stop the old supervisor" + Process.kill :TERM, old_pid + end + rescue => e + $log.warn "zero-downtime-restart: failed to stop the old supervisor." + + " If the old one does not exist, please send SIGWINCH to this new process to start to work fully." + + " If it exists, something went wrong. 
Please kill the old one manually.", + error: e + end + end + + def notify_new_supervisor_that_old_one_has_stopped + if config[:pid_path] + new_pid = File.read(config[:pid_path]).to_i + else + raise "[BUG] new_supervisor_pid is not saved" unless @new_supervisor_pid + new_pid = @new_supervisor_pid + end + + $log.info "zero-downtime-restart: notify the new supervisor (pid: #{new_pid}) that old one has stopped" + Process.kill :WINCH, new_pid + rescue => e + $log.error( + "zero-downtime-restart: failed to notify the new supervisor." + + " Please send SIGWINCH to the new supervisor process manually" + + " if it does not start to work fully.", + error: e + ) + end + def install_supervisor_signal_handlers return if Fluent.windows? @@ -187,7 +243,11 @@ def install_supervisor_signal_handlers trap :USR2 do $log.debug 'fluentd supervisor process got SIGUSR2' - supervisor_sigusr2_handler + if Fluent.windows? + graceful_reload + else + zero_downtime_restart + end end trap :WINCH do @@ -259,7 +319,7 @@ def install_windows_event_handler when :usr1 supervisor_sigusr1_handler when :usr2 - supervisor_sigusr2_handler + graceful_reload when :cont supervisor_dump_handler_for_windows when :stop_event_thread @@ -289,7 +349,7 @@ def supervisor_sigusr1_handler send_signal_to_workers(:USR1) end - def supervisor_sigusr2_handler + def graceful_reload conf = nil t = Thread.new do $log.info 'Reloading new config' @@ -317,7 +377,76 @@ def supervisor_sigusr2_handler $log.error "Failed to reload config file: #{e}" end + def zero_downtime_restart + Thread.new do + @zero_downtime_restart_mutex.synchronize do + $log.info "start zero-downtime-restart sequence" + + if @starting_new_supervisor_with_zero_downtime + $log.warn "zero-downtime-restart: canceled because it is already starting" + Thread.exit + end + if ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + $log.warn "zero-downtime-restart: canceled because the previous sequence is still running" + Thread.exit + end + + 
@starting_new_supervisor_with_zero_downtime = true + commands = [ServerEngine.ruby_bin_path, $0] + ARGV + env_to_add = { + "SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN" => ServerEngine::SocketManager::INTERNAL_TOKEN, + "FLUENT_RUNNING_IN_PARALLEL_WITH_OLD" => "#{Process.pid}", + } + pid = Process.spawn(env_to_add, commands.join(" ")) + @new_supervisor_pid = pid unless config[:daemonize] + + if config[:daemonize] + Thread.new(pid) do |pid| + _, status = Process.wait2(pid) + # check if `ServerEngine::Daemon#daemonize_with_double_fork` succeeded or not + unless status.success? + @starting_new_supervisor_with_zero_downtime = false + $log.error "zero-downtime-restart: failed because new supervisor exits unexpectedly" + end + end + else + Thread.new(pid) do |pid| + _, status = Process.wait2(pid) + @starting_new_supervisor_with_zero_downtime = false + $log.error "zero-downtime-restart: failed because new supervisor exits unexpectedly", status: status + end + end + end + rescue => e + $log.error "zero-downtime-restart: failed", error: e + @starting_new_supervisor_with_zero_downtime = false + end + end + def cancel_source_only + if ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + if config[:rpc_endpoint] + begin + @rpc_endpoint = config[:rpc_endpoint] + @enable_get_dump = config[:enable_get_dump] + run_rpc_server + rescue => e + $log.error "failed to start RPC server", error: e + end + end + + if counter = config[:counter_server] + begin + run_counter_server(counter) + rescue => e + $log.error "failed to start counter server", error: e + end + end + + $log.info "zero-downtime-restart: done all sequences, now new processes start to work fully" + ENV.delete("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + end + send_signal_to_workers(:WINCH) end @@ -510,12 +639,11 @@ def self.default_options } end - def self.cleanup_resources - unless Fluent.windows? 
- if ENV.has_key?('SERVERENGINE_SOCKETMANAGER_PATH') - FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) - end - end + def self.cleanup_socketmanager_path + return if Fluent.windows? + return unless ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH') + + FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) end def initialize(cl_opt) @@ -583,7 +711,7 @@ def run_supervisor(dry_run: false) begin ServerEngine::Privilege.change(@chuser, @chgroup) MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support) - Fluent::Engine.init(@system_config, supervisor_mode: true) + Fluent::Engine.init(@system_config, supervisor_mode: true, start_in_parallel: ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")) Fluent::Engine.run_configure(@conf, dry_run: dry_run) rescue Fluent::ConfigError => e $log.error 'config error', file: @config_path, error: e @@ -632,10 +760,10 @@ def run_worker File.umask(@chumask.to_i(8)) end MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support) - Fluent::Engine.init(@system_config) + Fluent::Engine.init(@system_config, start_in_parallel: ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")) Fluent::Engine.run_configure(@conf) Fluent::Engine.run - self.class.cleanup_resources if @standalone_worker + self.class.cleanup_socketmanager_path if @standalone_worker exit 0 end end @@ -853,6 +981,10 @@ def install_main_process_signal_handlers end trap :USR2 do + # Leave the old GracefulReload feature, just in case. + # We can send SIGUSR2 to the worker process to use this old GracefulReload feature. + # (Note: Normally, we can send SIGUSR2 to the supervisor process to use + # zero-downtime-restart feature as GracefulReload on non-Windows.) 
reload_config end diff --git a/test/test_plugin_classes.rb b/test/test_plugin_classes.rb index ccd6a1469c..a973cfba73 100644 --- a/test/test_plugin_classes.rb +++ b/test/test_plugin_classes.rb @@ -106,6 +106,14 @@ def initialize @emit_size_metrics = FluentTest::FluentTestCounterMetrics.new end + def multi_workers_ready? + true + end + + def zero_downtime_restart_ready? + true + end + def start super @started = true diff --git a/test/test_root_agent.rb b/test/test_root_agent.rb index d10174d0f3..fa468ed554 100644 --- a/test/test_root_agent.rb +++ b/test/test_root_agent.rb @@ -1051,4 +1051,110 @@ def setup @root_agent.shutdown end end + + sub_test_case 'start_in_parallel' do + def conf + <<~EOC + + @type test_in_gen + @id test_in_gen + num 20 + interval_sec 0.1 + async + + + + @type test_in + @id test_in + + + + @type record_transformer + @id record_transformer + + foo foo + + + + + @type test_out + @id test_out + + EOC + end + + def setup + omit "Not supported on Windows" if Fluent.windows? + system_config = SystemConfig.new( + Config::Element.new('system', '', {}, [ + Config::Element.new('source_only_buffer', '', { + 'flush_interval' => 1, + }, []), + ]) + ) + @root_agent = RootAgent.new(log: $log, system_config: system_config, start_in_parallel: true) + stub(Engine).root_agent { @root_agent } + stub(Engine).system_config { system_config } + @root_agent.configure(Config.parse(conf, "(test)", "(test_dir)")) + end + + test 'only input plugins should start' do + @root_agent.start + + assert_equal( + { + "input started?" => [true, false], + "filter started?" => [false], + "output started?" => [false], + }, + { + "input started?" => @root_agent.inputs.map { |plugin| plugin.started? }, + "filter started?" => @root_agent.filters.map { |plugin| plugin.started? }, + "output started?" => @root_agent.outputs.map { |plugin| plugin.started? }, + } + ) + ensure + @root_agent.shutdown + # Buffer files remain because not cancelling source-only. 
+ # As a test, they should be clean-up-ed. + buf_dir = @root_agent.instance_variable_get(:@source_only_buffer_agent).instance_variable_get(:@base_buffer_dir) + FileUtils.remove_dir(buf_dir) + end + + test '#cancel_source_only! should start all plugins' do + @root_agent.start + @root_agent.cancel_source_only! + + assert_equal( + { + "input started?" => [true, true], + "filter started?" => [true], + "output started?" => [true], + }, + { + "input started?" => @root_agent.inputs.map { |plugin| plugin.started? }, + "filter started?" => @root_agent.filters.map { |plugin| plugin.started? }, + "output started?" => @root_agent.outputs.map { |plugin| plugin.started? }, + } + ) + ensure + @root_agent.shutdown + end + + test 'buffer should be loaded after #cancel_source_only!' do + @root_agent.start + sleep 1 + @root_agent.cancel_source_only! + + waiting(3) do + # Wait buffer loaded after source-only cancelled + sleep 1 until @root_agent.outputs[0].events["test.event"].any? { |record| record["num"] == 0 } + end + + # all data should be outputted + assert { @root_agent.outputs[0].events["test.event"].size == 20 } + ensure + @root_agent.shutdown + end + end end diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb index 6d3363630e..28c20b2e39 100644 --- a/test/test_supervisor.rb +++ b/test/test_supervisor.rb @@ -19,7 +19,7 @@ class SupervisorTest < ::Test::Unit::TestCase class DummyServer include Fluent::ServerModule - attr_accessor :rpc_endpoint, :enable_get_dump + attr_accessor :rpc_endpoint, :enable_get_dump, :socket_manager_server def config {} end @@ -891,6 +891,127 @@ def server.config end end + sub_test_case "zero_downtime_restart" do + setup do + omit "Not supported on Windows" if Fluent.windows? + end + + data( + # When daemonize, exit-status is important. The new spawned process does double-fork and exits soon. 
+ "daemonize and succeeded double-fork of new process" => [true, true, 0, false], + "daemonize and failed double-fork of new process" => [true, false, 0, true], + # When no daemon, whether the new spawned process is alive is important, not exit-status. + "no daemon and new process alive" => [false, false, 3, false], + "no daemon and new process dead" => [false, false, 0, true], + ) + def test_zero_downtime_restart((daemonize, wait_success, wait_sleep, restart_canceled)) + # == Arrange == + env_spawn = {} + pid_wait = nil + + server = DummyServer.new + + stub(server).config do + { + daemonize: daemonize, + pid_path: "test-pid-file", + } + end + process_stub = stub(Process) + process_stub.spawn do |env, commands| + env_spawn = env + -1 + end + process_stub.wait2 do |pid| + pid_wait = pid + sleep wait_sleep + if wait_success + status = Class.new{def success?; true; end}.new + else + status = Class.new{def success?; false; end}.new + end + [pid, status] + end + stub(File).read("test-pid-file") { -1 } + + # mock to check notify_new_supervisor_that_old_one_has_stopped sends SIGWINCH + if restart_canceled + mock(Process).kill(:WINCH, -1).never + else + mock(Process).kill(:WINCH, -1) + end + + # == Act and Assert == + server.before_run + server.zero_downtime_restart.join + sleep 1 # To wait a sub thread for waitpid in zero_downtime_restart + server.after_run + + assert_equal( + [ + !restart_canceled, + true, + Process.pid, + -1, + ], + [ + server.instance_variable_get(:@starting_new_supervisor_with_zero_downtime), + env_spawn.key?("SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN"), + env_spawn["FLUENT_RUNNING_IN_PARALLEL_WITH_OLD"].to_i, + pid_wait, + ] + ) + ensure + Fluent::Supervisor.cleanup_socketmanager_path + ENV.delete('SERVERENGINE_SOCKETMANAGER_PATH') + end + + def test_share_sockets + server = DummyServer.new + server.before_run + path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] + + client = ServerEngine::SocketManager::Client.new(path) + udp_port = unused_port(protocol: 
:udp) + tcp_port = unused_port(protocol: :tcp) + client.listen_udp("localhost", udp_port) + client.listen_tcp("localhost", tcp_port) + + ENV['FLUENT_RUNNING_IN_PARALLEL_WITH_OLD'] = "" + new_server = DummyServer.new + stub(new_server).stop_parallel_old_supervisor_after_delay + new_server.before_run + + assert_equal( + [[udp_port], [tcp_port]], + [ + new_server.socket_manager_server.udp_sockets.values.map { |v| v.addr[1] }, + new_server.socket_manager_server.tcp_sockets.values.map { |v| v.addr[1] }, + ] + ) + ensure + server&.after_run + new_server&.after_run + ENV.delete('SERVERENGINE_SOCKETMANAGER_PATH') + ENV.delete("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + end + + def test_stop_parallel_old_supervisor_after_delay + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = "" + ENV['FLUENT_RUNNING_IN_PARALLEL_WITH_OLD'] = "-1" + stub(ServerEngine::SocketManager::Server).share_sockets_with_another_server + mock(Process).kill(:TERM, -1) + + server = DummyServer.new + server.before_run + sleep 12 # Can't we skip the delay for this test? + ensure + server&.after_run + ENV.delete('SERVERENGINE_SOCKETMANAGER_PATH') + ENV.delete("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + end + end + def create_debug_dummy_logger dl_opts = {} dl_opts[:log_level] = ServerEngine::DaemonLogger::DEBUG