From f8b73a571954b4a7cfb137cb521edeb370f3f0b5 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Tue, 14 Mar 2023 17:28:57 +0900 Subject: [PATCH] Fix logger initialization The logic of initializing logger is so difficult and has caused many bugs. This fix simplifies it and prevents logger from outputting some initial log messages with some settings not applied, such as `format`. This fix consists of the following 2 points. * All logger parameters are now set in the first initialization. * Previously, only parameters related to rotation were applied first, but it is preferable to apply all parameters on that point. * Remove LoggerInitializer * This class was a source of confusion because its role is difficult to understand. TODO: Fix tests. Signed-off-by: Daijiro Fukuda --- lib/fluent/log.rb | 37 +++++- lib/fluent/supervisor.rb | 252 ++++++++++++++++----------------------- 2 files changed, 136 insertions(+), 153 deletions(-) diff --git a/lib/fluent/log.rb b/lib/fluent/log.rb index e574947c3b..f9ecc82b43 100644 --- a/lib/fluent/log.rb +++ b/lib/fluent/log.rb @@ -67,8 +67,29 @@ def self.event_tags LEVEL_TEXT.map{|t| "#{LOG_EVENT_TAG_PREFIX}.#{t}" } end + # Create a unique path for each process. + # + # >>> per_process_path("C:/tmp/test.log", :worker, 1) + # C:/tmp/test-1.log + # >>> per_process_path("C:/tmp/test.log", :supervisor, 0) + # C:/tmp/test-supervisor-0.log + def self.per_process_path(path, process_type, worker_id) + path = Pathname(path) + ext = path.extname + + if process_type == :supervisor + suffix = "-#{process_type}-0#{ext}" # "-0" for backword compatibility. + else + suffix = "-#{worker_id}#{ext}" + end + return path.sub_ext(suffix).to_s + end + def initialize(logger, opts={}) - # overwrites logger.level= so that config reloading resets level of Fluentd::Log + # When ServerEngine changes the logger.level, the Fluentd logger level should also change. + # So overwrites logger.level= below. + # However, currently Fluentd doesn't use the ServerEngine's reloading feature, + # so maybe we don't need this overwriting anymore. orig_logger_level_setter = logger.class.public_instance_method(:level=).bind(logger) me = self # The original ruby logger sets the number as each log level like below. @@ -92,6 +113,7 @@ def initialize(logger, opts={}) # So if serverengine's logger level is changed, fluentd's log level will be changed to that + 1. logger.define_singleton_method(:level=) {|level| orig_logger_level_setter.call(level); me.level = self.level + 1 } + @path = opts[:path] @logger = logger @out = logger.instance_variable_get(:@logdev) @level = logger.level + 1 @@ -102,7 +124,8 @@ def initialize(logger, opts={}) @time_format = nil @formatter = nil - self.format = :text + self.format = opts.fetch(:format, :text) + self.time_format = opts[:time_format] if opts.key?(:time_format) enable_color out.tty? # TODO: This variable name is unclear so we should change to better name. @threads_exclude_events = [] @@ -156,6 +179,10 @@ def dup attr_reader :time_format attr_accessor :log_event_enabled, :ignore_repeated_log_interval, :ignore_same_log_interval, :suppress_repeated_stacktrace attr_accessor :out + # Strictly speaking, we should also change @logger.level when the setter of @level is called. + # Currently, we don't need to do it, since Fluentd::Log doesn't use ServerEngine::DaemonLogger.level. + # Since We overwrites logger.level= so that @logger.level is applied to @level, + # we need to find a good way to do this, otherwise we will end up in an endless loop. attr_accessor :level attr_accessor :optional_header, :optional_attrs @@ -202,8 +229,12 @@ def time_format=(time_fmt) @time_formatter = Strftime.new(@time_format) rescue nil end + def stdout? + @out == $stdout + end + def reopen! - @logger.reopen! if @logger + @out.reopen(@path, "a") if @path && @path != "-" nil end diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 043e447f5a..2fa31f161b 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -362,11 +362,11 @@ def dump private def reopen_log - if (log = config[:logger_initializer]) + if $log # Creating new thread due to mutex can't lock # in main thread during trap context Thread.new do - log.reopen! + $log.reopen! end end end @@ -443,7 +443,6 @@ def self.serverengine_config(params = {}) logger: $log, log: $log.out, log_level: params['log_level'], - logger_initializer: params['logger_initializer'], chuser: params['chuser'], chgroup: params['chgroup'], chumask: params['chumask'], @@ -471,104 +470,6 @@ def self.serverengine_config(params = {}) se_config end - class LoggerInitializer - def initialize(path, level, chuser, chgroup, opts, log_rotate_age: nil, log_rotate_size: nil) - @path = path - @level = level - @chuser = chuser - @chgroup = chgroup - @opts = opts - @log_rotate_age = log_rotate_age - @log_rotate_size = log_rotate_size - end - - # Create a unique path for each process. - # - # >>> per_process_path(:worker, 1, "C:/tmp/test.log") - # C:/tmp/test-1.log - # >>> per_process_path(:supervisor, 0, "C:/tmp/test.log") - # C:/tmp/test-supervisor-0.log - def self.per_process_path(path, process_type, worker_id) - path = Pathname(path) - ext = path.extname - - if process_type == :supervisor - suffix = "-#{process_type}-0#{ext}" # "-0" for backword compatibility. - else - suffix = "-#{worker_id}#{ext}" - end - return path.sub_ext(suffix).to_s - end - - def init(process_type, worker_id) - @opts[:process_type] = process_type - @opts[:worker_id] = worker_id - - if @path && @path != "-" - unless File.exist?(@path) - FileUtils.mkdir_p(File.dirname(@path)) - end - - if @log_rotate_age || @log_rotate_size - # We need to prepare a unique path for each worker since - # Windows locks files. - @path = LoggerInitializer.per_process_path(@path, process_type, worker_id) if Fluent.windows? - @logdev = Fluent::LogDeviceIO.new(@path, shift_age: @log_rotate_age, shift_size: @log_rotate_size) - else - @logdev = File.open(@path, "a") - end - - if @chuser || @chgroup - chuid = @chuser ? ServerEngine::Privilege.get_etc_passwd(@chuser).uid : nil - chgid = @chgroup ? ServerEngine::Privilege.get_etc_group(@chgroup).gid : nil - File.chown(chuid, chgid, @path) - end - else - @logdev = STDOUT - end - - dl_opts = {} - # subtract 1 to match serverengine daemon logger side logging severity. - dl_opts[:log_level] = @level - 1 - dl_opts[:log_rotate_age] = @log_rotate_age if @log_rotate_age - dl_opts[:log_rotate_size] = @log_rotate_size if @log_rotate_size - logger = ServerEngine::DaemonLogger.new(@logdev, dl_opts) - $log = Fluent::Log.new(logger, @opts) - $log.enable_color(false) if @path - $log.enable_debug if @level <= Fluent::Log::LEVEL_DEBUG - $log.info "init #{process_type} logger", path: @path, rotate_age: @log_rotate_age, rotate_size: @log_rotate_size - end - - def stdout? - @logdev == STDOUT - end - - def reopen! - if @path && @path != "-" - @logdev.reopen(@path, "a") - end - self - end - - def apply_options(format: nil, time_format: nil, log_dir_perm: nil, - ignore_repeated_log_interval: nil, ignore_same_log_interval: nil, suppress_repeated_stacktrace: nil) - $log.format = format if format - $log.time_format = time_format if time_format - $log.ignore_repeated_log_interval = ignore_repeated_log_interval if ignore_repeated_log_interval - $log.ignore_same_log_interval = ignore_same_log_interval if ignore_same_log_interval - $log.suppress_repeated_stacktrace = suppress_repeated_stacktrace unless suppress_repeated_stacktrace.nil? - - if @path && log_dir_perm - File.chmod(log_dir_perm || Fluent::DEFAULT_DIR_PERMISSION, File.dirname(@path)) - end - end - - def level=(level) - @level = level - $log.level = level - end - end - def self.default_options { config_path: Fluent::DEFAULT_CONFIG_PATH, @@ -625,35 +526,18 @@ def initialize(cl_opt) @chgroup = opt[:chgroup] @chuser = opt[:chuser] @chumask = opt[:chumask] + @signame = opt[:signame] + # TODO: `@log_rotate_age` and `@log_rotate_size` should be removed + # since it should be merged with SystemConfig in `build_system_config()`. + # We should always use `system_config.log.rotate_age` and `system_config.log.rotate_size`. + # However, currently, there is a bug that `system_config.log` parameters + # are not in `Fluent::SystemConfig::SYSTEM_CONFIG_PARAMETERS`, and these + # parameters are not merged in `build_system_config()`. + # Until we fix the bug of `Fluent::SystemConfig`, we need to use these instance variables. @log_rotate_age = opt[:log_rotate_age] @log_rotate_size = opt[:log_rotate_size] - @signame = opt[:signame] - @conf = nil - # parse configuration immediately to initialize logger in early stage - if @config_path and File.exist?(@config_path) - @conf = Fluent::Config.build(config_path: @config_path, - encoding: @conf_encoding ? @conf_encoding : 'utf-8', - additional_config: @inline_config ? @inline_config : nil, - use_v1_config: !!@use_v1_config, - type: @config_file_type, - ) - @system_config = build_system_config(@conf) - if @system_config.log - @log_rotate_age ||= @system_config.log.rotate_age - @log_rotate_size ||= @system_config.log.rotate_size - end - @conf = nil - end - - log_opts = {suppress_repeated_stacktrace: opt[:suppress_repeated_stacktrace], ignore_repeated_log_interval: opt[:ignore_repeated_log_interval], - ignore_same_log_interval: opt[:ignore_same_log_interval]} - @log = LoggerInitializer.new( - @log_path, opt[:log_level], @chuser, @chgroup, log_opts, - log_rotate_age: @log_rotate_age, - log_rotate_size: @log_rotate_size - ) @finished = false end @@ -738,17 +622,7 @@ def run_worker end def configure(supervisor: false) - if supervisor - @log.init(:supervisor, 0) - else - worker_id = ENV['SERVERENGINE_WORKER_ID'].to_i - process_type = case - when @standalone_worker then :standalone - when worker_id == 0 then :worker0 - else :workers - end - @log.init(process_type, worker_id) - end + setup_global_logger(supervisor: supervisor) if @show_plugin_config show_plugin_config @@ -767,16 +641,6 @@ def configure(supervisor: false) ) @system_config = build_system_config(@conf) - @log.level = @system_config.log_level - @log.apply_options( - format: @system_config.log.format, - time_format: @system_config.log.time_format, - log_dir_perm: @system_config.dir_permission, - ignore_repeated_log_interval: @system_config.ignore_repeated_log_interval, - ignore_same_log_interval: @system_config.ignore_same_log_interval, - suppress_repeated_stacktrace: @system_config.suppress_repeated_stacktrace, - ) - $log.info :supervisor, 'parsing config file is succeeded', path: @config_path @libs.each do |lib| @@ -800,6 +664,90 @@ def configure(supervisor: false) private + def setup_global_logger(supervisor: false) + if supervisor + worker_id = 0 + process_type = :supervisor + else + worker_id = ENV['SERVERENGINE_WORKER_ID'].to_i + process_type = case + when @standalone_worker then :standalone + when worker_id == 0 then :worker0 + else :workers + end + end + + # Parse configuration immediately to initialize logger in early stage. + # Since we can't confirm the log messages in this parsing process, + # we must parse the config again after initializing logger. + conf = Fluent::Config.build( + config_path: @config_path, + encoding: @conf_encoding, + additional_config: @inline_config, + use_v1_config: @use_v1_config, + type: @config_file_type, + ) + system_config = build_system_config(conf) + + # TODO: we should remove this logic. This merging process should be done + # in `build_system_config()`. + @log_rotate_age ||= system_config.log.rotate_age + @log_rotate_size ||= system_config.log.rotate_size + + rotate = @log_rotate_age || @log_rotate_size + actual_log_path = @log_path + + # We need to prepare a unique path for each worker since Windows locks files. + if Fluent.windows? && rotate + actual_log_path = Fluent::Log.per_process_path(@log_path, process_type, worker_id) + end + + if actual_log_path && actual_log_path != "-" + FileUtils.mkdir_p(File.dirname(actual_log_path)) unless File.exist?(actual_log_path) + if rotate + logdev = Fluent::LogDeviceIO.new( + actual_log_path, + shift_age: @log_rotate_age, + shift_size: @log_rotate_size, + ) + else + logdev = File.open(actual_log_path, "a") + end + + if @chuser || @chgroup + chuid = @chuser ? ServerEngine::Privilege.get_etc_passwd(@chuser).uid : nil + chgid = @chgroup ? ServerEngine::Privilege.get_etc_group(@chgroup).gid : nil + File.chown(chuid, chgid, actual_log_path) + end + + if system_config.dir_permission + File.chmod(system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION, File.dirname(actual_log_path)) + end + else + logdev = STDOUT + end + + $log = Fluent::Log.new( + # log_level: subtract 1 to match serverengine daemon logger side logging severity. + ServerEngine::DaemonLogger.new(logdev, log_level: system_config.log_level - 1), + path: actual_log_path, + process_type: process_type, + worker_id: worker_id, + format: system_config.log.format, + time_format: system_config.log.time_format, + suppress_repeated_stacktrace: system_config.suppress_repeated_stacktrace, + ignore_repeated_log_interval: system_config.ignore_repeated_log_interval, + ignore_same_log_interval: system_config.ignore_same_log_interval, + ) + $log.enable_color(false) if actual_log_path + $log.enable_debug if system_config.log_level <= Fluent::Log::LEVEL_DEBUG + + $log.info "init #{process_type} logger", + path: actual_log_path, + rotate_age: @log_rotate_age, + rotate_size: @log_rotate_size + end + def create_socket_manager socket_manager_path = ServerEngine::SocketManager::Server.generate_path ServerEngine::SocketManager::Server.open(socket_manager_path) @@ -833,7 +781,6 @@ def supervise 'workers' => @system_config.workers, 'root_dir' => @system_config.root_dir, - 'logger_initializer' => @log, 'log_level' => @system_config.log_level, 'rpc_endpoint' => @system_config.rpc_endpoint, 'enable_get_dump' => @system_config.enable_get_dump, @@ -937,7 +884,7 @@ def flush_buffer begin $log.debug "fluentd main process get SIGUSR1" $log.info "force flushing buffered events" - @log.reopen! + $log.reopen! Fluent::Engine.flush! $log.debug "flushing thread: flushed" rescue Exception => e @@ -993,7 +940,7 @@ def dump_windows def logging_with_console_output yield $log - unless @log.stdout? + unless $log.stdout? logger = ServerEngine::DaemonLogger.new(STDOUT) log = Fluent::Log.new(logger) log.level = @system_config.log_level @@ -1050,6 +997,11 @@ def main_process(&block) def build_system_config(conf) system_config = SystemConfig.create(conf, @cl_opt[:strict_config_value]) # Prefer the options explicitly specified in the command line + # + # TODO: There is a bug that `system_config.log.rotate_age/rotate_size` are + # not merged with the command line options since they are not in + # `SYSTEM_CONFIG_PARAMETERS`. + # We have to fix this bug. opt = {} Fluent::SystemConfig::SYSTEM_CONFIG_PARAMETERS.each do |param| if @cl_opt.key?(param) && !@cl_opt[param].nil?