diff --git a/lib/fluent/plugin/out_copy.rb b/lib/fluent/plugin/out_copy.rb
index c419e28324..1e79d98909 100644
--- a/lib/fluent/plugin/out_copy.rb
+++ b/lib/fluent/plugin/out_copy.rb
@@ -23,7 +23,9 @@ class CopyOutput < MultiOutput
Fluent::Plugin.register_output('copy', self)
desc 'If true, pass different record to each `store` plugin.'
- config_param :deep_copy, :bool, default: false
+ config_param :deep_copy, :bool, default: false, deprecated: "use 'copy_mode' parameter instead"
+ desc 'Pass different record to each `store` plugin by specified method'
+ config_param :copy_mode, :enum, list: [:no_copy, :shallow, :deep, :marshal], default: :no_copy
attr_reader :ignore_errors
@@ -35,6 +37,7 @@ def initialize
def configure(conf)
super
+ @copy_proc = gen_copy_proc
@stores.each { |store|
@ignore_errors << (store.arg == 'ignore_error')
}
@@ -55,7 +58,7 @@ def process(tag, es)
outputs.each_with_index do |output, i|
begin
- output.emit_events(tag, @deep_copy ? es.dup : es)
+ output.emit_events(tag, @copy_proc ? @copy_proc.call(es) : es)
rescue => e
if @ignore_errors[i]
log.error "ignore emit error", error: e
@@ -65,5 +68,40 @@ def process(tag, es)
end
end
end
+
+ private
+
+ def gen_copy_proc
+ @copy_mode = :shallow if @deep_copy
+
+ case @copy_mode
+ when :no_copy
+ nil
+ when :shallow
+ Proc.new { |es| es.dup }
+ when :deep
+ Proc.new { |es|
+ packer = Fluent::MessagePackFactory.msgpack_packer
+ times = []
+ records = []
+ es.each { |time, record|
+ times << time
+ packer.pack(record)
+ }
+ Fluent::MessagePackFactory.msgpack_unpacker.feed_each(packer.full_pack) { |record|
+ records << record
+ }
+ Fluent::MultiEventStream.new(times, records)
+ }
+ when :marshal
+ Proc.new { |es|
+ new_es = Fluent::MultiEventStream.new
+ es.each { |time, record|
+ new_es.add(time, Marshal.load(Marshal.dump(record)))
+ }
+ new_es
+ }
+ end
+ end
end
end
diff --git a/test/plugin/test_out_copy.rb b/test/plugin/test_out_copy.rb
index 7aa65c34e7..e4503be997 100644
--- a/test/plugin/test_out_copy.rb
+++ b/test/plugin/test_out_copy.rb
@@ -50,6 +50,22 @@ def test_configure
assert_equal "c0", outputs[0].name
assert_equal "c1", outputs[1].name
assert_equal "c2", outputs[2].name
+ assert_false d.instance.deep_copy
+ assert_equal :no_copy, d.instance.copy_mode
+ end
+
+ def test_configure_with_deep_copy_and_use_shallow_copy_mode
+ d = create_driver(%[
+ deep_copy true
+
+ @type test
+ name c0
+
+ ])
+
+ outputs = d.instance.outputs
+ assert_true d.instance.deep_copy
+ assert_equal :shallow, d.instance.copy_mode
end
def test_feed_events
@@ -86,9 +102,9 @@ def test_msgpack_unpacker_cache_bug_for_msgpack_event_stream
}
end
- def create_event_test_driver(does_deep_copy = false)
+ def create_event_test_driver(copy_mode = 'no_copy')
config = %[
- deep_copy #{does_deep_copy}
+ copy_mode #{copy_mode}
@type test
name output1
@@ -110,49 +126,60 @@ def create_event_test_driver(does_deep_copy = false)
end
time = event_time("2013-05-26 06:37:22 UTC")
- mes0 = Fluent::MultiEventStream.new
- mes0.add(time, {"a" => 1})
- mes0.add(time, {"b" => 1})
- mes1 = Fluent::MultiEventStream.new
- mes1.add(time, {"a" => 1})
- mes1.add(time, {"b" => 1})
+ gen_multi_es = Proc.new {
+ es = Fluent::MultiEventStream.new
+ es.add(time, {"a" => 1, "nest" => {'k' => 'v'}})
+ es.add(time, {"b" => 1, "nest" => {'k' => 'v'}})
+ es
+ }
data(
- "OneEventStream without deep_copy" => [false, Fluent::OneEventStream.new(time, {"a" => 1})],
- "OneEventStream with deep_copy" => [true, Fluent::OneEventStream.new(time, {"a" => 1})],
- "ArrayEventStream without deep_copy" => [false, Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"b" => 2}] ])],
- "ArrayEventStream with deep_copy" => [true, Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"b" => 2}] ])],
- "MultiEventStream without deep_copy" => [false, mes0],
- "MultiEventStream with deep_copy" => [true, mes1],
+ "OneEventStream without copy" => ['no_copy', Fluent::OneEventStream.new(time, {"a" => 1, "nest" => {'k' => 'v'}})],
+ "OneEventStream with shallow" => ['shallow', Fluent::OneEventStream.new(time, {"a" => 1, "nest" => {'k' => 'v'}})],
+ "OneEventStream with marshal" => ['marshal', Fluent::OneEventStream.new(time, {"a" => 1, "nest" => {'k' => 'v'}})],
+ "OneEventStream with deep" => ['deep', Fluent::OneEventStream.new(time, {"a" => 1, "nest" => {'k' => 'v'}})],
+ "ArrayEventStream without copy" => ['no_copy', Fluent::ArrayEventStream.new([[time, {"a" => 1, "nest" => {'k' => 'v'}}], [time, {"b" => 2, "nest" => {'k' => 'v'}}]])],
+ "ArrayEventStream with shallow" => ['shallow', Fluent::ArrayEventStream.new([[time, {"a" => 1, "nest" => {'k' => 'v'}}], [time, {"b" => 2, "nest" => {'k' => 'v'}}]])],
+ "ArrayEventStream with marshal" => ['marshal', Fluent::ArrayEventStream.new([[time, {"a" => 1, "nest" => {'k' => 'v'}}], [time, {"b" => 2, "nest" => {'k' => 'v'}}]])],
+ "ArrayEventStream with deep" => ['deep', Fluent::ArrayEventStream.new([[time, {"a" => 1, "nest" => {'k' => 'v'}}], [time, {"b" => 2, "nest" => {'k' => 'v'}}]])],
+ "MultiEventStream without copy" => ['no_copy', gen_multi_es.call],
+ "MultiEventStream with shallow" => ['shallow', gen_multi_es.call],
+ "MultiEventStream with marshal" => ['marshal', gen_multi_es.call],
+ "MultiEventStream with deep" => ['deep', gen_multi_es.call],
)
- def test_deep_copy_controls_shallow_or_deep_copied(data)
- does_deep_copy, es = data
-
- d = create_event_test_driver(does_deep_copy)
+ def test_copy_mode_with_event_streams(data)
+ copy_mode, es = data
+ d = create_event_test_driver(copy_mode)
d.run(default_tag: 'test') do
d.feed(es)
end
events = d.instance.outputs.map(&:events)
- if does_deep_copy
+ if copy_mode != 'no_copy'
events[0].each_with_index do |entry0, i|
record0 = entry0.last
record1 = events[1][i].last
- assert{ record0.object_id != record1.object_id }
+ assert_not_equal record0.object_id, record1.object_id
assert_equal "bar", record0["foo"]
assert !record1.has_key?("foo")
+ if copy_mode == 'shallow'
+ assert_equal record0['nest'].object_id, record1['nest'].object_id
+ else
+ assert_not_equal record0['nest'].object_id, record1['nest'].object_id
+ end
end
else
events[0].each_with_index do |entry0, i|
record0 = entry0.last
record1 = events[1][i].last
- assert{ record0.object_id == record1.object_id }
+ assert_equal record0.object_id, record1.object_id
assert_equal "bar", record0["foo"]
assert_equal "bar", record1["foo"]
+ assert_equal record0['nest'].object_id, record1['nest'].object_id
end
end
end