Skip to content

Commit

Permalink
Merge pull request fluent#4065 from daipom/refactor-logger-initializa…
Browse files Browse the repository at this point in the history
…tion

Refactor logger initialization
  • Loading branch information
ashie authored Mar 9, 2023
2 parents 8812ec1 + 6b74fa3 commit 7b0e0cd
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 80 deletions.
2 changes: 1 addition & 1 deletion lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def dup

attr_reader :format
attr_reader :time_format
attr_accessor :log_event_enabled, :ignore_repeated_log_interval, :ignore_same_log_interval
attr_accessor :log_event_enabled, :ignore_repeated_log_interval, :ignore_same_log_interval, :suppress_repeated_stacktrace
attr_accessor :out
attr_accessor :level
attr_accessor :optional_header, :optional_attrs
Expand Down
65 changes: 18 additions & 47 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,10 @@ def dump
end

class Supervisor
# For ServerEngine's `reload_config`.
# This is called only at the initilization of the supervisor process,
# since Fluentd overwrites all related SIGNAL(HUP,USR1,USR2) and have own
# reloading feature.
def self.load_config(path, params = {})
pre_loadtime = 0
pre_loadtime = params['pre_loadtime'].to_i if params['pre_loadtime']
Expand All @@ -436,32 +440,6 @@ def self.load_config(path, params = {})
return params['pre_conf']
end

log_level = params['log_level']
suppress_repeated_stacktrace = params['suppress_repeated_stacktrace']
ignore_repeated_log_interval = params['ignore_repeated_log_interval']
ignore_same_log_interval = params['ignore_same_log_interval']

log_path = params['log_path']
chuser = params['chuser']
chgroup = params['chgroup']
chumask = params['chumask']
log_rotate_age = params['log_rotate_age']
log_rotate_size = params['log_rotate_size']

log_opts = {suppress_repeated_stacktrace: suppress_repeated_stacktrace, ignore_repeated_log_interval: ignore_repeated_log_interval,
ignore_same_log_interval: ignore_same_log_interval}
logger_initializer = Supervisor::LoggerInitializer.new(
log_path, log_level, chuser, chgroup, log_opts,
log_rotate_age: log_rotate_age,
log_rotate_size: log_rotate_size
)
# this #init sets initialized logger to $log
logger_initializer.init(:supervisor, 0)
logger_initializer.apply_options(format: params['log_format'], time_format: params['log_time_format'])
logger = $log

command_sender = Fluent.windows? ? "pipe" : "signal"

# ServerEngine's "daemonize" option is boolean, and path of pid file is brought by "pid_path"
pid_path = params['daemonize']
daemonize = !!params['daemonize']
Expand All @@ -477,17 +455,13 @@ def self.load_config(path, params = {})
unrecoverable_exit_codes: [2],
stop_immediately_at_unrecoverable_exit: true,
root_dir: params['root_dir'],
logger: logger,
log: logger.out,
log_path: log_path,
log_level: log_level,
logger_initializer: logger_initializer,
chuser: chuser,
chgroup: chgroup,
chumask: chumask,
suppress_repeated_stacktrace: suppress_repeated_stacktrace,
ignore_repeated_log_interval: ignore_repeated_log_interval,
ignore_same_log_interval: ignore_same_log_interval,
logger: $log,
log: $log.out,
log_level: params['log_level'],
logger_initializer: params['logger_initializer'],
chuser: params['chuser'],
chgroup: params['chgroup'],
chumask: params['chumask'],
daemonize: daemonize,
rpc_endpoint: params['rpc_endpoint'],
counter_server: params['counter_server'],
Expand All @@ -498,7 +472,7 @@ def self.load_config(path, params = {})
WorkerModule.name,
path,
JSON.dump(params)],
command_sender: command_sender,
command_sender: Fluent.windows? ? "pipe" : "signal",
fluentd_conf: params['fluentd_conf'],
conf_encoding: params['conf_encoding'],
inline_config: params['inline_config'],
Expand Down Expand Up @@ -601,11 +575,13 @@ def reopen!
self
end

def apply_options(format: nil, time_format: nil, log_dir_perm: nil, ignore_repeated_log_interval: nil, ignore_same_log_interval: nil)
def apply_options(format: nil, time_format: nil, log_dir_perm: nil,
ignore_repeated_log_interval: nil, ignore_same_log_interval: nil, suppress_repeated_stacktrace: nil)
$log.format = format if format
$log.time_format = time_format if time_format
$log.ignore_repeated_log_interval = ignore_repeated_log_interval if ignore_repeated_log_interval
$log.ignore_same_log_interval = ignore_same_log_interval if ignore_same_log_interval
$log.suppress_repeated_stacktrace = suppress_repeated_stacktrace unless suppress_repeated_stacktrace.nil?

if @path && log_dir_perm
File.chmod(log_dir_perm || Fluent::DEFAULT_DIR_PERMISSION, File.dirname(@path))
Expand Down Expand Up @@ -822,7 +798,8 @@ def configure(supervisor: false)
time_format: @system_config.log.time_format,
log_dir_perm: @system_config.dir_permission,
ignore_repeated_log_interval: @system_config.ignore_repeated_log_interval,
ignore_same_log_interval: @system_config.ignore_same_log_interval
ignore_same_log_interval: @system_config.ignore_same_log_interval,
suppress_repeated_stacktrace: @system_config.suppress_repeated_stacktrace,
)

$log.info :supervisor, 'parsing config file is succeeded', path: @config_path
Expand Down Expand Up @@ -871,9 +848,6 @@ def supervise
'main_cmd' => fluentd_spawn_cmd,
'daemonize' => @daemonize,
'inline_config' => @inline_config,
'log_path' => @log_path,
'log_rotate_age' => @log_rotate_age,
'log_rotate_size' => @log_rotate_size,
'chuser' => @chuser,
'chgroup' => @chgroup,
'use_v1_config' => @use_v1_config,
Expand All @@ -883,14 +857,11 @@ def supervise

'workers' => @system_config.workers,
'root_dir' => @system_config.root_dir,
'logger_initializer' => @log,
'log_level' => @system_config.log_level,
'suppress_repeated_stacktrace' => @system_config.suppress_repeated_stacktrace,
'ignore_repeated_log_interval' => @system_config.ignore_repeated_log_interval,
'rpc_endpoint' => @system_config.rpc_endpoint,
'enable_get_dump' => @system_config.enable_get_dump,
'counter_server' => @system_config.counter_server,
'log_format' => @system_config.log.format,
'log_time_format' => @system_config.log.time_format,
'disable_shared_socket' => @system_config.disable_shared_socket,
'restart_worker_interval' => @system_config.restart_worker_interval,
}
Expand Down
59 changes: 27 additions & 32 deletions test/test_supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -537,16 +537,13 @@ def test_load_config
params = {}
params['workers'] = 1
params['use_v1_config'] = true
params['log_path'] = 'test/tmp/supervisor/log'
params['suppress_repeated_stacktrace'] = true
params['log_level'] = Fluent::Log::LEVEL_INFO
params['conf_encoding'] = 'utf-8'
load_config_proc = Proc.new { Fluent::Supervisor.load_config(tmp_dir, params) }

# first call
se_config = load_config_proc.call
assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
assert_equal true, se_config[:suppress_repeated_stacktrace]
assert_equal 'spawn', se_config[:worker_type]
assert_equal 1, se_config[:workers]
assert_equal false, se_config[:log_stdin]
Expand Down Expand Up @@ -588,33 +585,6 @@ def test_load_config
Timecop.return
end

def test_load_config_for_logger
tmp_dir = "#{@tmp_dir}/dir/test_load_config_log.conf"
conf_info_str = %[
<system>
<log>
format json
time_format %FT%T.%L%z
</log>
</system>
]
write_config tmp_dir, conf_info_str
params = {
'use_v1_config' => true,
'conf_encoding' => 'utf8',
'log_level' => Fluent::Log::LEVEL_INFO,
'log_path' => 'test/tmp/supervisor/log',

'workers' => 1,
'log_format' => :json,
'log_time_format' => '%FT%T.%L%z',
}

r = Fluent::Supervisor.load_config(tmp_dir, params)
assert_equal :json, r[:logger].format
assert_equal '%FT%T.%L%z', r[:logger].time_format
end

def test_load_config_for_daemonize
tmp_dir = "#{@tmp_dir}/dir/test_load_config.conf"
conf_info_str = %[
Expand All @@ -637,7 +607,6 @@ def test_load_config_for_daemonize
params['workers'] = 1
params['use_v1_config'] = true
params['log_path'] = 'test/tmp/supervisor/log'
params['suppress_repeated_stacktrace'] = true
params['log_level'] = Fluent::Log::LEVEL_INFO
params['daemonize'] = './fluentd.pid'
params['conf_encoding'] = 'utf-8'
Expand All @@ -646,7 +615,6 @@ def test_load_config_for_daemonize
# first call
se_config = load_config_proc.call
assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
assert_equal true, se_config[:suppress_repeated_stacktrace]
assert_equal 'spawn', se_config[:worker_type]
assert_equal 1, se_config[:workers]
assert_equal false, se_config[:log_stdin]
Expand Down Expand Up @@ -688,6 +656,33 @@ def test_load_config_for_daemonize
Timecop.return
end

data("supervisor", { supervise: true })
data("worker", { supervise: false })
def test_init_for_logger(data)
tmp_conf_path = "#{@tmp_dir}/dir/test_init_for_logger.conf"
conf_info_str = %[
<system>
suppress_repeated_stacktrace false
ignore_repeated_log_interval 10s
ignore_same_log_interval 20s
<log>
format json
time_format %FT%T.%L%z
</log>
</system>
]
write_config tmp_conf_path, conf_info_str

s = Fluent::Supervisor.new({config_path: tmp_conf_path})
s.configure(supervisor: data[:supervise])

assert_equal :json, $log.format
assert_equal '%FT%T.%L%z', $log.time_format
assert_equal false, $log.suppress_repeated_stacktrace
assert_equal 10, $log.ignore_repeated_log_interval
assert_equal 20, $log.ignore_same_log_interval
end

def test_logger
sv = Fluent::Supervisor.new({})
log = sv.instance_variable_get(:@log)
Expand Down

0 comments on commit 7b0e0cd

Please sign in to comment.