Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor logger initialization #4065

Merged
merged 3 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def dup

attr_reader :format
attr_reader :time_format
attr_accessor :log_event_enabled, :ignore_repeated_log_interval, :ignore_same_log_interval
attr_accessor :log_event_enabled, :ignore_repeated_log_interval, :ignore_same_log_interval, :suppress_repeated_stacktrace
attr_accessor :out
attr_accessor :level
attr_accessor :optional_header, :optional_attrs
Expand Down
65 changes: 18 additions & 47 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,10 @@ def dump
end

class Supervisor
# For ServerEngine's `reload_config`.
# This is called only at the initilization of the supervisor process,
# since Fluentd overwrites all related SIGNAL(HUP,USR1,USR2) and have own
# reloading feature.
def self.load_config(path, params = {})
Comment on lines +427 to 431
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should rename or move this method.
It is hard to understand what this method is.
I will do it in another PR.

pre_loadtime = 0
pre_loadtime = params['pre_loadtime'].to_i if params['pre_loadtime']
Expand All @@ -436,32 +440,6 @@ def self.load_config(path, params = {})
return params['pre_conf']
end

log_level = params['log_level']
suppress_repeated_stacktrace = params['suppress_repeated_stacktrace']
ignore_repeated_log_interval = params['ignore_repeated_log_interval']
ignore_same_log_interval = params['ignore_same_log_interval']

log_path = params['log_path']
chuser = params['chuser']
chgroup = params['chgroup']
chumask = params['chumask']
log_rotate_age = params['log_rotate_age']
log_rotate_size = params['log_rotate_size']

log_opts = {suppress_repeated_stacktrace: suppress_repeated_stacktrace, ignore_repeated_log_interval: ignore_repeated_log_interval,
ignore_same_log_interval: ignore_same_log_interval}
logger_initializer = Supervisor::LoggerInitializer.new(
log_path, log_level, chuser, chgroup, log_opts,
log_rotate_age: log_rotate_age,
log_rotate_size: log_rotate_size
)
# this #init sets initialized logger to $log
logger_initializer.init(:supervisor, 0)
logger_initializer.apply_options(format: params['log_format'], time_format: params['log_time_format'])
logger = $log

command_sender = Fluent.windows? ? "pipe" : "signal"

# ServerEngine's "daemonize" option is boolean, and path of pid file is brought by "pid_path"
pid_path = params['daemonize']
daemonize = !!params['daemonize']
Expand All @@ -477,17 +455,13 @@ def self.load_config(path, params = {})
unrecoverable_exit_codes: [2],
stop_immediately_at_unrecoverable_exit: true,
root_dir: params['root_dir'],
logger: logger,
log: logger.out,
log_path: log_path,
log_level: log_level,
logger_initializer: logger_initializer,
chuser: chuser,
chgroup: chgroup,
chumask: chumask,
suppress_repeated_stacktrace: suppress_repeated_stacktrace,
ignore_repeated_log_interval: ignore_repeated_log_interval,
ignore_same_log_interval: ignore_same_log_interval,
logger: $log,
log: $log.out,
log_level: params['log_level'],
logger_initializer: params['logger_initializer'],
chuser: params['chuser'],
chgroup: params['chgroup'],
chumask: params['chumask'],
daemonize: daemonize,
rpc_endpoint: params['rpc_endpoint'],
counter_server: params['counter_server'],
Expand All @@ -498,7 +472,7 @@ def self.load_config(path, params = {})
WorkerModule.name,
path,
JSON.dump(params)],
command_sender: command_sender,
command_sender: Fluent.windows? ? "pipe" : "signal",
fluentd_conf: params['fluentd_conf'],
conf_encoding: params['conf_encoding'],
inline_config: params['inline_config'],
Expand Down Expand Up @@ -601,11 +575,13 @@ def reopen!
self
end

def apply_options(format: nil, time_format: nil, log_dir_perm: nil, ignore_repeated_log_interval: nil, ignore_same_log_interval: nil)
def apply_options(format: nil, time_format: nil, log_dir_perm: nil,
ignore_repeated_log_interval: nil, ignore_same_log_interval: nil, suppress_repeated_stacktrace: nil)
$log.format = format if format
$log.time_format = time_format if time_format
$log.ignore_repeated_log_interval = ignore_repeated_log_interval if ignore_repeated_log_interval
$log.ignore_same_log_interval = ignore_same_log_interval if ignore_same_log_interval
$log.suppress_repeated_stacktrace = suppress_repeated_stacktrace unless suppress_repeated_stacktrace.nil?

if @path && log_dir_perm
File.chmod(log_dir_perm || Fluent::DEFAULT_DIR_PERMISSION, File.dirname(@path))
Expand Down Expand Up @@ -822,7 +798,8 @@ def configure(supervisor: false)
time_format: @system_config.log.time_format,
log_dir_perm: @system_config.dir_permission,
ignore_repeated_log_interval: @system_config.ignore_repeated_log_interval,
ignore_same_log_interval: @system_config.ignore_same_log_interval
ignore_same_log_interval: @system_config.ignore_same_log_interval,
suppress_repeated_stacktrace: @system_config.suppress_repeated_stacktrace,
)

$log.info :supervisor, 'parsing config file is succeeded', path: @config_path
Expand Down Expand Up @@ -871,9 +848,6 @@ def supervise
'main_cmd' => fluentd_spawn_cmd,
'daemonize' => @daemonize,
'inline_config' => @inline_config,
'log_path' => @log_path,
'log_rotate_age' => @log_rotate_age,
'log_rotate_size' => @log_rotate_size,
'chuser' => @chuser,
'chgroup' => @chgroup,
'use_v1_config' => @use_v1_config,
Expand All @@ -883,14 +857,11 @@ def supervise

'workers' => @system_config.workers,
'root_dir' => @system_config.root_dir,
'logger_initializer' => @log,
'log_level' => @system_config.log_level,
'suppress_repeated_stacktrace' => @system_config.suppress_repeated_stacktrace,
'ignore_repeated_log_interval' => @system_config.ignore_repeated_log_interval,
'rpc_endpoint' => @system_config.rpc_endpoint,
'enable_get_dump' => @system_config.enable_get_dump,
'counter_server' => @system_config.counter_server,
'log_format' => @system_config.log.format,
'log_time_format' => @system_config.log.time_format,
'disable_shared_socket' => @system_config.disable_shared_socket,
'restart_worker_interval' => @system_config.restart_worker_interval,
}
Expand Down
59 changes: 27 additions & 32 deletions test/test_supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -537,16 +537,13 @@ def test_load_config
params = {}
params['workers'] = 1
params['use_v1_config'] = true
params['log_path'] = 'test/tmp/supervisor/log'
params['suppress_repeated_stacktrace'] = true
params['log_level'] = Fluent::Log::LEVEL_INFO
params['conf_encoding'] = 'utf-8'
load_config_proc = Proc.new { Fluent::Supervisor.load_config(tmp_dir, params) }

# first call
se_config = load_config_proc.call
assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
assert_equal true, se_config[:suppress_repeated_stacktrace]
assert_equal 'spawn', se_config[:worker_type]
assert_equal 1, se_config[:workers]
assert_equal false, se_config[:log_stdin]
Expand Down Expand Up @@ -588,33 +585,6 @@ def test_load_config
Timecop.return
end

def test_load_config_for_logger
tmp_dir = "#{@tmp_dir}/dir/test_load_config_log.conf"
conf_info_str = %[
<system>
<log>
format json
time_format %FT%T.%L%z
</log>
</system>
]
write_config tmp_dir, conf_info_str
params = {
'use_v1_config' => true,
'conf_encoding' => 'utf8',
'log_level' => Fluent::Log::LEVEL_INFO,
'log_path' => 'test/tmp/supervisor/log',

'workers' => 1,
'log_format' => :json,
'log_time_format' => '%FT%T.%L%z',
}

r = Fluent::Supervisor.load_config(tmp_dir, params)
assert_equal :json, r[:logger].format
assert_equal '%FT%T.%L%z', r[:logger].time_format
end

def test_load_config_for_daemonize
tmp_dir = "#{@tmp_dir}/dir/test_load_config.conf"
conf_info_str = %[
Expand All @@ -637,7 +607,6 @@ def test_load_config_for_daemonize
params['workers'] = 1
params['use_v1_config'] = true
params['log_path'] = 'test/tmp/supervisor/log'
params['suppress_repeated_stacktrace'] = true
params['log_level'] = Fluent::Log::LEVEL_INFO
params['daemonize'] = './fluentd.pid'
params['conf_encoding'] = 'utf-8'
Expand All @@ -646,7 +615,6 @@ def test_load_config_for_daemonize
# first call
se_config = load_config_proc.call
assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
assert_equal true, se_config[:suppress_repeated_stacktrace]
assert_equal 'spawn', se_config[:worker_type]
assert_equal 1, se_config[:workers]
assert_equal false, se_config[:log_stdin]
Expand Down Expand Up @@ -688,6 +656,33 @@ def test_load_config_for_daemonize
Timecop.return
end

data("supervisor", { supervise: true })
data("worker", { supervise: false })
def test_init_for_logger(data)
tmp_conf_path = "#{@tmp_dir}/dir/test_init_for_logger.conf"
conf_info_str = %[
<system>
suppress_repeated_stacktrace false
ignore_repeated_log_interval 10s
ignore_same_log_interval 20s
<log>
format json
time_format %FT%T.%L%z
</log>
</system>
]
write_config tmp_conf_path, conf_info_str

s = Fluent::Supervisor.new({config_path: tmp_conf_path})
s.configure(supervisor: data[:supervise])

assert_equal :json, $log.format
assert_equal '%FT%T.%L%z', $log.time_format
assert_equal false, $log.suppress_repeated_stacktrace
assert_equal 10, $log.ignore_repeated_log_interval
assert_equal 20, $log.ignore_same_log_interval
end

def test_logger
sv = Fluent::Supervisor.new({})
log = sv.instance_variable_get(:@log)
Expand Down