Skip to content

Commit

Permalink
Merge pull request #3631 from daipom/issue-3465
Browse files Browse the repository at this point in the history
Apply modifications in pipeline to the records being passed to `@ERROR` label
  • Loading branch information
ashie authored Feb 16, 2022
2 parents e890533 + bfd84fc commit 847bb39
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
20 changes: 19 additions & 1 deletion lib/fluent/event_router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ def emit_stream(tag, es)
if callback = find_callback
callback.call(es)
end
rescue Pipeline::OutputError => e
@emit_error_handler.handle_emits_error(tag, e.processed_es, e.internal_error)
rescue => e
@emit_error_handler.handle_emits_error(tag, es, e)
end
Expand Down Expand Up @@ -161,6 +163,17 @@ def get(key)
private

class Pipeline

class OutputError < StandardError
attr_reader :internal_error
attr_reader :processed_es

def initialize(internal_error, processed_es)
@internal_error = internal_error
@processed_es = processed_es
end
end

def initialize
@filters = []
@output = nil
Expand All @@ -178,7 +191,12 @@ def set_output(output)

def emit_events(tag, es)
processed = @optimizer.filter_stream(tag, es)
@output.emit_events(tag, processed)

begin
@output.emit_events(tag, processed)
rescue => e
raise OutputError.new(e, processed)
end
end

class FilterOptimizer
Expand Down
17 changes: 17 additions & 0 deletions test/test_event_router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,23 @@ def filter_with_time(tag, time, record); end
event_router.emit('test', Engine.now, 'k' => 'v')
end
end

test 'can pass records modified by filters to handle_emits_error' do
filter = Class.new(FluentTestFilter) {
def filter_stream(_tag, es); end
}.new
event_router.add_rule('test', filter)
event_router.add_rule('test', error_output)

time = Engine.now
modified_es = OneEventStream.new(time, 'modified_label' => 'modified_value')

assert_rr do
stub(filter).filter_stream { modified_es }
mock(emit_handler).handle_emits_error('test', modified_es, is_a(RuntimeError))
event_router.emit('test', time, 'pre_label' => 'pre_value')
end
end
end
end
end

0 comments on commit 847bb39

Please sign in to comment.