diff --git a/lib/fluent/plugin/multi_output.rb b/lib/fluent/plugin/multi_output.rb index 5f81396945..6c5784e86e 100644 --- a/lib/fluent/plugin/multi_output.rb +++ b/lib/fluent/plugin/multi_output.rb @@ -29,6 +29,7 @@ class MultiOutput < Base helpers :event_emitter # to get router from agent, which will be supplied to child plugins config_section :store, param_name: :stores, multi: true, required: true do + config_argument :arg, :string, default: '' config_param :@type, :string, default: nil end diff --git a/lib/fluent/plugin/out_copy.rb b/lib/fluent/plugin/out_copy.rb index 358c9f908d..c419e28324 100644 --- a/lib/fluent/plugin/out_copy.rb +++ b/lib/fluent/plugin/out_copy.rb @@ -25,6 +25,21 @@ class CopyOutput < MultiOutput desc 'If true, pass different record to each `store` plugin.' config_param :deep_copy, :bool, default: false + attr_reader :ignore_errors + + def initialize + super + @ignore_errors = [] + end + + def configure(conf) + super + + @stores.each { |store| + @ignore_errors << (store.arg == 'ignore_error') + } + end + def multi_workers_ready? true end @@ -38,8 +53,16 @@ def process(tag, es) es = m end - outputs.each do |output| - output.emit_events(tag, @deep_copy ? es.dup : es) + outputs.each_with_index do |output, i| + begin + output.emit_events(tag, @deep_copy ? es.dup : es) + rescue => e + if @ignore_errors[i] + log.error "ignore emit error", error: e + else + raise e + end + end end end end diff --git a/test/plugin/test_out_copy.rb b/test/plugin/test_out_copy.rb index e1dbb055af..7aa65c34e7 100644 --- a/test/plugin/test_out_copy.rb +++ b/test/plugin/test_out_copy.rb @@ -156,5 +156,36 @@ def test_deep_copy_controls_shallow_or_deep_copied(data) end end end + + IGNORE_ERROR_CONFIG = %[ + + @type test + name c0 + + + @type test + name c1 + + + @type test + name c2 + + ] + + def test_ignore_error + d = create_driver(IGNORE_ERROR_CONFIG) + + # override to raise an error + d.instance.outputs[0].define_singleton_method(:process) do |tag, es| + raise ArgumentError, 'Failed' + end + + time = Time.parse("2011-01-02 13:14:15 UTC").to_i + assert_nothing_raised do + d.run(default_tag: 'test') do + d.feed(time, {"a"=>1}) + end + end + end end