Skip to content

Commit

Permalink
Merge pull request #1770 from fluent/out_copy-add-ignoer_error
Browse files Browse the repository at this point in the history
out_copy: support ignore_error in store section
  • Loading branch information
repeatedly authored Dec 5, 2017
2 parents b5c8b1d + bac9db0 commit ed82cf8
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 2 deletions.
1 change: 1 addition & 0 deletions lib/fluent/plugin/multi_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class MultiOutput < Base
helpers :event_emitter # to get router from agent, which will be supplied to child plugins

config_section :store, param_name: :stores, multi: true, required: true do
config_argument :arg, :string, default: ''
config_param :@type, :string, default: nil
end

Expand Down
27 changes: 25 additions & 2 deletions lib/fluent/plugin/out_copy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,21 @@ class CopyOutput < MultiOutput
desc 'If true, pass different record to each `store` plugin.'
config_param :deep_copy, :bool, default: false

attr_reader :ignore_errors

def initialize
super
@ignore_errors = []
end

def configure(conf)
super

@stores.each { |store|
@ignore_errors << (store.arg == 'ignore_error')
}
end

def multi_workers_ready?
true
end
Expand All @@ -38,8 +53,16 @@ def process(tag, es)
es = m
end

outputs.each do |output|
output.emit_events(tag, @deep_copy ? es.dup : es)
outputs.each_with_index do |output, i|
begin
output.emit_events(tag, @deep_copy ? es.dup : es)
rescue => e
if @ignore_errors[i]
log.error "ignore emit error", error: e
else
raise e
end
end
end
end
end
Expand Down
31 changes: 31 additions & 0 deletions test/plugin/test_out_copy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -156,5 +156,36 @@ def test_deep_copy_controls_shallow_or_deep_copied(data)
end
end
end

IGNORE_ERROR_CONFIG = %[
<store ignore_error>
@type test
name c0
</store>
<store ignore_error>
@type test
name c1
</store>
<store>
@type test
name c2
</store>
]

def test_ignore_error
d = create_driver(IGNORE_ERROR_CONFIG)

# override to raise an error
d.instance.outputs[0].define_singleton_method(:process) do |tag, es|
raise ArgumentError, 'Failed'
end

time = Time.parse("2011-01-02 13:14:15 UTC").to_i
assert_nothing_raised do
d.run(default_tag: 'test') do
d.feed(time, {"a"=>1})
end
end
end
end

0 comments on commit ed82cf8

Please sign in to comment.