Skip to content

Commit

Permalink
Improvement @error and logging handling
Browse files Browse the repository at this point in the history
  • Loading branch information
repeatedly committed Aug 29, 2014
1 parent 62e3db2 commit ed93293
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 11 deletions.
2 changes: 1 addition & 1 deletion lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def log_event_loop

events.each {|tag,time,record|
begin
Engine.emit(tag, time, record)
@event_router.emit(tag, time, record)
rescue => e
$log.error "failed to emit fluentd's log event", :tag => tag, :event => record, :error_class => e.class, :error => e
end
Expand Down
41 changes: 31 additions & 10 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ module Fluent

require 'fluent/agent'
require 'fluent/label'
#require 'fluentd/collectors/null_collector'
#require 'fluentd/collectors/no_match_notice_collector'

#
# Fluentd forms a tree structure to manage plugins:
Expand Down Expand Up @@ -90,15 +88,15 @@ def configure(conf)
}
end

setup_error_label(error_label_config)
setup_error_label(error_label_config) if error_label_config
end

def setup_error_label(error_label_config)
def setup_error_label(e)
# initialize built-in ERROR label
if error_label_config
error_label = add_label(ERROR_LABEL, error_label_config)
@error_collector = error_label.event_router
end
error_label = add_label(ERROR_LABEL, e)
error_label.configure(e)
error_label.root_agent = RootAgentProxyWithoutErrorCollector.new(self)
@error_collector = error_label.event_router
end

def start
Expand Down Expand Up @@ -169,11 +167,10 @@ def find_label(label_name)

def handle_emits_error(tag, es, e)
if @error_collector
#@error_collector.emit_stream("error.#{tag}", e)
@error_collector.emit_stream(tag, es)
else
now = Engine.now
if @suppress_emit_error_log_interval == 0 || now > @next_emit_error_log_time
if @suppress_emit_error_log_interval.zero? || now > @next_emit_error_log_time
log.warn "emit transaction failed ", :error_class => e.class, :error => e
log.warn_backtrace
# log.debug "current next_emit_error_log_time: #{Time.at(@next_emit_error_log_time)}"
Expand All @@ -184,5 +181,29 @@ def handle_emits_error(tag, es, e)
raise e
end
end

# <label @ERROR> element use RootAgent wrapped by # this RootAgentProxyWithoutErrorCollector.
# So that those elements don't send cause infinite loop.
class RootAgentProxyWithoutErrorCollector < SimpleDelegator
def initialize(root_agent)
super

@suppress_emit_error_log_interval = 0
@next_emit_error_log_time = nil

interval_time = root_agent.instance_variable_get(:@suppress_emit_error_log_interval)
suppress_interval(interval_time) unless interval_time.zero?
end

def handle_emits_error(tag, es, e)
now = Engine.now
if @suppress_emit_error_log_interval.zero? || now > @next_emit_error_log_time
log.warn "emit transaction failed ", :error_class => e.class, :error => e
log.warn_backtrace
@next_emit_error_log_time = now + @suppress_emit_error_log_interval
end
raise e
end
end
end
end

0 comments on commit ed93293

Please sign in to comment.