diff --git a/lib/fluent/log.rb b/lib/fluent/log.rb index e574947c3b..f9ecc82b43 100644 --- a/lib/fluent/log.rb +++ b/lib/fluent/log.rb @@ -67,8 +67,29 @@ def self.event_tags LEVEL_TEXT.map{|t| "#{LOG_EVENT_TAG_PREFIX}.#{t}" } end + # Create a unique path for each process. + # + # >>> per_process_path("C:/tmp/test.log", :worker, 1) + # C:/tmp/test-1.log + # >>> per_process_path("C:/tmp/test.log", :supervisor, 0) + # C:/tmp/test-supervisor-0.log + def self.per_process_path(path, process_type, worker_id) + path = Pathname(path) + ext = path.extname + + if process_type == :supervisor + suffix = "-#{process_type}-0#{ext}" # "-0" for backword compatibility. + else + suffix = "-#{worker_id}#{ext}" + end + return path.sub_ext(suffix).to_s + end + def initialize(logger, opts={}) - # overwrites logger.level= so that config reloading resets level of Fluentd::Log + # When ServerEngine changes the logger.level, the Fluentd logger level should also change. + # So overwrites logger.level= below. + # However, currently Fluentd doesn't use the ServerEngine's reloading feature, + # so maybe we don't need this overwriting anymore. orig_logger_level_setter = logger.class.public_instance_method(:level=).bind(logger) me = self # The original ruby logger sets the number as each log level like below. @@ -92,6 +113,7 @@ def initialize(logger, opts={}) # So if serverengine's logger level is changed, fluentd's log level will be changed to that + 1. logger.define_singleton_method(:level=) {|level| orig_logger_level_setter.call(level); me.level = self.level + 1 } + @path = opts[:path] @logger = logger @out = logger.instance_variable_get(:@logdev) @level = logger.level + 1 @@ -102,7 +124,8 @@ def initialize(logger, opts={}) @time_format = nil @formatter = nil - self.format = :text + self.format = opts.fetch(:format, :text) + self.time_format = opts[:time_format] if opts.key?(:time_format) enable_color out.tty? # TODO: This variable name is unclear so we should change to better name. @threads_exclude_events = [] @@ -156,6 +179,10 @@ def dup attr_reader :time_format attr_accessor :log_event_enabled, :ignore_repeated_log_interval, :ignore_same_log_interval, :suppress_repeated_stacktrace attr_accessor :out + # Strictly speaking, we should also change @logger.level when the setter of @level is called. + # Currently, we don't need to do it, since Fluentd::Log doesn't use ServerEngine::DaemonLogger.level. + # Since We overwrites logger.level= so that @logger.level is applied to @level, + # we need to find a good way to do this, otherwise we will end up in an endless loop. attr_accessor :level attr_accessor :optional_header, :optional_attrs @@ -202,8 +229,12 @@ def time_format=(time_fmt) @time_formatter = Strftime.new(@time_format) rescue nil end + def stdout? + @out == $stdout + end + def reopen! - @logger.reopen! if @logger + @out.reopen(@path, "a") if @path && @path != "-" nil end diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 043e447f5a..2fa31f161b 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -362,11 +362,11 @@ def dump private def reopen_log - if (log = config[:logger_initializer]) + if $log # Creating new thread due to mutex can't lock # in main thread during trap context Thread.new do - log.reopen! + $log.reopen! end end end @@ -443,7 +443,6 @@ def self.serverengine_config(params = {}) logger: $log, log: $log.out, log_level: params['log_level'], - logger_initializer: params['logger_initializer'], chuser: params['chuser'], chgroup: params['chgroup'], chumask: params['chumask'], @@ -471,104 +470,6 @@ def self.serverengine_config(params = {}) se_config end - class LoggerInitializer - def initialize(path, level, chuser, chgroup, opts, log_rotate_age: nil, log_rotate_size: nil) - @path = path - @level = level - @chuser = chuser - @chgroup = chgroup - @opts = opts - @log_rotate_age = log_rotate_age - @log_rotate_size = log_rotate_size - end - - # Create a unique path for each process. - # - # >>> per_process_path(:worker, 1, "C:/tmp/test.log") - # C:/tmp/test-1.log - # >>> per_process_path(:supervisor, 0, "C:/tmp/test.log") - # C:/tmp/test-supervisor-0.log - def self.per_process_path(path, process_type, worker_id) - path = Pathname(path) - ext = path.extname - - if process_type == :supervisor - suffix = "-#{process_type}-0#{ext}" # "-0" for backword compatibility. - else - suffix = "-#{worker_id}#{ext}" - end - return path.sub_ext(suffix).to_s - end - - def init(process_type, worker_id) - @opts[:process_type] = process_type - @opts[:worker_id] = worker_id - - if @path && @path != "-" - unless File.exist?(@path) - FileUtils.mkdir_p(File.dirname(@path)) - end - - if @log_rotate_age || @log_rotate_size - # We need to prepare a unique path for each worker since - # Windows locks files. - @path = LoggerInitializer.per_process_path(@path, process_type, worker_id) if Fluent.windows? - @logdev = Fluent::LogDeviceIO.new(@path, shift_age: @log_rotate_age, shift_size: @log_rotate_size) - else - @logdev = File.open(@path, "a") - end - - if @chuser || @chgroup - chuid = @chuser ? ServerEngine::Privilege.get_etc_passwd(@chuser).uid : nil - chgid = @chgroup ? ServerEngine::Privilege.get_etc_group(@chgroup).gid : nil - File.chown(chuid, chgid, @path) - end - else - @logdev = STDOUT - end - - dl_opts = {} - # subtract 1 to match serverengine daemon logger side logging severity. - dl_opts[:log_level] = @level - 1 - dl_opts[:log_rotate_age] = @log_rotate_age if @log_rotate_age - dl_opts[:log_rotate_size] = @log_rotate_size if @log_rotate_size - logger = ServerEngine::DaemonLogger.new(@logdev, dl_opts) - $log = Fluent::Log.new(logger, @opts) - $log.enable_color(false) if @path - $log.enable_debug if @level <= Fluent::Log::LEVEL_DEBUG - $log.info "init #{process_type} logger", path: @path, rotate_age: @log_rotate_age, rotate_size: @log_rotate_size - end - - def stdout? - @logdev == STDOUT - end - - def reopen! - if @path && @path != "-" - @logdev.reopen(@path, "a") - end - self - end - - def apply_options(format: nil, time_format: nil, log_dir_perm: nil, - ignore_repeated_log_interval: nil, ignore_same_log_interval: nil, suppress_repeated_stacktrace: nil) - $log.format = format if format - $log.time_format = time_format if time_format - $log.ignore_repeated_log_interval = ignore_repeated_log_interval if ignore_repeated_log_interval - $log.ignore_same_log_interval = ignore_same_log_interval if ignore_same_log_interval - $log.suppress_repeated_stacktrace = suppress_repeated_stacktrace unless suppress_repeated_stacktrace.nil? - - if @path && log_dir_perm - File.chmod(log_dir_perm || Fluent::DEFAULT_DIR_PERMISSION, File.dirname(@path)) - end - end - - def level=(level) - @level = level - $log.level = level - end - end - def self.default_options { config_path: Fluent::DEFAULT_CONFIG_PATH, @@ -625,35 +526,18 @@ def initialize(cl_opt) @chgroup = opt[:chgroup] @chuser = opt[:chuser] @chumask = opt[:chumask] + @signame = opt[:signame] + # TODO: `@log_rotate_age` and `@log_rotate_size` should be removed + # since it should be merged with SystemConfig in `build_system_config()`. + # We should always use `system_config.log.rotate_age` and `system_config.log.rotate_size`. + # However, currently, there is a bug that `system_config.log` parameters + # are not in `Fluent::SystemConfig::SYSTEM_CONFIG_PARAMETERS`, and these + # parameters are not merged in `build_system_config()`. + # Until we fix the bug of `Fluent::SystemConfig`, we need to use these instance variables. @log_rotate_age = opt[:log_rotate_age] @log_rotate_size = opt[:log_rotate_size] - @signame = opt[:signame] - @conf = nil - # parse configuration immediately to initialize logger in early stage - if @config_path and File.exist?(@config_path) - @conf = Fluent::Config.build(config_path: @config_path, - encoding: @conf_encoding ? @conf_encoding : 'utf-8', - additional_config: @inline_config ? @inline_config : nil, - use_v1_config: !!@use_v1_config, - type: @config_file_type, - ) - @system_config = build_system_config(@conf) - if @system_config.log - @log_rotate_age ||= @system_config.log.rotate_age - @log_rotate_size ||= @system_config.log.rotate_size - end - @conf = nil - end - - log_opts = {suppress_repeated_stacktrace: opt[:suppress_repeated_stacktrace], ignore_repeated_log_interval: opt[:ignore_repeated_log_interval], - ignore_same_log_interval: opt[:ignore_same_log_interval]} - @log = LoggerInitializer.new( - @log_path, opt[:log_level], @chuser, @chgroup, log_opts, - log_rotate_age: @log_rotate_age, - log_rotate_size: @log_rotate_size - ) @finished = false end @@ -738,17 +622,7 @@ def run_worker end def configure(supervisor: false) - if supervisor - @log.init(:supervisor, 0) - else - worker_id = ENV['SERVERENGINE_WORKER_ID'].to_i - process_type = case - when @standalone_worker then :standalone - when worker_id == 0 then :worker0 - else :workers - end - @log.init(process_type, worker_id) - end + setup_global_logger(supervisor: supervisor) if @show_plugin_config show_plugin_config @@ -767,16 +641,6 @@ def configure(supervisor: false) ) @system_config = build_system_config(@conf) - @log.level = @system_config.log_level - @log.apply_options( - format: @system_config.log.format, - time_format: @system_config.log.time_format, - log_dir_perm: @system_config.dir_permission, - ignore_repeated_log_interval: @system_config.ignore_repeated_log_interval, - ignore_same_log_interval: @system_config.ignore_same_log_interval, - suppress_repeated_stacktrace: @system_config.suppress_repeated_stacktrace, - ) - $log.info :supervisor, 'parsing config file is succeeded', path: @config_path @libs.each do |lib| @@ -800,6 +664,90 @@ def configure(supervisor: false) private + def setup_global_logger(supervisor: false) + if supervisor + worker_id = 0 + process_type = :supervisor + else + worker_id = ENV['SERVERENGINE_WORKER_ID'].to_i + process_type = case + when @standalone_worker then :standalone + when worker_id == 0 then :worker0 + else :workers + end + end + + # Parse configuration immediately to initialize logger in early stage. + # Since we can't confirm the log messages in this parsing process, + # we must parse the config again after initializing logger. + conf = Fluent::Config.build( + config_path: @config_path, + encoding: @conf_encoding, + additional_config: @inline_config, + use_v1_config: @use_v1_config, + type: @config_file_type, + ) + system_config = build_system_config(conf) + + # TODO: we should remove this logic. This merging process should be done + # in `build_system_config()`. + @log_rotate_age ||= system_config.log.rotate_age + @log_rotate_size ||= system_config.log.rotate_size + + rotate = @log_rotate_age || @log_rotate_size + actual_log_path = @log_path + + # We need to prepare a unique path for each worker since Windows locks files. + if Fluent.windows? && rotate + actual_log_path = Fluent::Log.per_process_path(@log_path, process_type, worker_id) + end + + if actual_log_path && actual_log_path != "-" + FileUtils.mkdir_p(File.dirname(actual_log_path)) unless File.exist?(actual_log_path) + if rotate + logdev = Fluent::LogDeviceIO.new( + actual_log_path, + shift_age: @log_rotate_age, + shift_size: @log_rotate_size, + ) + else + logdev = File.open(actual_log_path, "a") + end + + if @chuser || @chgroup + chuid = @chuser ? ServerEngine::Privilege.get_etc_passwd(@chuser).uid : nil + chgid = @chgroup ? ServerEngine::Privilege.get_etc_group(@chgroup).gid : nil + File.chown(chuid, chgid, actual_log_path) + end + + if system_config.dir_permission + File.chmod(system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION, File.dirname(actual_log_path)) + end + else + logdev = STDOUT + end + + $log = Fluent::Log.new( + # log_level: subtract 1 to match serverengine daemon logger side logging severity. + ServerEngine::DaemonLogger.new(logdev, log_level: system_config.log_level - 1), + path: actual_log_path, + process_type: process_type, + worker_id: worker_id, + format: system_config.log.format, + time_format: system_config.log.time_format, + suppress_repeated_stacktrace: system_config.suppress_repeated_stacktrace, + ignore_repeated_log_interval: system_config.ignore_repeated_log_interval, + ignore_same_log_interval: system_config.ignore_same_log_interval, + ) + $log.enable_color(false) if actual_log_path + $log.enable_debug if system_config.log_level <= Fluent::Log::LEVEL_DEBUG + + $log.info "init #{process_type} logger", + path: actual_log_path, + rotate_age: @log_rotate_age, + rotate_size: @log_rotate_size + end + def create_socket_manager socket_manager_path = ServerEngine::SocketManager::Server.generate_path ServerEngine::SocketManager::Server.open(socket_manager_path) @@ -833,7 +781,6 @@ def supervise 'workers' => @system_config.workers, 'root_dir' => @system_config.root_dir, - 'logger_initializer' => @log, 'log_level' => @system_config.log_level, 'rpc_endpoint' => @system_config.rpc_endpoint, 'enable_get_dump' => @system_config.enable_get_dump, @@ -937,7 +884,7 @@ def flush_buffer begin $log.debug "fluentd main process get SIGUSR1" $log.info "force flushing buffered events" - @log.reopen! + $log.reopen! Fluent::Engine.flush! $log.debug "flushing thread: flushed" rescue Exception => e @@ -993,7 +940,7 @@ def dump_windows def logging_with_console_output yield $log - unless @log.stdout? + unless $log.stdout? logger = ServerEngine::DaemonLogger.new(STDOUT) log = Fluent::Log.new(logger) log.level = @system_config.log_level @@ -1050,6 +997,11 @@ def main_process(&block) def build_system_config(conf) system_config = SystemConfig.create(conf, @cl_opt[:strict_config_value]) # Prefer the options explicitly specified in the command line + # + # TODO: There is a bug that `system_config.log.rotate_age/rotate_size` are + # not merged with the command line options since they are not in + # `SYSTEM_CONFIG_PARAMETERS`. + # We have to fix this bug. opt = {} Fluent::SystemConfig::SYSTEM_CONFIG_PARAMETERS.each do |param| if @cl_opt.key?(param) && !@cl_opt[param].nil?