Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_copy: Add copy_mode parameter. fix #2744 #2747

Merged
merged 1 commit into from
Dec 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions lib/fluent/plugin/out_copy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ class CopyOutput < MultiOutput
Fluent::Plugin.register_output('copy', self)

desc 'If true, pass different record to each `store` plugin.'
config_param :deep_copy, :bool, default: false
config_param :deep_copy, :bool, default: false, deprecated: "use 'copy_mode' parameter instead"
desc 'Pass different record to each `store` plugin by specified method'
config_param :copy_mode, :enum, list: [:no_copy, :shallow, :deep, :marshal], default: :no_copy

attr_reader :ignore_errors

Expand All @@ -35,6 +37,7 @@ def initialize
def configure(conf)
super

@copy_proc = gen_copy_proc
@stores.each { |store|
@ignore_errors << (store.arg == 'ignore_error')
}
Expand All @@ -55,7 +58,7 @@ def process(tag, es)

outputs.each_with_index do |output, i|
begin
output.emit_events(tag, @deep_copy ? es.dup : es)
output.emit_events(tag, @copy_proc ? @copy_proc.call(es) : es)
rescue => e
if @ignore_errors[i]
log.error "ignore emit error", error: e
Expand All @@ -65,5 +68,40 @@ def process(tag, es)
end
end
end

private

def gen_copy_proc
@copy_mode = :shallow if @deep_copy

case @copy_mode
when :no_copy
nil
when :shallow
Proc.new { |es| es.dup }
when :deep
Proc.new { |es|
packer = Fluent::MessagePackFactory.msgpack_packer
times = []
records = []
es.each { |time, record|
times << time
packer.pack(record)
}
Fluent::MessagePackFactory.msgpack_unpacker.feed_each(packer.full_pack) { |record|
records << record
}
Fluent::MultiEventStream.new(times, records)
}
when :marshal
Proc.new { |es|
new_es = Fluent::MultiEventStream.new
es.each { |time, record|
new_es.add(time, Marshal.load(Marshal.dump(record)))
}
new_es
}
end
end
end
end
69 changes: 48 additions & 21 deletions test/plugin/test_out_copy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,22 @@ def test_configure
assert_equal "c0", outputs[0].name
assert_equal "c1", outputs[1].name
assert_equal "c2", outputs[2].name
assert_false d.instance.deep_copy
assert_equal :no_copy, d.instance.copy_mode
end

def test_configure_with_deep_copy_and_use_shallow_copy_mode
d = create_driver(%[
deep_copy true
<store>
@type test
name c0
</store>
])

outputs = d.instance.outputs
assert_true d.instance.deep_copy
assert_equal :shallow, d.instance.copy_mode
end

def test_feed_events
Expand Down Expand Up @@ -86,9 +102,9 @@ def test_msgpack_unpacker_cache_bug_for_msgpack_event_stream
}
end

def create_event_test_driver(does_deep_copy = false)
def create_event_test_driver(copy_mode = 'no_copy')
config = %[
deep_copy #{does_deep_copy}
copy_mode #{copy_mode}
<store>
@type test
name output1
Expand All @@ -110,49 +126,60 @@ def create_event_test_driver(does_deep_copy = false)
end

time = event_time("2013-05-26 06:37:22 UTC")
mes0 = Fluent::MultiEventStream.new
mes0.add(time, {"a" => 1})
mes0.add(time, {"b" => 1})
mes1 = Fluent::MultiEventStream.new
mes1.add(time, {"a" => 1})
mes1.add(time, {"b" => 1})
gen_multi_es = Proc.new {
es = Fluent::MultiEventStream.new
es.add(time, {"a" => 1, "nest" => {'k' => 'v'}})
es.add(time, {"b" => 1, "nest" => {'k' => 'v'}})
es
}

data(
"OneEventStream without deep_copy" => [false, Fluent::OneEventStream.new(time, {"a" => 1})],
"OneEventStream with deep_copy" => [true, Fluent::OneEventStream.new(time, {"a" => 1})],
"ArrayEventStream without deep_copy" => [false, Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"b" => 2}] ])],
"ArrayEventStream with deep_copy" => [true, Fluent::ArrayEventStream.new([ [time, {"a" => 1}], [time, {"b" => 2}] ])],
"MultiEventStream without deep_copy" => [false, mes0],
"MultiEventStream with deep_copy" => [true, mes1],
"OneEventStream without copy" => ['no_copy', Fluent::OneEventStream.new(time, {"a" => 1, "nest" => {'k' => 'v'}})],
"OneEventStream with shallow" => ['shallow', Fluent::OneEventStream.new(time, {"a" => 1, "nest" => {'k' => 'v'}})],
"OneEventStream with marshal" => ['marshal', Fluent::OneEventStream.new(time, {"a" => 1, "nest" => {'k' => 'v'}})],
"OneEventStream with deep" => ['deep', Fluent::OneEventStream.new(time, {"a" => 1, "nest" => {'k' => 'v'}})],
"ArrayEventStream without copy" => ['no_copy', Fluent::ArrayEventStream.new([[time, {"a" => 1, "nest" => {'k' => 'v'}}], [time, {"b" => 2, "nest" => {'k' => 'v'}}]])],
"ArrayEventStream with shallow" => ['shallow', Fluent::ArrayEventStream.new([[time, {"a" => 1, "nest" => {'k' => 'v'}}], [time, {"b" => 2, "nest" => {'k' => 'v'}}]])],
"ArrayEventStream with marshal" => ['marshal', Fluent::ArrayEventStream.new([[time, {"a" => 1, "nest" => {'k' => 'v'}}], [time, {"b" => 2, "nest" => {'k' => 'v'}}]])],
"ArrayEventStream with deep" => ['deep', Fluent::ArrayEventStream.new([[time, {"a" => 1, "nest" => {'k' => 'v'}}], [time, {"b" => 2, "nest" => {'k' => 'v'}}]])],
"MultiEventStream without copy" => ['no_copy', gen_multi_es.call],
"MultiEventStream with shallow" => ['shallow', gen_multi_es.call],
"MultiEventStream with marshal" => ['marshal', gen_multi_es.call],
"MultiEventStream with deep" => ['deep', gen_multi_es.call],
)
def test_deep_copy_controls_shallow_or_deep_copied(data)
does_deep_copy, es = data

d = create_event_test_driver(does_deep_copy)
def test_copy_mode_with_event_streams(data)
copy_mode, es = data

d = create_event_test_driver(copy_mode)
d.run(default_tag: 'test') do
d.feed(es)
end

events = d.instance.outputs.map(&:events)

if does_deep_copy
if copy_mode != 'no_copy'
events[0].each_with_index do |entry0, i|
record0 = entry0.last
record1 = events[1][i].last

assert{ record0.object_id != record1.object_id }
assert_not_equal record0.object_id, record1.object_id
assert_equal "bar", record0["foo"]
assert !record1.has_key?("foo")
if copy_mode == 'shallow'
assert_equal record0['nest'].object_id, record1['nest'].object_id
else
assert_not_equal record0['nest'].object_id, record1['nest'].object_id
end
end
else
events[0].each_with_index do |entry0, i|
record0 = entry0.last
record1 = events[1][i].last

assert{ record0.object_id == record1.object_id }
assert_equal record0.object_id, record1.object_id
assert_equal "bar", record0["foo"]
assert_equal "bar", record1["foo"]
assert_equal record0['nest'].object_id, record1['nest'].object_id
end
end
end
Expand Down