Skip to content

Commit

Permalink
Merge pull request #4086 from daipom/refactor-supervisor-loadconfig
Browse files Browse the repository at this point in the history
Refactor Superviser.load_config()
  • Loading branch information
ashie authored Mar 10, 2023
2 parents fb64173 + 041af39 commit ee796ef
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 138 deletions.
6 changes: 2 additions & 4 deletions lib/fluent/daemon.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,5 @@

server_module = Fluent.const_get(ARGV[0])
worker_module = Fluent.const_get(ARGV[1])
# it doesn't call ARGV in block because when reloading config, params will be initialized and then it can't use previous config.
config_path = ARGV[2]
params = JSON.parse(ARGV[3])
ServerEngine::Daemon.run_server(server_module, worker_module) { Fluent::Supervisor.load_config(config_path, params) }
params = JSON.parse(ARGV[2])
ServerEngine::Daemon.run_server(server_module, worker_module) { Fluent::Supervisor.serverengine_config(params) }
1 change: 0 additions & 1 deletion lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ def time_format=(time_fmt)
end

def reopen!
# do nothing in @logger.reopen! because it's already reopened in Supervisor.load_config
@logger.reopen! if @logger
nil
end
Expand Down
40 changes: 9 additions & 31 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -424,22 +424,7 @@ def dump
end

class Supervisor
# For ServerEngine's `reload_config`.
# This is called only at the initilization of the supervisor process,
# since Fluentd overwrites all related SIGNAL(HUP,USR1,USR2) and have own
# reloading feature.
def self.load_config(path, params = {})
pre_loadtime = 0
pre_loadtime = params['pre_loadtime'].to_i if params['pre_loadtime']
pre_config_mtime = nil
pre_config_mtime = params['pre_config_mtime'] if params['pre_config_mtime']
config_mtime = File.mtime(path)

# reuse previous config if last load time is within 5 seconds and mtime of the config file is not changed
if (Time.now - Time.at(pre_loadtime) < 5) && (config_mtime == pre_config_mtime)
return params['pre_conf']
end

def self.serverengine_config(params = {})
# ServerEngine's "daemonize" option is boolean, and path of pid file is brought by "pid_path"
pid_path = params['daemonize']
daemonize = !!params['daemonize']
Expand Down Expand Up @@ -470,28 +455,18 @@ def self.load_config(path, params = {})
File.join(File.dirname(__FILE__), 'daemon.rb'),
ServerModule.name,
WorkerModule.name,
path,
JSON.dump(params)],
command_sender: Fluent.windows? ? "pipe" : "signal",
config_path: params['fluentd_conf_path'],
fluentd_conf: params['fluentd_conf'],
conf_encoding: params['conf_encoding'],
inline_config: params['inline_config'],
config_path: path,
main_cmd: params['main_cmd'],
signame: params['signame'],
disable_shared_socket: params['disable_shared_socket'],
restart_worker_interval: params['restart_worker_interval'],
}
if daemonize
se_config[:pid_path] = pid_path
end
pre_params = params.dup
params['pre_loadtime'] = Time.now.to_i
params['pre_config_mtime'] = config_mtime
params['pre_conf'] = se_config
# prevent pre_conf from being too big by reloading many times.
pre_params['pre_conf'] = nil
params['pre_conf'][:windows_daemon_cmdline][5] = JSON.dump(pre_params)
se_config[:pid_path] = pid_path if daemonize

se_config
end
Expand Down Expand Up @@ -850,10 +825,11 @@ def supervise
'inline_config' => @inline_config,
'chuser' => @chuser,
'chgroup' => @chgroup,
'fluentd_conf_path' => @config_path,
'fluentd_conf' => @conf.to_s,
'use_v1_config' => @use_v1_config,
'conf_encoding' => @conf_encoding,
'signame' => @signame,
'fluentd_conf' => @conf.to_s,

'workers' => @system_config.workers,
'root_dir' => @system_config.root_dir,
Expand All @@ -866,8 +842,10 @@ def supervise
'restart_worker_interval' => @system_config.restart_worker_interval,
}

se = ServerEngine.create(ServerModule, WorkerModule){
Fluent::Supervisor.load_config(@config_path, params)
se = ServerEngine.create(ServerModule, WorkerModule) {
# Note: This is called only at the initialization of ServerEngine, since
# Fluentd overwrites all related SIGNAL(HUP,USR1,USR2) and have own reloading feature.
Fluent::Supervisor.serverengine_config(params)
}

se.run
Expand Down
112 changes: 10 additions & 102 deletions test/test_supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -517,31 +517,15 @@ def server.config
assert_equal('{"ok":true}', response)
end

def test_load_config
tmp_dir = "#{@tmp_dir}/dir/test_load_config.conf"
conf_info_str = %[
<system>
log_level info
</system>
]
conf_debug_str = %[
<system>
log_level debug
</system>
]
now = Time.now
Timecop.freeze(now)

write_config tmp_dir, conf_info_str

def test_serverengine_config
params = {}
params['workers'] = 1
params['fluentd_conf_path'] = "fluentd.conf"
params['use_v1_config'] = true
params['log_level'] = Fluent::Log::LEVEL_INFO
params['conf_encoding'] = 'utf-8'
load_config_proc = Proc.new { Fluent::Supervisor.load_config(tmp_dir, params) }
params['log_level'] = Fluent::Log::LEVEL_INFO
load_config_proc = Proc.new { Fluent::Supervisor.serverengine_config(params) }

# first call
se_config = load_config_proc.call
assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
assert_equal 'spawn', se_config[:worker_type]
Expand All @@ -551,68 +535,21 @@ def test_load_config
assert_equal false, se_config[:log_stderr]
assert_equal true, se_config[:enable_heartbeat]
assert_equal false, se_config[:auto_heartbeat]
assert_equal "fluentd.conf", se_config[:config_path]
assert_equal false, se_config[:daemonize]
assert_nil se_config[:pid_path]

# second call immediately(reuse config)
se_config = load_config_proc.call
pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime']
assert_nil pre_config_mtime
assert_nil pre_loadtime

Timecop.freeze(now + 5)

# third call after 5 seconds(don't reuse config)
se_config = load_config_proc.call
pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime']
assert_not_nil pre_config_mtime
assert_not_nil pre_loadtime

# forth call immediately(reuse config)
se_config = load_config_proc.call
# test that pre_config_mtime and pre_loadtime are not changed from previous one because reused pre_config
assert_equal pre_config_mtime, se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
assert_equal pre_loadtime, se_config[:windows_daemon_cmdline][5]['pre_loadtime']

write_config tmp_dir, conf_debug_str

# fifth call after changed conf file(don't reuse config)
se_config = load_config_proc.call
assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
ensure
Timecop.return
end

def test_load_config_for_daemonize
tmp_dir = "#{@tmp_dir}/dir/test_load_config.conf"
conf_info_str = %[
<system>
log_level info
</system>
]
conf_debug_str = %[
<system>
log_level debug
</system>
]

now = Time.now
Timecop.freeze(now)

write_config tmp_dir, conf_info_str

def test_serverengine_config_for_daemonize
params = {}
params['workers'] = 1
params['fluentd_conf_path'] = "fluentd.conf"
params['use_v1_config'] = true
params['log_path'] = 'test/tmp/supervisor/log'
params['conf_encoding'] = 'utf-8'
params['log_level'] = Fluent::Log::LEVEL_INFO
params['daemonize'] = './fluentd.pid'
params['conf_encoding'] = 'utf-8'
load_config_proc = Proc.new { Fluent::Supervisor.load_config(tmp_dir, params) }
load_config_proc = Proc.new { Fluent::Supervisor.serverengine_config(params) }

# first call
se_config = load_config_proc.call
assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
assert_equal 'spawn', se_config[:worker_type]
Expand All @@ -622,38 +559,9 @@ def test_load_config_for_daemonize
assert_equal false, se_config[:log_stderr]
assert_equal true, se_config[:enable_heartbeat]
assert_equal false, se_config[:auto_heartbeat]
assert_equal "fluentd.conf", se_config[:config_path]
assert_equal true, se_config[:daemonize]
assert_equal './fluentd.pid', se_config[:pid_path]

# second call immediately(reuse config)
se_config = load_config_proc.call
pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime']
assert_nil pre_config_mtime
assert_nil pre_loadtime

Timecop.freeze(now + 5)

# third call after 6 seconds(don't reuse config)
se_config = load_config_proc.call
pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime']
assert_not_nil pre_config_mtime
assert_not_nil pre_loadtime

# forth call immediately(reuse config)
se_config = load_config_proc.call
# test that pre_config_mtime and pre_loadtime are not changed from previous one because reused pre_config
assert_equal pre_config_mtime, se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
assert_equal pre_loadtime, se_config[:windows_daemon_cmdline][5]['pre_loadtime']

write_config tmp_dir, conf_debug_str

# fifth call after changed conf file(don't reuse config)
se_config = load_config_proc.call
assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
ensure
Timecop.return
end

data("supervisor", { supervise: true })
Expand Down

0 comments on commit ee796ef

Please sign in to comment.