Skip to content

Commit

Permalink
Refactor logger initialization
Browse files Browse the repository at this point in the history
In the supervisor process, Fluentd initializes the logger twice,
at `Supervisor::configure()` and `Supervisor.load_config()`.

`Supervisor.load_config()` is called in ServerEngine's reloading
function, but Fluentd doesn't use the function even when `SIGHUP`
or `SIGUSR2`.
So I can't see the reason for initializing the logger in the
callback.

In addition, I removed some needless parameters that were passed
to ServerEngine.

This also fixes a problem that `ignore_same_log_interval` is not
passed to the `Supervisor.load_config()` and that option is not
reflected to the logger of the supervisor process.

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Feb 19, 2023
1 parent 07e3ad1 commit 39c6698
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 67 deletions.
2 changes: 1 addition & 1 deletion lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def dup

attr_reader :format
attr_reader :time_format
attr_accessor :log_event_enabled, :ignore_repeated_log_interval, :ignore_same_log_interval
attr_accessor :log_event_enabled, :ignore_repeated_log_interval, :ignore_same_log_interval, :suppress_repeated_stacktrace
attr_accessor :out
attr_accessor :level
attr_accessor :optional_header, :optional_attrs
Expand Down
43 changes: 9 additions & 34 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -437,28 +437,10 @@ def self.load_config(path, params = {})
end

log_level = params['log_level']
suppress_repeated_stacktrace = params['suppress_repeated_stacktrace']
ignore_repeated_log_interval = params['ignore_repeated_log_interval']
ignore_same_log_interval = params['ignore_same_log_interval']

log_path = params['log_path']
chuser = params['chuser']
chgroup = params['chgroup']
chumask = params['chumask']
log_rotate_age = params['log_rotate_age']
log_rotate_size = params['log_rotate_size']

log_opts = {suppress_repeated_stacktrace: suppress_repeated_stacktrace, ignore_repeated_log_interval: ignore_repeated_log_interval,
ignore_same_log_interval: ignore_same_log_interval}
logger_initializer = Supervisor::LoggerInitializer.new(
log_path, log_level, chuser, chgroup, log_opts,
log_rotate_age: log_rotate_age,
log_rotate_size: log_rotate_size
)
# this #init sets initialized logger to $log
logger_initializer.init(:supervisor, 0)
logger_initializer.apply_options(format: params['log_format'], time_format: params['log_time_format'])
logger = $log

command_sender = Fluent.windows? ? "pipe" : "signal"

Expand All @@ -477,17 +459,13 @@ def self.load_config(path, params = {})
unrecoverable_exit_codes: [2],
stop_immediately_at_unrecoverable_exit: true,
root_dir: params['root_dir'],
logger: logger,
log: logger.out,
log_path: log_path,
logger: $log,
log: $log.out,
log_level: log_level,
logger_initializer: logger_initializer,
logger_initializer: params['logger_initializer'],
chuser: chuser,
chgroup: chgroup,
chumask: chumask,
suppress_repeated_stacktrace: suppress_repeated_stacktrace,
ignore_repeated_log_interval: ignore_repeated_log_interval,
ignore_same_log_interval: ignore_same_log_interval,
daemonize: daemonize,
rpc_endpoint: params['rpc_endpoint'],
counter_server: params['counter_server'],
Expand Down Expand Up @@ -601,11 +579,13 @@ def reopen!
self
end

def apply_options(format: nil, time_format: nil, log_dir_perm: nil, ignore_repeated_log_interval: nil, ignore_same_log_interval: nil)
def apply_options(format: nil, time_format: nil, log_dir_perm: nil,
ignore_repeated_log_interval: nil, ignore_same_log_interval: nil, suppress_repeated_stacktrace: nil)
$log.format = format if format
$log.time_format = time_format if time_format
$log.ignore_repeated_log_interval = ignore_repeated_log_interval if ignore_repeated_log_interval
$log.ignore_same_log_interval = ignore_same_log_interval if ignore_same_log_interval
$log.suppress_repeated_stacktrace = suppress_repeated_stacktrace unless suppress_repeated_stacktrace.nil?

if @path && log_dir_perm
File.chmod(log_dir_perm || Fluent::DEFAULT_DIR_PERMISSION, File.dirname(@path))
Expand Down Expand Up @@ -820,7 +800,8 @@ def configure(supervisor: false)
time_format: @system_config.log.time_format,
log_dir_perm: @system_config.dir_permission,
ignore_repeated_log_interval: @system_config.ignore_repeated_log_interval,
ignore_same_log_interval: @system_config.ignore_same_log_interval
ignore_same_log_interval: @system_config.ignore_same_log_interval,
suppress_repeated_stacktrace: @system_config.suppress_repeated_stacktrace,
)

$log.info :supervisor, 'parsing config file is succeeded', path: @config_path
Expand Down Expand Up @@ -869,9 +850,6 @@ def supervise
'main_cmd' => fluentd_spawn_cmd,
'daemonize' => @daemonize,
'inline_config' => @inline_config,
'log_path' => @log_path,
'log_rotate_age' => @log_rotate_age,
'log_rotate_size' => @log_rotate_size,
'chuser' => @chuser,
'chgroup' => @chgroup,
'use_v1_config' => @use_v1_config,
Expand All @@ -881,14 +859,11 @@ def supervise

'workers' => @system_config.workers,
'root_dir' => @system_config.root_dir,
'logger_initializer' => @log,
'log_level' => @system_config.log_level,
'suppress_repeated_stacktrace' => @system_config.suppress_repeated_stacktrace,
'ignore_repeated_log_interval' => @system_config.ignore_repeated_log_interval,
'rpc_endpoint' => @system_config.rpc_endpoint,
'enable_get_dump' => @system_config.enable_get_dump,
'counter_server' => @system_config.counter_server,
'log_format' => @system_config.log.format,
'log_time_format' => @system_config.log.time_format,
'disable_shared_socket' => @system_config.disable_shared_socket,
'restart_worker_interval' => @system_config.restart_worker_interval,
}
Expand Down
61 changes: 29 additions & 32 deletions test/test_supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -546,16 +546,13 @@ def test_load_config
params = {}
params['workers'] = 1
params['use_v1_config'] = true
params['log_path'] = 'test/tmp/supervisor/log'
params['suppress_repeated_stacktrace'] = true
params['log_level'] = Fluent::Log::LEVEL_INFO
params['conf_encoding'] = 'utf-8'
load_config_proc = Proc.new { Fluent::Supervisor.load_config(tmp_dir, params) }

# first call
se_config = load_config_proc.call
assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
assert_equal true, se_config[:suppress_repeated_stacktrace]
assert_equal 'spawn', se_config[:worker_type]
assert_equal 1, se_config[:workers]
assert_equal false, se_config[:log_stdin]
Expand Down Expand Up @@ -597,33 +594,6 @@ def test_load_config
Timecop.return
end

def test_load_config_for_logger
tmp_dir = "#{@tmp_dir}/dir/test_load_config_log.conf"
conf_info_str = %[
<system>
<log>
format json
time_format %FT%T.%L%z
</log>
</system>
]
write_config tmp_dir, conf_info_str
params = {
'use_v1_config' => true,
'conf_encoding' => 'utf8',
'log_level' => Fluent::Log::LEVEL_INFO,
'log_path' => 'test/tmp/supervisor/log',

'workers' => 1,
'log_format' => :json,
'log_time_format' => '%FT%T.%L%z',
}

r = Fluent::Supervisor.load_config(tmp_dir, params)
assert_equal :json, r[:logger].format
assert_equal '%FT%T.%L%z', r[:logger].time_format
end

def test_load_config_for_daemonize
tmp_dir = "#{@tmp_dir}/dir/test_load_config.conf"
conf_info_str = %[
Expand All @@ -646,7 +616,6 @@ def test_load_config_for_daemonize
params['workers'] = 1
params['use_v1_config'] = true
params['log_path'] = 'test/tmp/supervisor/log'
params['suppress_repeated_stacktrace'] = true
params['log_level'] = Fluent::Log::LEVEL_INFO
params['daemonize'] = './fluentd.pid'
params['conf_encoding'] = 'utf-8'
Expand All @@ -655,7 +624,6 @@ def test_load_config_for_daemonize
# first call
se_config = load_config_proc.call
assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
assert_equal true, se_config[:suppress_repeated_stacktrace]
assert_equal 'spawn', se_config[:worker_type]
assert_equal 1, se_config[:workers]
assert_equal false, se_config[:log_stdin]
Expand Down Expand Up @@ -697,6 +665,35 @@ def test_load_config_for_daemonize
Timecop.return
end

data("supervisor", { supervise: true })
data("worker", { supervise: false })
def test_init_for_logger(data)
tmp_conf_path = "#{@tmp_dir}/dir/test_init_for_logger.conf"
conf_info_str = %[
<system>
suppress_repeated_stacktrace false
ignore_repeated_log_interval 10s
ignore_same_log_interval 20s
<log>
format json
time_format %FT%T.%L%z
</log>
</system>
]
write_config tmp_conf_path, conf_info_str

opts = Fluent::Supervisor.default_options
opts[:config_path] = tmp_conf_path
s = Fluent::Supervisor.new(opts)
s.configure(supervisor: data[:supervise])

assert_equal :json, $log.format
assert_equal '%FT%T.%L%z', $log.time_format
assert_equal false, $log.suppress_repeated_stacktrace
assert_equal 10, $log.ignore_repeated_log_interval
assert_equal 20, $log.ignore_same_log_interval
end

def test_logger
opts = Fluent::Supervisor.default_options
sv = Fluent::Supervisor.new(opts)
Expand Down

0 comments on commit 39c6698

Please sign in to comment.