Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log: Add ignore_repeated_log_interval parameter #2937

Merged
merged 4 commits into from
Apr 10, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 45 additions & 6 deletions lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def initialize(logger, opts={})
@optional_attrs = nil

@suppress_repeated_stacktrace = opts[:suppress_repeated_stacktrace]
@ignore_repeated_log_interval = opts[:ignore_repeated_log_interval]

@process_type = opts[:process_type] # :supervisor, :worker0, :workers Or :standalone
@process_type ||= :standalone # to keep behavior of existing code
Expand Down Expand Up @@ -139,7 +140,8 @@ def dup
dl_opts = {}
dl_opts[:log_level] = @level - 1
logger = ServerEngine::DaemonLogger.new(@out, dl_opts)
clone = self.class.new(logger, suppress_repeated_stacktrace: @suppress_repeated_stacktrace, process_type: @process_type, worker_id: @worker_id)
clone = self.class.new(logger, suppress_repeated_stacktrace: @suppress_repeated_stacktrace, process_type: @process_type,
worker_id: @worker_id, ignore_repeated_log_interval: @ignore_repeated_log_interval)
clone.format = @format
clone.time_format = @time_format
clone.log_event_enabled = @log_event_enabled
Expand All @@ -149,7 +151,7 @@ def dup

attr_reader :format
attr_reader :time_format
attr_accessor :log_event_enabled
attr_accessor :log_event_enabled, :ignore_repeated_log_interval
attr_accessor :out
attr_accessor :level
attr_accessor :optional_header, :optional_attrs
Expand Down Expand Up @@ -278,6 +280,7 @@ def trace(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:trace, args)
return if time.nil?
puts [@color_trace, @formatter.call(type, time, LEVEL_TRACE, msg), @color_reset].join
rescue
# logger should not raise an exception. This rescue prevents unexpected behaviour.
Expand All @@ -299,6 +302,7 @@ def debug(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:debug, args)
return if time.nil?
puts [@color_debug, @formatter.call(type, time, LEVEL_DEBUG, msg), @color_reset].join
rescue
end
Expand All @@ -319,6 +323,7 @@ def info(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:info, args)
return if time.nil?
puts [@color_info, @formatter.call(type, time, LEVEL_INFO, msg), @color_reset].join
rescue
end
Expand All @@ -339,6 +344,7 @@ def warn(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:warn, args)
return if time.nil?
puts [@color_warn, @formatter.call(type, time, LEVEL_WARN, msg), @color_reset].join
rescue
end
Expand All @@ -359,6 +365,7 @@ def error(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:error, args)
return if time.nil?
puts [@color_error, @formatter.call(type, time, LEVEL_ERROR, msg), @color_reset].join
rescue
end
Expand All @@ -379,6 +386,7 @@ def fatal(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:fatal, args)
return if time.nil?
puts [@color_fatal, @formatter.call(type, time, LEVEL_FATAL, msg), @color_reset].join
rescue
end
Expand Down Expand Up @@ -412,20 +420,37 @@ def reset
@out.reset if @out.respond_to?(:reset)
end

CachedLog = Struct.new(:msg, :time)

def ignore_repeated_log?(key, time, message)
cached_log = Thread.current[key]
return false if cached_log.nil?
(cached_log.msg == message) && (time - cached_log.time <= @ignore_repeated_log_interval)
end

def suppress_stacktrace?(backtrace)
cached_log = Thread.current[:last_repeated_stacktrace]
return false if cached_log.nil?
cached_log.msg == backtrace
end

def dump_stacktrace(type, backtrace, level)
return if @level > level

time = Time.now

if @format == :text
line = caller_line(type, time, 5, level)
if @suppress_repeated_stacktrace && (Thread.current[:last_repeated_stacktrace] == backtrace)
if @ignore_repeated_log_interval && ignore_repeated_log?(:last_repeated_stacktrace, time, backtrace)
return
elsif @suppress_repeated_stacktrace && suppress_stacktrace?(backtrace)
puts [" ", line, 'suppressed same stacktrace'].join
Thread.current[:last_repeated_stacktrace] = CachedLog.new(backtrace, time) if @ignore_repeated_log_interval
else
backtrace.each { |msg|
puts [" ", line, msg].join
}
Thread.current[:last_repeated_stacktrace] = backtrace if @suppress_repeated_stacktrace
Thread.current[:last_repeated_stacktrace] = CachedLog.new(backtrace, time) if @suppress_repeated_stacktrace
end
else
r = {
Expand All @@ -436,11 +461,14 @@ def dump_stacktrace(type, backtrace, level)
r['worker_id'] = wid
end

if @suppress_repeated_stacktrace && (Thread.current[:last_repeated_stacktrace] == backtrace)
if @ignore_repeated_log_interval && ignore_repeated_log?(:last_repeated_stacktrace, time, backtrace)
return
elsif @suppress_repeated_stacktrace && suppress_stacktrace?(backtrace)
r['message'] = 'suppressed same stacktrace'
Thread.current[:last_repeated_stacktrace] = CachedLog.new(backtrace, time) if @ignore_repeated_log_interval
else
r['message'] = backtrace.join("\n")
Thread.current[:last_repeated_stacktrace] = backtrace if @suppress_repeated_stacktrace
Thread.current[:last_repeated_stacktrace] = CachedLog.new(backtrace, time) if @suppress_repeated_stacktrace
end

puts Yajl.dump(r)
Expand Down Expand Up @@ -479,6 +507,14 @@ def event(level, args)
end
}

if @ignore_repeated_log_interval
if ignore_repeated_log?(:last_repeated_log, time, message)
return nil
ganmacs marked this conversation as resolved.
Show resolved Hide resolved
else
Thread.current[:last_repeated_log] = CachedLog.new(message, time)
end
end

if @log_event_enabled && !@threads_exclude_events.include?(Thread.current)
record = map.dup
record.keys.each {|key|
Expand Down Expand Up @@ -530,6 +566,9 @@ def initialize(logger)
if logger.instance_variable_defined?(:@suppress_repeated_stacktrace)
@suppress_repeated_stacktrace = logger.instance_variable_get(:@suppress_repeated_stacktrace)
end
if logger.instance_variable_defined?(:@ignore_repeated_log_interval)
@ignore_repeated_log_interval = logger.instance_variable_get(:@ignore_repeated_log_interval)
end

self.format = @logger.format
self.time_format = @logger.time_format
Expand Down
12 changes: 9 additions & 3 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,15 @@ def self.load_config(path, params = {})

log_level = params['log_level']
suppress_repeated_stacktrace = params['suppress_repeated_stacktrace']
ignore_repeated_log_interval = params['ignore_repeated_log_interval']

log_path = params['log_path']
chuser = params['chuser']
chgroup = params['chgroup']
log_rotate_age = params['log_rotate_age']
log_rotate_size = params['log_rotate_size']

log_opts = {suppress_repeated_stacktrace: suppress_repeated_stacktrace}
log_opts = {suppress_repeated_stacktrace: suppress_repeated_stacktrace, ignore_repeated_log_interval: ignore_repeated_log_interval}
logger_initializer = Supervisor::LoggerInitializer.new(
log_path, log_level, chuser, chgroup, log_opts,
log_rotate_age: log_rotate_age,
Expand Down Expand Up @@ -345,6 +346,7 @@ def self.load_config(path, params = {})
chgroup: chgroup,
chumask: 0,
suppress_repeated_stacktrace: suppress_repeated_stacktrace,
ignore_repeated_log_interval: ignore_repeated_log_interval,
daemonize: daemonize,
rpc_endpoint: params['rpc_endpoint'],
counter_server: params['counter_server'],
Expand Down Expand Up @@ -439,9 +441,10 @@ def reopen!
self
end

def apply_options(format: nil, time_format: nil, log_dir_perm: nil)
def apply_options(format: nil, time_format: nil, log_dir_perm: nil, ignore_repeated_log_interval: nil)
$log.format = format if format
$log.time_format = time_format if time_format
$log.ignore_repeated_log_interval = ignore_repeated_log_interval if ignore_repeated_log_interval

if @path && log_dir_perm
File.chmod(log_dir_perm || 0755, File.dirname(@path))
Expand All @@ -468,6 +471,7 @@ def self.default_options
root_dir: nil,
suppress_interval: 0,
suppress_repeated_stacktrace: true,
ignore_repeated_log_interval: nil,
without_source: nil,
use_v1_config: true,
strict_config_value: nil,
Expand Down Expand Up @@ -507,7 +511,7 @@ def initialize(opt)
@cl_opt = opt
@conf = nil

log_opts = { suppress_repeated_stacktrace: opt[:suppress_repeated_stacktrace] }
log_opts = {suppress_repeated_stacktrace: opt[:suppress_repeated_stacktrace], ignore_repeated_log_interval: opt[:ignore_repeated_log_interval]}
@log = LoggerInitializer.new(
@log_path, opt[:log_level], @chuser, @chgroup, log_opts,
log_rotate_age: @log_rotate_age,
Expand Down Expand Up @@ -628,6 +632,7 @@ def configure(supervisor: false)
format: @system_config.log.format,
time_format: @system_config.log.time_format,
log_dir_perm: @system_config.dir_permission,
ignore_repeated_log_interval: @system_config.ignore_repeated_log_interval
)

$log.info :supervisor, 'parsing config file is succeeded', path: @config_path
Expand Down Expand Up @@ -690,6 +695,7 @@ def supervise
'root_dir' => @system_config.root_dir,
'log_level' => @system_config.log_level,
'suppress_repeated_stacktrace' => @system_config.suppress_repeated_stacktrace,
'ignore_repeated_log_interval' => @system_config.ignore_repeated_log_interval,
'rpc_endpoint' => @system_config.rpc_endpoint,
'enable_get_dump' => @system_config.enable_get_dump,
'counter_server' => @system_config.counter_server,
Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/system_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SystemConfig
SYSTEM_CONFIG_PARAMETERS = [
:workers, :root_dir, :log_level,
:suppress_repeated_stacktrace, :emit_error_log_interval, :suppress_config_dump,
:log_event_verbose,
:log_event_verbose, :ignore_repeated_log_interval,
:without_source, :rpc_endpoint, :enable_get_dump, :process_name,
:file_permission, :dir_permission, :counter_server, :counter_client,
:strict_config_value, :enable_msgpack_time_support
Expand All @@ -34,6 +34,7 @@ class SystemConfig
config_param :root_dir, :string, default: nil
config_param :log_level, :enum, list: [:trace, :debug, :info, :warn, :error, :fatal], default: 'info'
config_param :suppress_repeated_stacktrace, :bool, default: nil
config_param :ignore_repeated_log_interval, :time, default: nil
config_param :emit_error_log_interval, :time, default: nil
config_param :suppress_config_dump, :bool, default: nil
config_param :log_event_verbose, :bool, default: nil
Expand Down
2 changes: 2 additions & 0 deletions test/config/test_system_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def parse_text(text)
assert_nil(sc.root_dir)
assert_equal(Fluent::Log::LEVEL_INFO, sc.log_level)
assert_nil(sc.suppress_repeated_stacktrace)
assert_nil(sc.ignore_repeated_log_interval)
assert_nil(sc.emit_error_log_interval)
assert_nil(sc.suppress_config_dump)
assert_nil(sc.without_source)
Expand All @@ -86,6 +87,7 @@ def parse_text(text)
'root_dir' => ['root_dir', File.join(TMP_DIR, 'root')],
'log_level' => ['log_level', 'error'],
'suppress_repeated_stacktrace' => ['suppress_repeated_stacktrace', true],
'ignore_repeated_log_interval' => ['ignore_repeated_log_interval', 10],
'log_event_verbose' => ['log_event_verbose', true],
'suppress_config_dump' => ['suppress_config_dump', true],
'without_source' => ['without_source', true],
Expand Down
46 changes: 46 additions & 0 deletions test/test_log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,52 @@ def test_different_log_level
end
end

sub_test_case "ignore_repeated_log_interval" do
def test_same_message
message = "This is test"
logger = ServerEngine::DaemonLogger.new(@log_device, {log_level: ServerEngine::DaemonLogger::INFO})
log = Fluent::Log.new(logger, {ignore_repeated_log_interval: 5})

log.error message
10.times { |i|
Timecop.return
ganmacs marked this conversation as resolved.
Show resolved Hide resolved
Timecop.freeze(@timestamp + i)
log.error message
}

expected = [
"2016-04-21 02:58:41 +0000 [error]: This is test\n",
"2016-04-21 02:58:47 +0000 [error]: This is test\n"
]
assert_equal(expected, log.out.logs)
end

def test_different_message
message = "This is test"
logger = ServerEngine::DaemonLogger.new(@log_device, {log_level: ServerEngine::DaemonLogger::INFO})
log = Fluent::Log.new(logger, {ignore_repeated_log_interval: 10})

log.error message
3.times { |i|
Timecop.return
Timecop.freeze(@timestamp + i)
log.error message
log.error message
log.info "Hello! " + message
}

expected = [
"2016-04-21 02:58:41 +0000 [error]: This is test\n",
"2016-04-21 02:58:41 +0000 [info]: Hello! This is test\n",
"2016-04-21 02:58:42 +0000 [error]: This is test\n",
"2016-04-21 02:58:42 +0000 [info]: Hello! This is test\n",
"2016-04-21 02:58:43 +0000 [error]: This is test\n",
"2016-04-21 02:58:43 +0000 [info]: Hello! This is test\n",
]
assert_equal(expected, log.out.logs)
end
end

def test_dup
dl_opts = {}
dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE
Expand Down