diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb
index afac3167ca..5e45084a39 100644
--- a/lib/fluent/engine.rb
+++ b/lib/fluent/engine.rb
@@ -136,6 +136,10 @@ def flush!
       @root_agent.flush!
     end
 
+    def shift_to_limited_mode!
+      @root_agent.shift_to_limited_mode!
+    end
+
     def now
       # TODO thread update
       Fluent::EventTime.now
diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb
index bd2ea83e5b..ccc28209ec 100644
--- a/lib/fluent/plugin/in_tcp.rb
+++ b/lib/fluent/plugin/in_tcp.rb
@@ -101,6 +101,10 @@ def multi_workers_ready?
       true
     end
 
+    def limited_mode_ready?
+      true
+    end
+
     def start
       super
 
diff --git a/lib/fluent/plugin/in_udp.rb b/lib/fluent/plugin/in_udp.rb
index c2d436115f..dd65526ffa 100644
--- a/lib/fluent/plugin/in_udp.rb
+++ b/lib/fluent/plugin/in_udp.rb
@@ -65,6 +65,10 @@ def multi_workers_ready?
       true
     end
 
+    def limited_mode_ready?
+      true
+    end
+
     def start
       super
 
diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb
index 7a6909f7a9..347e742f7c 100644
--- a/lib/fluent/plugin/input.rb
+++ b/lib/fluent/plugin/input.rb
@@ -70,6 +70,19 @@ def metric_callback(es)
       def multi_workers_ready?
         false
       end
+
+      # Whether this input keeps running when the root agent shifts to the limited mode.
+      # False by default; plugins that support it override this to return true.
+      def limited_mode_ready?
+        false
+      end
+
+      # Makes this plugin's #router return the limited router from now on.
+      # Raises when the plugin does not declare limited_mode_ready?.
+      def shift_to_limited_mode!
+        raise "BUG: use shift_to_limited_mode although this plugin is not ready for the limited mode" unless limited_mode_ready?
+        event_emitter_force_limited_router
+      end
     end
   end
 end
diff --git a/lib/fluent/plugin_helper/event_emitter.rb b/lib/fluent/plugin_helper/event_emitter.rb
index ba089e485a..5c573d7299 100644
--- a/lib/fluent/plugin_helper/event_emitter.rb
+++ b/lib/fluent/plugin_helper/event_emitter.rb
@@ -26,6 +26,9 @@ module EventEmitter
 
       def router
         @_event_emitter_used_actually = true
+
+        return Engine.root_agent.limited_router if @_event_emitter_force_limited_router
+
         if @_event_emitter_lazy_init
           @router = @primary_instance.router
         end
@@ -48,6 +51,11 @@ def event_emitter_used_actually?
         @_event_emitter_used_actually
       end
 
+      # Once called, #router returns Engine.root_agent.limited_router instead of the regular router.
+      def event_emitter_force_limited_router
+        @_event_emitter_force_limited_router = true
+      end
+
       def event_emitter_router(label_name)
         if label_name
           if label_name == "@ROOT"
@@ -72,6 +80,7 @@ def initialize
         super
         @_event_emitter_used_actually = false
         @_event_emitter_lazy_init = false
+        @_event_emitter_force_limited_router = false
         @router = nil
       end
 
diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb
index 1165dbadcf..c301737fee 100644
--- a/lib/fluent/root_agent.rb
+++ b/lib/fluent/root_agent.rb
@@ -60,10 +60,16 @@ def initialize(log:, system_config: SystemConfig.new)
       suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil?
       @without_source = system_config.without_source unless system_config.without_source.nil?
       @enable_input_metrics = !!system_config.enable_input_metrics
+
+      @limited_mode_agent = nil
+      @limited_router = nil
+      @limited_mode_forwarding_port = "29140"
+      @limited_mode_forwarding_buf_path = File.join(system_config.root_dir || DEFAULT_BACKUP_DIR, "limited_mode_buffer")
     end
 
     attr_reader :inputs
     attr_reader :labels
+    attr_reader :limited_router
 
     def configure(conf)
       used_worker_ids = []
@@ -161,6 +167,9 @@ def configure(conf)
           add_source(type, e)
         }
       end
+
+      # TODO Stop doing this when it is not needed.
+      add_source_to_receive_from_limited_mode_agent
     end
 
     def setup_error_label(e)
@@ -192,10 +201,15 @@ def lifecycle(desc: false, kind_callback: nil)
             yield instance, display_kind
           end
         end
-        if kind_callback
-          kind_callback.call
-        end
+
+        kind_callback&.call
+      end
+
+      return unless @limited_mode_agent
+      @limited_mode_agent.lifecycle do |plugin, display_kind|
+        yield plugin, display_kind
       end
+      kind_callback&.call
     end
 
     def start
@@ -231,6 +245,94 @@ def flush!
       flushing_threads.each{|t| t.join }
     end
 
+    # Shifts this agent to the limited mode:
+    # 1. starts a separate agent that forwards all events to localhost:@limited_mode_forwarding_port,
+    # 2. switches every limited-mode-ready input over to that agent's event router,
+    # 3. runs the shutdown sequences on all the other plugins.
+    def shift_to_limited_mode!
+      log.info "shifts to the limited mode"
+
+      limited_mode_agent = create_limited_mode_agent
+      @limited_router = limited_mode_agent.event_router
+      limited_mode_agent.lifecycle(desc: true) do |plugin|
+        plugin.start unless plugin.started?
+        plugin.after_start unless plugin.after_started?
+      end
+
+      lifecycle_control_list[:input].select do |instance|
+        instance.limited_mode_ready?
+      end.each do |instance|
+        instance.shift_to_limited_mode!
+      end
+
+      SHUTDOWN_SEQUENCES.each do |sequence|
+        if sequence.safe?
+          lifecycle do |instance, kind|
+            next if kind == :input && instance.limited_mode_ready?
+            execute_shutdown_sequence(sequence, instance, kind)
+          end
+          next
+        end
+
+        operation_threads = []
+        callback = ->(){
+          operation_threads.each { |t| t.join }
+          operation_threads.clear
+        }
+        lifecycle(kind_callback: callback) do |instance, kind|
+          next if kind == :input && instance.limited_mode_ready?
+          t = Thread.new do
+            Thread.current.abort_on_exception = true
+            execute_shutdown_sequence(sequence, instance, kind)
+          end
+          operation_threads << t
+        end
+      end
+
+      # Assigned only now: #lifecycle also iterates @limited_mode_agent once it is set,
+      # so setting it earlier would run the shutdown sequences above on its plugins too.
+      @limited_mode_agent = limited_mode_agent
+    end
+
+    # Builds an Agent whose only plugin is a forward output matching "**" that sends to
+    # localhost:@limited_mode_forwarding_port through a file buffer at @limited_mode_forwarding_buf_path.
+    def create_limited_mode_agent
+      limited_mode_agent = Agent.new(log: log)
+      limited_mode_agent.configure(
+        Config::Element.new('LIMITED_MODE_OUTPUT', '', {}, [
+          Config::Element.new('match', '**', {'@type' => 'forward'}, [
+            Config::Element.new('server', '', {
+              'host' => 'localhost',
+              'port' => @limited_mode_forwarding_port,
+            }, []),
+            Config::Element.new('buffer', '', {
+              '@type' => 'file',
+              'path' => @limited_mode_forwarding_buf_path,
+              'flush_at_shutdown' => 'true',
+              'retry_type' => 'periodic',
+              'retry_wait' => '10s',
+              'retry_randomize' => 'false',
+            }, []),
+          ])
+        ])
+      )
+      limited_mode_agent
+    end
+
+    # Adds a forward source listening on localhost:@limited_mode_forwarding_port,
+    # the same port the limited mode agent's forward output sends to.
+    def add_source_to_receive_from_limited_mode_agent
+      add_source(
+        'forward',
+        Config::Element.new('source', '', {
+          '@type' => 'forward',
+          'bind' => 'localhost',
+          'port' => @limited_mode_forwarding_port,
+        }, []
+        ),
+      )
+    end
+
     class ShutdownSequence
       attr_reader :method, :checker
       def initialize(method, checker, is_safe)
diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb
index 5c1018c9c7..421939a25f 100644
--- a/lib/fluent/supervisor.rb
+++ b/lib/fluent/supervisor.rb
@@ -43,6 +43,8 @@ def before_run
       @rpc_endpoint = nil
       @rpc_server = nil
       @counter = nil
+      @socket_manager_server = nil
+      @is_limited_mode = false
 
       @fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-")
       ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir
@@ -66,8 +68,12 @@ def before_run
       if config[:disable_shared_socket]
         $log.info "shared socket for multiple workers is disabled"
       else
-        server = ServerEngine::SocketManager::Server.open
-        ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s
+        if ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH')
+          @socket_manager_server = ServerEngine::SocketManager::Server.take_over_another_server(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
+        else
+          @socket_manager_server = ServerEngine::SocketManager::Server.open
+          ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_server.path.to_s
+        end
       end
     end
 
@@ -76,7 +82,7 @@ def after_run
       stop_rpc_server if @rpc_endpoint
       stop_counter_server if @counter
       cleanup_lock_dir
-      Fluent::Supervisor.cleanup_resources
+      Fluent::Supervisor.cleanup_resources unless @is_limited_mode
     end
 
     def cleanup_lock_dir
@@ -138,7 +144,7 @@ def run_rpc_server
       @rpc_server.mount_proc('/api/config.gracefulReload') { |req, res|
         $log.debug "fluentd RPC got /api/config.gracefulReload request"
         if Fluent.windows?
-          supervisor_sigusr2_handler
+          graceful_reload
         else
           Process.kill :USR2, Process.pid
         end
@@ -187,7 +193,11 @@ def install_supervisor_signal_handlers
 
       trap :USR2 do
         $log.debug 'fluentd supervisor process got SIGUSR2'
-        supervisor_sigusr2_handler
+        if Fluent.windows?
+          graceful_reload
+        else
+          start_new_supervisor
+        end
       end
     end
 
@@ -254,7 +264,7 @@ def install_windows_event_handler
             when :usr1
               supervisor_sigusr1_handler
             when :usr2
-              supervisor_sigusr2_handler
+              graceful_reload
             when :cont
               supervisor_dump_handler_for_windows
             when :stop_event_thread
@@ -284,7 +294,7 @@ def supervisor_sigusr1_handler
       send_signal_to_workers(:USR1)
     end
 
-    def supervisor_sigusr2_handler
+    def graceful_reload
       conf = nil
       t = Thread.new do
         $log.info 'Reloading new config'
@@ -312,6 +322,21 @@ def supervisor_sigusr2_handler
       $log.error "Failed to reload config file: #{e}"
     end
 
+    # Sends USR2 to the workers (which they trap to shift to the limited mode) and then
+    # spawns a new supervisor process using this process's own command line.
+    def start_new_supervisor
+      send_signal_to_workers(:USR2)
+      sleep 5 # TODO Wait until all workers finish shifting to the limited mode. How?
+      @is_limited_mode = true
+      commands = [ServerEngine.ruby_bin_path, $0] + ARGV
+      env_to_add = {"SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN" => ServerEngine::SocketManager::INTERNAL_TOKEN}
+      # Pass the arguments separately instead of as one joined string: no shell is involved,
+      # so arguments containing spaces or shell metacharacters reach the new process intact.
+      Process.spawn(env_to_add, *commands)
+    rescue => e
+      $log.error "Failed to start a new supervisor: #{e}"
+    end
+
     def supervisor_dump_handler_for_windows
       # As for UNIX-like, SIGCONT signal to each process makes the process output its dump-file,
       # and it is implemented before the implementation of the function for Windows.
@@ -832,7 +857,7 @@ def install_main_process_signal_handlers
       end
 
       trap :USR2 do
-        reload_config
+        shift_to_limited_mode
       end
 
       trap :CONT do
@@ -891,6 +916,17 @@ def flush_buffer
       end
     end
 
+    # Runs Fluent::Engine.shift_to_limited_mode! on a new thread and logs a warning if it raises.
+    def shift_to_limited_mode
+      Thread.new do
+        begin
+          Fluent::Engine.shift_to_limited_mode!
+        rescue Exception => e
+          $log.warn "failed to shift to the limited mode: #{e}"
+        end
+      end
+    end
+
     def reload_config
       Thread.new do
         $log.debug('worker got SIGUSR2')