Skip to content

Commit

Permalink
Restart without downtime
Browse files Browse the repository at this point in the history
Add a new feature: Update/Reload without downtime.

1. The current supervisor receives a signal.
2. The current supervisor sends signals to its workers, and the
   workers stop all plugins that cannot run in parallel.
3. The current supervisor starts a new supervisor.
   * => Old processes and new processes run in parallel.
4. After the new supervisor and its workers start to work, the
   current supervisor and its workers stop.

ref: nginx's feature for upgrading on the fly

* http://nginx.org/en/docs/control.html#upgrade

Problem to solve:

Updating Fluentd or reloading a config causes downtime.
Plugins that receive data as a server, such as `in_udp`, `in_tcp`,
and `in_syslog`, cannot receive data during this time.
This means that the data sent by a client is lost during this
time unless the client has a re-sending feature.
This makes updating Fluentd or reloading a config difficult in
some cases.

Specific feature:

Run only limited Input plugins in parallel, such as `in_tcp`,
`in_udp`, and `in_syslog`.
Stop all plugins except those Input plugins, and prepare a
dedicated file buffer for Output.
After the new workers start, they load the file buffer and route
those events to the ROOT label.

Note: need treasure-data/serverengine#146

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Aug 30, 2024
1 parent 6218372 commit 2f4fe0c
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 21 deletions.
4 changes: 4 additions & 0 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ def flush!
@root_agent.flush!
end

# Shifts the engine to the limited mode by delegating to the root agent's
# shift_to_limited_mode!.
def shift_to_limited_mode!
@root_agent.shift_to_limited_mode!
end

def now
# TODO thread update
Fluent::EventTime.now
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ def multi_workers_ready?
true
end

# Declares that in_tcp can keep running in the limited mode.
# RootAgent#shift_to_limited_mode! skips the shutdown sequences for input
# plugins that return true here.
def limited_mode_ready?
true
end

def start
super

Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_udp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ def multi_workers_ready?
true
end

# Declares that in_udp can keep running in the limited mode.
# RootAgent#shift_to_limited_mode! skips the shutdown sequences for input
# plugins that return true here.
def limited_mode_ready?
true
end

def start
super

Expand Down
9 changes: 9 additions & 0 deletions lib/fluent/plugin/input.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ def metric_callback(es)
# Default for input plugins: false. Plugins such as in_tcp and in_udp override
# this to return true.
# NOTE(review): presumably tells the framework whether the plugin may run
# under multiple workers -- confirm against the callers of this predicate.
def multi_workers_ready?
false
end

# Default for input plugins: not ready for the limited mode.
# shift_to_limited_mode! raises unless a subclass overrides this to return true.
def limited_mode_ready?
false
end

# Switches this input plugin to the limited mode by forcing its event emitter
# onto the limited router.
#
# Raises RuntimeError when the plugin has not declared itself ready via
# limited_mode_ready?; calling it in that state is a programming error.
def shift_to_limited_mode!
  if limited_mode_ready?
    event_emitter_force_limited_router
  else
    raise "BUG: use shift_to_limited_mode although this plugin is not ready for the limited mode"
  end
end
end
end
end
8 changes: 8 additions & 0 deletions lib/fluent/plugin_helper/event_emitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ module EventEmitter

def router
@_event_emitter_used_actually = true

return Engine.root_agent.limited_router if @_event_emitter_force_limited_router

if @_event_emitter_lazy_init
@router = @primary_instance.router
end
Expand All @@ -48,6 +51,10 @@ def event_emitter_used_actually?
@_event_emitter_used_actually
end

# Sets the flag that makes subsequent #router calls return
# Engine.root_agent.limited_router instead of the plugin's regular router.
def event_emitter_force_limited_router
@_event_emitter_force_limited_router = true
end

def event_emitter_router(label_name)
if label_name
if label_name == "@ROOT"
Expand All @@ -72,6 +79,7 @@ def initialize
super
@_event_emitter_used_actually = false
@_event_emitter_lazy_init = false
@_event_emitter_force_limited_router = false
@router = nil
end

Expand Down
119 changes: 106 additions & 13 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ module Fluent
#
# RootAgent
# |
# +------------+-------------+-------------+
# | | | |
# <label> <source> <filter> <match>
# |
# +----+----+
# | |
# <filter> <match>
# +------------+-------------+-------------+------------+
# | | | | |
# <label> <source> <filter> <match> <agents>
# | |
# +----+----+ +----+----+
# | | | |
# <filter> <match> <filter> <match>
#
# Relation:
# * RootAgent has many <label>, <source>, <filter> and <match>
Expand All @@ -51,6 +51,7 @@ def initialize(log:, system_config: SystemConfig.new)
super(log: log)

@labels = {}
@agents = []
@inputs = []
@suppress_emit_error_log_interval = 0
@next_emit_error_log_time = nil
Expand All @@ -60,10 +61,15 @@ def initialize(log:, system_config: SystemConfig.new)
suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil?
@without_source = system_config.without_source unless system_config.without_source.nil?
@enable_input_metrics = !!system_config.enable_input_metrics

@limited_router = nil
@limited_mode_agent_for_load = nil
@limited_mode_agent_buf_path = File.join(system_config.root_dir || DEFAULT_BACKUP_DIR, 'limited_mode_buffer')
end

attr_reader :inputs
attr_reader :labels
attr_reader :limited_router

def configure(conf)
used_worker_ids = []
Expand Down Expand Up @@ -161,6 +167,10 @@ def configure(conf)
add_source(type, e)
}
end

# TODO Stop doing this when it is not needed.
@limited_mode_agent_for_load = create_limited_mode_agent_for_load
@agents << @limited_mode_agent_for_load
end

def setup_error_label(e)
Expand All @@ -170,15 +180,15 @@ def setup_error_label(e)
end

def lifecycle(desc: false, kind_callback: nil)
kind_or_label_list = if desc
[:output, :filter, @labels.values.reverse, :output_with_router, :input].flatten
kind_or_agent_list = if desc
[:output, :filter, @agents.reverse, @labels.values.reverse, :output_with_router, :input].flatten
else
[:input, :output_with_router, @labels.values, :filter, :output].flatten
[:input, :output_with_router, @labels.values, @agents, :filter, :output].flatten
end
kind_or_label_list.each do |kind|
kind_or_agent_list.each do |kind|
if kind.respond_to?(:lifecycle)
label = kind
label.lifecycle(desc: desc) do |plugin, display_kind|
agent = kind
agent.lifecycle(desc: desc) do |plugin, display_kind|
yield plugin, display_kind
end
else
Expand Down Expand Up @@ -231,6 +241,89 @@ def flush!
flushing_threads.each{|t| t.join }
end

# Shifts the plugins managed by this root agent into the limited mode.
#
# Steps, in order:
# 1. Runs the first shutdown sequence (SHUTDOWN_SEQUENCES[0]) on the plugins
#    of @limited_mode_agent_for_load, if that agent exists.
# 2. Builds the output agent for the limited mode and publishes its event
#    router as @limited_router.
# 3. Calls shift_to_limited_mode! on every input plugin that reports
#    limited_mode_ready?.
# 4. Runs every shutdown sequence over all plugins except those inputs.
# 5. Adds the output agent to @agents.
def shift_to_limited_mode!
log.info "shifts to the limited mode"

# Stop limited_mode_agent_for_load first to avoid using the same file buffer path
if @limited_mode_agent_for_load
@limited_mode_agent_for_load.lifecycle do |instance, kind|
execute_shutdown_sequence(SHUTDOWN_SEQUENCES[0], instance, kind)
end
end

limited_mode_agent_for_output = create_limited_mode_agent_for_output
@limited_router = limited_mode_agent_for_output.event_router

# Only inputs that declare themselves ready are switched to the limited router.
lifecycle_control_list[:input].select do |instance|
instance.limited_mode_ready?
end.each do |instance|
instance.shift_to_limited_mode!
end

SHUTDOWN_SEQUENCES.each do |sequence|
# Safe sequences run inline, one plugin after another.
if sequence.safe?
lifecycle do |instance, kind|
# Limited-mode-ready inputs keep running, so they are not shut down.
next if kind == :input and instance.limited_mode_ready?
execute_shutdown_sequence(sequence, instance, kind)
end
next
end

# Other sequences run each plugin in its own thread; the callback joins
# and clears the pending threads.
# NOTE(review): kind_callback is presumably invoked by lifecycle after each
# plugin kind -- confirm in lifecycle.
operation_threads = []
callback = ->(){
operation_threads.each { |t| t.join }
operation_threads.clear
}
lifecycle(kind_callback: callback) do |instance, kind|
next if kind == :input and instance.limited_mode_ready?
t = Thread.new do
Thread.current.abort_on_exception = true
execute_shutdown_sequence(sequence, instance, kind)
end
operation_threads << t
end
end

# Registered only after the loop above: lifecycle walks @agents, so adding the
# agent earlier would put it through the shutdown sequences as well.
@agents << limited_mode_agent_for_output
end

# Builds and configures the agent that receives events while in the limited
# mode.
#
# The agent has a single catch-all <match **> using out_relabel with a file
# buffer located at @limited_mode_agent_buf_path. The buffer is configured
# with flush_thread_count 0 and flush_at_shutdown false.
#
# Returns the configured Agent.
def create_limited_mode_agent_for_output
  agent = Agent.new(log: log)

  buffer_section = Config::Element.new('buffer', 'tag', {
    '@type' => 'file',
    'path' => @limited_mode_agent_buf_path,
    'flush_mode' => 'immediate',
    'flush_at_shutdown' => 'false',
    'flush_thread_count' => 0,
  }, [])
  # TODO Use specialized plugin, not out_relabel
  match_section = Config::Element.new('match', '**', {'@type' => 'relabel'}, [buffer_section])
  agent_conf = Config::Element.new('LIMITED_MODE_OUTPUT', '', {}, [match_section])

  agent.configure(agent_conf)
  agent
end

# Builds and configures the agent that loads the limited-mode file buffer.
#
# The agent has a single catch-all <match **> using out_relabel that routes
# to the @ROOT label, with a file buffer located at
# @limited_mode_agent_buf_path (the same path the limited-mode output agent
# writes to).
#
# Returns the configured Agent.
def create_limited_mode_agent_for_load
  agent = Agent.new(log: log)

  buffer_section = Config::Element.new('buffer', 'tag', {
    '@type' => 'file',
    'path' => @limited_mode_agent_buf_path,
    'flush_mode' => 'immediate',
    'flush_at_shutdown' => 'false',
  }, [])
  # TODO Use specialized plugin, not out_relabel
  match_attrs = {'@type' => 'relabel', '@label' => '@ROOT'}
  match_section = Config::Element.new('match', '**', match_attrs, [buffer_section])
  agent_conf = Config::Element.new('LIMITED_MODE_LOAD', '', {}, [match_section])

  agent.configure(agent_conf)
  agent
end

class ShutdownSequence
attr_reader :method, :checker
def initialize(method, checker, is_safe)
Expand Down
47 changes: 39 additions & 8 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def before_run
@rpc_endpoint = nil
@rpc_server = nil
@counter = nil
@socket_manager_server = nil
@is_limited_mode = false

@fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-")
ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir
Expand All @@ -66,8 +68,12 @@ def before_run
if config[:disable_shared_socket]
$log.info "shared socket for multiple workers is disabled"
else
server = ServerEngine::SocketManager::Server.open
ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s
if ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH')
@socket_manager_server = ServerEngine::SocketManager::Server.take_over_another_server(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
else
@socket_manager_server = ServerEngine::SocketManager::Server.open
ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_server.path.to_s
end
end
end

Expand All @@ -76,7 +82,7 @@ def after_run
stop_rpc_server if @rpc_endpoint
stop_counter_server if @counter
cleanup_lock_dir
Fluent::Supervisor.cleanup_resources
Fluent::Supervisor.cleanup_resources unless @is_limited_mode
end

def cleanup_lock_dir
Expand Down Expand Up @@ -138,7 +144,7 @@ def run_rpc_server
@rpc_server.mount_proc('/api/config.gracefulReload') { |req, res|
$log.debug "fluentd RPC got /api/config.gracefulReload request"
if Fluent.windows?
supervisor_sigusr2_handler
graceful_reload
else
Process.kill :USR2, Process.pid
end
Expand Down Expand Up @@ -187,7 +193,11 @@ def install_supervisor_signal_handlers

trap :USR2 do
$log.debug 'fluentd supervisor process got SIGUSR2'
supervisor_sigusr2_handler
if Fluent.windows?
graceful_reload
else
start_new_supervisor
end
end
end

Expand Down Expand Up @@ -254,7 +264,7 @@ def install_windows_event_handler
when :usr1
supervisor_sigusr1_handler
when :usr2
supervisor_sigusr2_handler
graceful_reload
when :cont
supervisor_dump_handler_for_windows
when :stop_event_thread
Expand Down Expand Up @@ -284,7 +294,7 @@ def supervisor_sigusr1_handler
send_signal_to_workers(:USR1)
end

def supervisor_sigusr2_handler
def graceful_reload
conf = nil
t = Thread.new do
$log.info 'Reloading new config'
Expand Down Expand Up @@ -312,6 +322,17 @@ def supervisor_sigusr2_handler
$log.error "Failed to reload config file: #{e}"
end

# Starts a replacement supervisor process (used by the USR2 handler on
# non-Windows platforms).
#
# Sends USR2 to the workers so that they shift to the limited mode, waits a
# fixed 5 seconds, marks this supervisor as being in the limited mode, and
# then spawns a new supervisor with the same Ruby binary, script and
# command-line arguments as this process.
#
# Returns the pid of the spawned process. Any StandardError is logged and
# swallowed.
def start_new_supervisor
  send_signal_to_workers(:USR2)
  sleep 5 # TODO Wait until all workers finish shifting to the limited mode. How?
  @is_limited_mode = true
  commands = [ServerEngine.ruby_bin_path, $0] + ARGV
  env_to_add = {"SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN" => ServerEngine::SocketManager::INTERNAL_TOKEN}
  # Pass each argument as its own argv entry. Joining them into a single
  # string would make Process.spawn treat it as a command line, which breaks
  # arguments containing spaces and interprets shell metacharacters.
  Process.spawn(env_to_add, *commands)
rescue => e
  $log.error "Failed to start a new supervisor: #{e}"
end

def supervisor_dump_handler_for_windows
# As for UNIX-like, SIGCONT signal to each process makes the process output its dump-file,
# and it is implemented before the implementation of the function for Windows.
Expand Down Expand Up @@ -832,7 +853,7 @@ def install_main_process_signal_handlers
end

trap :USR2 do
reload_config
shift_to_limited_mode
end

trap :CONT do
Expand Down Expand Up @@ -891,6 +912,16 @@ def flush_buffer
end
end

# Worker-side handler for USR2: asks the engine to shift to the limited mode.
#
# The work runs in a new thread, and this method returns that Thread.
# NOTE(review): presumably a thread is used because this is called from a
# signal trap, where operations such as locking a mutex are not allowed.
# Any exception raised by the engine is logged as a warning and not re-raised.
def shift_to_limited_mode
  Thread.new do
    Fluent::Engine.shift_to_limited_mode!
  rescue Exception => e
    $log.warn "failed to shift to the limited mode: #{e}"
  end
end

def reload_config
Thread.new do
$log.debug('worker got SIGUSR2')
Expand Down

0 comments on commit 2f4fe0c

Please sign in to comment.