Skip to content

Commit

Permalink
Restart without downtime
Browse files Browse the repository at this point in the history
Add a new feature: Update/Reload without downtime.

1. The current supervisor receives a signal.
2. The current supervisor sends signals to its workers, and the
   workers stop all plugins that cannot run in parallel.
3. The current supervisor starts a new supervisor.
   * => Old processes and new processes run in parallel.
4. After the new supervisor and its workers start to work, the
   current supervisor and its workers stop.

ref: nginx's feature for upgrading on the fly

* http://nginx.org/en/docs/control.html#upgrade

Problem to solve:

Updating Fluentd or reloading a config causes downtime.
Plugins that receive data as a server, such as `in_udp`, `in_tcp`,
and `in_syslog`, cannot receive data during this time.
This means that the data sent by a client is lost during this
time unless the client has a re-sending feature.
This makes updating Fluentd or reloading a config difficult in
some cases.

Specific feature:

Run only limited Input plugins in parallel, such as `in_tcp`,
`in_udp`, and `in_syslog`.
Stop all plugins except those Input plugins, and prepare an
agent for forwarding data to the new workers.
After the new workers start, they receive events from the old
workers.

Note: need treasure-data/serverengine#146

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Oct 3, 2024
1 parent 79d0293 commit d0f31e8
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 11 deletions.
4 changes: 4 additions & 0 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ def flush!
@root_agent.flush!
end

def shift_to_limited_mode!
@root_agent.shift_to_limited_mode!
end

def now
# TODO thread update
Fluent::EventTime.now
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ def multi_workers_ready?
true
end

def limited_mode_ready?
true
end

def start
super

Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_udp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ def multi_workers_ready?
true
end

def limited_mode_ready?
true
end

def start
super

Expand Down
9 changes: 9 additions & 0 deletions lib/fluent/plugin/input.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ def metric_callback(es)
def multi_workers_ready?
false
end

def limited_mode_ready?
false
end

def shift_to_limited_mode!
raise "BUG: use shift_to_limited_mode although this plugin is not ready for the limited mode" unless limited_mode_ready?
event_emitter_force_limited_router
end
end
end
end
8 changes: 8 additions & 0 deletions lib/fluent/plugin_helper/event_emitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ module EventEmitter

def router
@_event_emitter_used_actually = true

return Engine.root_agent.limited_router if @_event_emitter_force_limited_router

if @_event_emitter_lazy_init
@router = @primary_instance.router
end
Expand All @@ -48,6 +51,10 @@ def event_emitter_used_actually?
@_event_emitter_used_actually
end

def event_emitter_force_limited_router
@_event_emitter_force_limited_router = true
end

def event_emitter_router(label_name)
if label_name
if label_name == "@ROOT"
Expand All @@ -72,6 +79,7 @@ def initialize
super
@_event_emitter_used_actually = false
@_event_emitter_lazy_init = false
@_event_emitter_force_limited_router = false
@router = nil
end

Expand Down
98 changes: 95 additions & 3 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,16 @@ def initialize(log:, system_config: SystemConfig.new)
suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil?
@without_source = system_config.without_source unless system_config.without_source.nil?
@enable_input_metrics = !!system_config.enable_input_metrics

@limited_mode_agent = nil
@limited_router = nil
@limited_mode_forwarding_port = "29140"
@limited_mode_forwarding_buf_path = File.join(system_config.root_dir || DEFAULT_BACKUP_DIR, "limited_mode_buffer")
end

attr_reader :inputs
attr_reader :labels
attr_reader :limited_router

def configure(conf)
used_worker_ids = []
Expand Down Expand Up @@ -161,6 +167,9 @@ def configure(conf)
add_source(type, e)
}
end

# TODO Stop doing this when it is not needed.
add_source_to_receive_from_limited_mode_agent
end

def setup_error_label(e)
Expand Down Expand Up @@ -192,10 +201,15 @@ def lifecycle(desc: false, kind_callback: nil)
yield instance, display_kind
end
end
if kind_callback
kind_callback.call
end

kind_callback&.call
end

return unless @limited_mode_agent
@limited_mode_agent.lifecycle do |plugin, display_kind|
yield plugin, display_kind
end
kind_callback&.call
end

def start
Expand Down Expand Up @@ -231,6 +245,84 @@ def flush!
flushing_threads.each{|t| t.join }
end

def shift_to_limited_mode!
log.info "shifts to the limited mode"

limited_mode_agent = create_limited_mode_agent
@limited_router = limited_mode_agent.event_router
limited_mode_agent.lifecycle(desc: true) do |plugin|
plugin.start unless plugin.started?
plugin.after_start unless plugin.after_started?
end

lifecycle_control_list[:input].select do |instance|
instance.limited_mode_ready?
end.each do |instance|
instance.shift_to_limited_mode!
end

SHUTDOWN_SEQUENCES.each do |sequence|
if sequence.safe?
lifecycle do |instance, kind|
next if kind == :input and instance.limited_mode_ready?
execute_shutdown_sequence(sequence, instance, kind)
end
next
end

operation_threads = []
callback = ->(){
operation_threads.each { |t| t.join }
operation_threads.clear
}
lifecycle(kind_callback: callback) do |instance, kind|
next if kind == :input and instance.limited_mode_ready?
t = Thread.new do
Thread.current.abort_on_exception = true
execute_shutdown_sequence(sequence, instance, kind)
end
operation_threads << t
end
end

@limited_mode_agent = limited_mode_agent
end

def create_limited_mode_agent
limited_mode_agent = Agent.new(log: log)
limited_mode_agent.configure(
Config::Element.new('LIMITED_MODE_OUTPUT', '', {}, [
Config::Element.new('match', '**', {'@type' => 'forward'}, [
Config::Element.new('server', '', {
'host' => 'localhost',
'port' => @limited_mode_forwarding_port,
}, []),
Config::Element.new('buffer', '', {
'@type' => 'file',
'path' => @limited_mode_forwarding_buf_path,
'flush_at_shutdown' => 'true',
'retry_type' => 'periodic',
'retry_wait' => '10s',
'retry_randomize' => 'false',
}, []),
])
])
)
limited_mode_agent
end

def add_source_to_receive_from_limited_mode_agent
add_source(
'forward',
Config::Element.new('source', '', {
'@type' => 'forward',
'bind' => 'localhost',
'port' => @limited_mode_forwarding_port,
}, []
),
)
end

class ShutdownSequence
attr_reader :method, :checker
def initialize(method, checker, is_safe)
Expand Down
47 changes: 39 additions & 8 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def before_run
@rpc_endpoint = nil
@rpc_server = nil
@counter = nil
@socket_manager_server = nil
@is_limited_mode = false

@fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-")
ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir
Expand All @@ -66,8 +68,12 @@ def before_run
if config[:disable_shared_socket]
$log.info "shared socket for multiple workers is disabled"
else
server = ServerEngine::SocketManager::Server.open
ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s
if ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH')
@socket_manager_server = ServerEngine::SocketManager::Server.take_over_another_server(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
else
@socket_manager_server = ServerEngine::SocketManager::Server.open
ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_server.path.to_s
end
end
end

Expand All @@ -76,7 +82,7 @@ def after_run
stop_rpc_server if @rpc_endpoint
stop_counter_server if @counter
cleanup_lock_dir
Fluent::Supervisor.cleanup_resources
Fluent::Supervisor.cleanup_resources unless @is_limited_mode
end

def cleanup_lock_dir
Expand Down Expand Up @@ -138,7 +144,7 @@ def run_rpc_server
@rpc_server.mount_proc('/api/config.gracefulReload') { |req, res|
$log.debug "fluentd RPC got /api/config.gracefulReload request"
if Fluent.windows?
supervisor_sigusr2_handler
graceful_reload
else
Process.kill :USR2, Process.pid
end
Expand Down Expand Up @@ -187,7 +193,11 @@ def install_supervisor_signal_handlers

trap :USR2 do
$log.debug 'fluentd supervisor process got SIGUSR2'
supervisor_sigusr2_handler
if Fluent.windows?
graceful_reload
else
start_new_supervisor
end
end
end

Expand Down Expand Up @@ -254,7 +264,7 @@ def install_windows_event_handler
when :usr1
supervisor_sigusr1_handler
when :usr2
supervisor_sigusr2_handler
graceful_reload
when :cont
supervisor_dump_handler_for_windows
when :stop_event_thread
Expand Down Expand Up @@ -284,7 +294,7 @@ def supervisor_sigusr1_handler
send_signal_to_workers(:USR1)
end

def supervisor_sigusr2_handler
def graceful_reload
conf = nil
t = Thread.new do
$log.info 'Reloading new config'
Expand Down Expand Up @@ -312,6 +322,17 @@ def supervisor_sigusr2_handler
$log.error "Failed to reload config file: #{e}"
end

def start_new_supervisor
send_signal_to_workers(:USR2)
sleep 5 # TODO Wait until all workers finish shifting to the limited mode. How?
@is_limited_mode = true
commands = [ServerEngine.ruby_bin_path, $0] + ARGV
env_to_add = {"SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN" => ServerEngine::SocketManager::INTERNAL_TOKEN}
Process.spawn(env_to_add, commands.join(" "))
rescue => e
$log.error "Failed to start a new supervisor: #{e}"
end

def supervisor_dump_handler_for_windows
# As for UNIX-like, SIGCONT signal to each process makes the process output its dump-file,
# and it is implemented before the implementation of the function for Windows.
Expand Down Expand Up @@ -834,7 +855,7 @@ def install_main_process_signal_handlers
end

trap :USR2 do
reload_config
shift_to_limited_mode
end

trap :CONT do
Expand Down Expand Up @@ -893,6 +914,16 @@ def flush_buffer
end
end

def shift_to_limited_mode
Thread.new do
begin
Fluent::Engine.shift_to_limited_mode!
rescue Exception => e
$log.warn "failed to shift to the limited mode: #{e}"
end
end
end

def reload_config
Thread.new do
$log.debug('worker got SIGUSR2')
Expand Down

0 comments on commit d0f31e8

Please sign in to comment.