diff --git a/lib/fluent/plugin/out_copy.rb b/lib/fluent/plugin/out_copy.rb index c419e28324..1e79d98909 100644 --- a/lib/fluent/plugin/out_copy.rb +++ b/lib/fluent/plugin/out_copy.rb @@ -23,7 +23,9 @@ class CopyOutput < MultiOutput Fluent::Plugin.register_output('copy', self) desc 'If true, pass different record to each `store` plugin.' - config_param :deep_copy, :bool, default: false + config_param :deep_copy, :bool, default: false, deprecated: "use 'copy_mode' parameter instead" + desc 'Pass different record to each `store` plugin by specified method' + config_param :copy_mode, :enum, list: [:no_copy, :shallow, :deep, :marshal], default: :no_copy attr_reader :ignore_errors @@ -35,6 +37,7 @@ def initialize def configure(conf) super + @copy_proc = gen_copy_proc @stores.each { |store| @ignore_errors << (store.arg == 'ignore_error') } @@ -55,7 +58,7 @@ def process(tag, es) outputs.each_with_index do |output, i| begin - output.emit_events(tag, @deep_copy ? es.dup : es) + output.emit_events(tag, @copy_proc ? @copy_proc.call(es) : es) rescue => e if @ignore_errors[i] log.error "ignore emit error", error: e @@ -65,5 +68,40 @@ def process(tag, es) end end end + + private + + def gen_copy_proc + @copy_mode = :shallow if @deep_copy + + case @copy_mode + when :no_copy + nil + when :shallow + Proc.new { |es| es.dup } + when :deep + Proc.new { |es| + packer = Fluent::MessagePackFactory.msgpack_packer + times = [] + records = [] + es.each { |time, record| + times << time + packer.pack(record) + } + Fluent::MessagePackFactory.msgpack_unpacker.feed_each(packer.full_pack) { |record| + records << record + } + Fluent::MultiEventStream.new(times, records) + } + when :marshal + Proc.new { |es| + new_es = Fluent::MultiEventStream.new + es.each { |time, record| + new_es.add(time, Marshal.load(Marshal.dump(record))) + } + new_es + } + end + end end end diff --git a/test/plugin/test_out_copy.rb b/test/plugin/test_out_copy.rb index 7aa65c34e7..e4503be997 100644 --- a/test/plugin/test_out_copy.rb +++ b/test/plugin/test_out_copy.rb @@ -50,6 +50,22 @@ def test_configure assert_equal "c0", outputs[0].name assert_equal "c1", outputs[1].name assert_equal "c2", outputs[2].name + assert_false d.instance.deep_copy + assert_equal :no_copy, d.instance.copy_mode + end + + def test_configure_with_deep_copy_and_use_shallow_copy_mode + d = create_driver(%[ + deep_copy true + + @type test + name c0 + + ]) + + outputs = d.instance.outputs + assert_true d.instance.deep_copy + assert_equal :shallow, d.instance.copy_mode end def test_feed_events @@ -86,9 +102,9 @@ def test_msgpack_unpacker_cache_bug_for_msgpack_event_stream } end - def create_event_test_driver(does_deep_copy = false) + def create_event_test_driver(copy_mode = 'no_copy') config = %[ - deep_copy #{does_deep_copy} + copy_mode #{copy_mode} @type test name output1 @@ -110,49 +126,60 @@ def create_event_test_driver(does_deep_copy = false) end time = event_time("2013-05-26 06:37:22 UTC") - mes0 = Fluent::MultiEventStream.new - mes0.add(time, {"a" => 1}) - mes0.add(time, {"b" => 1}) - mes1 = Fluent::MultiEventStream.new - mes1.add(time, {"a" => 1}) - mes1.add(time, {"b" => 1}) + gen_multi_es = Proc.new { + es = Fluent::MultiEventStream.new + es.add(time, {"a" => 1, "nest" => {'k' => 'v'}}) + es.add(time, {"b" => 1, "nest" => {'k' => 'v'}}) + es + } data( - "OneEventStream without deep_copy" => [false, Fluent::OneEventStream.new(time, {"a" => 1})], - "OneEventStream with deep_copy" => [true, Fluent::OneEventStream.new(time, {"a" => 1})], - "ArrayEventStream without deep_copy" => [false, Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"b" => 2}] ])], - "ArrayEventStream with deep_copy" => [true, Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"b" => 2}] ])], - "MultiEventStream without deep_copy" => [false, mes0], - "MultiEventStream with deep_copy" => [true, mes1], + "OneEventStream without copy" => ['no_copy', Fluent::OneEventStream.new(time, {"a" => 1, "nest" => {'k' => 'v'}})], + "OneEventStream with shallow" => ['shallow', Fluent::OneEventStream.new(time, {"a" => 1, "nest" => {'k' => 'v'}})], + "OneEventStream with marshal" => ['marshal', Fluent::OneEventStream.new(time, {"a" => 1, "nest" => {'k' => 'v'}})], + "OneEventStream with deep" => ['deep', Fluent::OneEventStream.new(time, {"a" => 1, "nest" => {'k' => 'v'}})], + "ArrayEventStream without copy" => ['no_copy', Fluent::ArrayEventStream.new([[time, {"a" => 1, "nest" => {'k' => 'v'}}], [time, {"b" => 2, "nest" => {'k' => 'v'}}]])], + "ArrayEventStream with shallow" => ['shallow', Fluent::ArrayEventStream.new([[time, {"a" => 1, "nest" => {'k' => 'v'}}], [time, {"b" => 2, "nest" => {'k' => 'v'}}]])], + "ArrayEventStream with marshal" => ['marshal', Fluent::ArrayEventStream.new([[time, {"a" => 1, "nest" => {'k' => 'v'}}], [time, {"b" => 2, "nest" => {'k' => 'v'}}]])], + "ArrayEventStream with deep" => ['deep', Fluent::ArrayEventStream.new([[time, {"a" => 1, "nest" => {'k' => 'v'}}], [time, {"b" => 2, "nest" => {'k' => 'v'}}]])], + "MultiEventStream without copy" => ['no_copy', gen_multi_es.call], + "MultiEventStream with shallow" => ['shallow', gen_multi_es.call], + "MultiEventStream with marshal" => ['marshal', gen_multi_es.call], + "MultiEventStream with deep" => ['deep', gen_multi_es.call], ) - def test_deep_copy_controls_shallow_or_deep_copied(data) - does_deep_copy, es = data - - d = create_event_test_driver(does_deep_copy) + def test_copy_mode_with_event_streams(data) + copy_mode, es = data + d = create_event_test_driver(copy_mode) d.run(default_tag: 'test') do d.feed(es) end events = d.instance.outputs.map(&:events) - if does_deep_copy + if copy_mode != 'no_copy' events[0].each_with_index do |entry0, i| record0 = entry0.last record1 = events[1][i].last - assert{ record0.object_id != record1.object_id } + assert_not_equal record0.object_id, record1.object_id assert_equal "bar", record0["foo"] assert !record1.has_key?("foo") + if copy_mode == 'shallow' + assert_equal record0['nest'].object_id, record1['nest'].object_id + else + assert_not_equal record0['nest'].object_id, record1['nest'].object_id + end end else events[0].each_with_index do |entry0, i| record0 = entry0.last record1 = events[1][i].last - assert{ record0.object_id == record1.object_id } + assert_equal record0.object_id, record1.object_id assert_equal "bar", record0["foo"] assert_equal "bar", record1["foo"] + assert_equal record0['nest'].object_id, record1['nest'].object_id end end end