Skip to content

Commit

Permalink
Merge pull request #1263 from fluent/enable-msgpack-each-for-output-w…
Browse files Browse the repository at this point in the history
…ith-custom-format

Enable chunk.each only for limited plugins using msgpack
  • Loading branch information
tagomoris authored Oct 7, 2016
2 parents 36b547d + f63b675 commit 0eb12ba
Show file tree
Hide file tree
Showing 4 changed files with 247 additions and 5 deletions.
1 change: 0 additions & 1 deletion lib/fluent/plugin/buffer/chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class Buffer # fluent/plugin/buffer is already loaded
class Chunk
include MonitorMixin
include UniqueId::Mixin
include ChunkMessagePackEventStreamer

# Chunks has 2 part:
# * metadata: contains metadata which should be restored after resume (if possible)
Expand Down
10 changes: 9 additions & 1 deletion lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ def format(tag, time, record)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end

def formatted_to_msgpack_binary
# To indicate custom format method (#format) returns msgpack binary or not.
# If #format returns msgpack binary, override this method to return true.
false
end

def prefer_buffered_processing
# override this method to return false only when all of these are true:
# * plugin has both implementation for buffered and non-buffered methods
Expand Down Expand Up @@ -176,6 +182,7 @@ def initialize
@buffering = true
end
@custom_format = implement?(:custom_format)
@enable_msgpack_streamer = false # decided later

@buffer = nil
@secondary = nil
Expand Down Expand Up @@ -340,6 +347,7 @@ def start
end

@custom_format = implement?(:custom_format)
@enable_msgpack_streamer = @custom_format ? formatted_to_msgpack_binary : true
@delayed_commit = if implement?(:buffered) && implement?(:delayed_commit)
prefer_delayed_commit
else
Expand Down Expand Up @@ -955,7 +963,7 @@ def try_flush
using_secondary = true
end

unless @custom_format
if @enable_msgpack_streamer
chunk.extend ChunkMessagePackEventStreamer
end

Expand Down
18 changes: 15 additions & 3 deletions test/plugin/test_buffer_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ class BufferChunkTest < Test::Unit::TestCase
assert chunk.respond_to?(:read)
assert chunk.respond_to?(:open)
assert chunk.respond_to?(:write_to)
assert chunk.respond_to?(:msgpack_each)
assert_raise(NotImplementedError){ chunk.append([]) }
assert_raise(NotImplementedError){ chunk.concat(nil, 0) }
assert_raise(NotImplementedError){ chunk.commit }
Expand All @@ -43,7 +42,19 @@ class BufferChunkTest < Test::Unit::TestCase
assert_raise(NotImplementedError){ chunk.read }
assert_raise(NotImplementedError){ chunk.open(){} }
assert_raise(NotImplementedError){ chunk.write_to(nil) }
assert_raise(NotImplementedError){ chunk.msgpack_each(){|v| v} }
assert !chunk.respond_to?(:msgpack_each)
end

test 'has method #each and #msgpack_each only when extended by ChunkMessagePackEventStreamer' do
meta = Object.new
chunk = Fluent::Plugin::Buffer::Chunk.new(meta)

assert !chunk.respond_to?(:each)
assert !chunk.respond_to?(:msgpack_each)

chunk.extend Fluent::ChunkMessagePackEventStreamer
assert chunk.respond_to?(:each)
assert chunk.respond_to?(:msgpack_each)
end

test 'some methods raise ArgumentError with an option of `compressed: :gzip` and without extending Compressble`' do
Expand Down Expand Up @@ -162,9 +173,10 @@ def open(**kwargs)
assert "my data\nyour data\n", io.to_s
end

test 'can feed objects into blocks with unpacking msgpack' do
test 'can feed objects into blocks with unpacking msgpack if ChunkMessagePackEventStreamer is included' do
require 'msgpack'
c = TestChunk.new(Object.new)
c.extend Fluent::ChunkMessagePackEventStreamer
c.data << MessagePack.pack(['my data', 1])
c.data << MessagePack.pack(['your data', 2])
ary = []
Expand Down
223 changes: 223 additions & 0 deletions test/plugin/test_output_as_buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,47 @@ def shutdown
super
end
end
class DummyStandardBufferedOutput < DummyBareOutput
def initialize
super
@prefer_delayed_commit = nil
@write = nil
@try_write = nil
end
def prefer_delayed_commit
@prefer_delayed_commit ? @prefer_delayed_commit.call : false
end
def write(chunk)
@write ? @write.call(chunk) : nil
end
def try_write(chunk)
@try_write ? @try_write.call(chunk) : nil
end
end
class DummyCustomFormatBufferedOutput < DummyBareOutput
def initialize
super
@format_type_is_msgpack = nil
@prefer_delayed_commit = nil
@write = nil
@try_write = nil
end
def format(tag, time, record)
@format ? @format.call(tag, time, record) : [tag, time, record].to_json
end
def formatted_to_msgpack_binary
@format_type_is_msgpack ? @format_type_is_msgpack.call : false
end
def prefer_delayed_commit
@prefer_delayed_commit ? @prefer_delayed_commit.call : false
end
def write(chunk)
@write ? @write.call(chunk) : nil
end
def try_write(chunk)
@try_write ? @try_write.call(chunk) : nil
end
end
class DummyFullFeatureOutput < DummyBareOutput
def initialize
super
Expand Down Expand Up @@ -94,6 +135,8 @@ def create_output(type=:full)
when :sync then FluentPluginOutputAsBufferedTest::DummySyncOutput.new
when :buffered then FluentPluginOutputAsBufferedTest::DummyAsyncOutput.new
when :delayed then FluentPluginOutputAsBufferedTest::DummyDelayedOutput.new
when :standard then FluentPluginOutputAsBufferedTest::DummyStandardBufferedOutput.new
when :custom then FluentPluginOutputAsBufferedTest::DummyCustomFormatBufferedOutput.new
when :full then FluentPluginOutputAsBufferedTest::DummyFullFeatureOutput.new
else
raise ArgumentError, "unknown type: #{type}"
Expand Down Expand Up @@ -125,6 +168,186 @@ def waiting(seconds)
Timecop.return
end

sub_test_case 'chunk feature in #write for output plugins' do
setup do
@stored_global_logger = $log
$log = Fluent::Test::TestLogger.new
@hash = {
'flush_mode' => 'immediate',
'flush_thread_interval' => '0.01',
'flush_thread_burst_interval' => '0.01',
}
end

teardown do
$log = @stored_global_logger
end

test 'plugin using standard format can iterate chunk for time, record in #write' do
events_from_chunk = []
@i = create_output(:standard)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)]))
@i.register(:prefer_delayed_commit){ false }
@i.register(:write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:write, e] }
@i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:try_write, e] }
@i.start
@i.after_start

events = [
[event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}],
[event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}],
]

@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))
@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))

waiting(5){ sleep 0.1 until events_from_chunk.size == 2 }

assert_equal 2, events_from_chunk.size
2.times.each do |i|
assert_equal :write, events_from_chunk[i][0]
assert_equal events, events_from_chunk[i][1]
end
end

test 'plugin using standard format can iterate chunk for time, record in #try_write' do
events_from_chunk = []
@i = create_output(:standard)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)]))
@i.register(:prefer_delayed_commit){ true }
@i.register(:write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:write, e] }
@i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:try_write, e] }
@i.start
@i.after_start

events = [
[event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}],
[event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}],
]

@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))
@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))

waiting(5){ sleep 0.1 until events_from_chunk.size == 2 }

assert_equal 2, events_from_chunk.size
2.times.each do |i|
assert_equal :try_write, events_from_chunk[i][0]
assert_equal events, events_from_chunk[i][1]
end
end

test 'plugin using custom format cannot iterate chunk in #write' do
events_from_chunk = []
@i = create_output(:custom)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)]))
@i.register(:prefer_delayed_commit){ false }
@i.register(:format){ |tag, time, record| [tag,time,record].to_json }
@i.register(:format_type_is_msgpack){ false }
@i.register(:write){ |chunk| assert !(chunk.respond_to?(:each)) }
@i.register(:try_write){ |chunk| assert !(chunk.respond_to?(:each)) }
@i.start
@i.after_start

events = [
[event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}],
[event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}],
]

@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))

assert_equal 0, events_from_chunk.size
end

test 'plugin using custom format cannot iterate chunk in #try_write' do
events_from_chunk = []
@i = create_output(:custom)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)]))
@i.register(:prefer_delayed_commit){ true }
@i.register(:format){ |tag, time, record| [tag,time,record].to_json }
@i.register(:format_type_is_msgpack){ false }
@i.register(:write){ |chunk| assert !(chunk.respond_to?(:each)) }
@i.register(:try_write){ |chunk| assert !(chunk.respond_to?(:each)) }
@i.start
@i.after_start

events = [
[event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}],
[event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}],
]

@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))

assert_equal 0, events_from_chunk.size
end

test 'plugin using custom format can iterate chunk in #write if #format returns msgpack' do
events_from_chunk = []
@i = create_output(:custom)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)]))
@i.register(:prefer_delayed_commit){ false }
@i.register(:format){ |tag, time, record| [tag,time,record].to_msgpack }
@i.register(:format_type_is_msgpack){ true }
@i.register(:write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|ta,t,r| e << [ta,t,r]}; events_from_chunk << [:write, e] }
@i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|ta,t,r| e << [ta,t,r]}; events_from_chunk << [:try_write, e] }
@i.start
@i.after_start

events = [
[event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}],
[event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}],
]

@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))
@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))

waiting(5){ sleep 0.1 until events_from_chunk.size == 2 }

assert_equal 2, events_from_chunk.size
2.times.each do |i|
assert_equal :write, events_from_chunk[i][0]
each_pushed = events_from_chunk[i][1]
assert_equal 2, each_pushed.size
assert_equal 'test.tag', each_pushed[0][0]
assert_equal 'test.tag', each_pushed[1][0]
assert_equal events, each_pushed.map{|tag,time,record| [time,record]}
end
end

test 'plugin using custom format can iterate chunk in #try_write if #format returns msgpack' do
events_from_chunk = []
@i = create_output(:custom)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)]))
@i.register(:prefer_delayed_commit){ true }
@i.register(:format){ |tag, time, record| [tag,time,record].to_msgpack }
@i.register(:format_type_is_msgpack){ true }
@i.register(:write){ |chunk| events_from_chunk = []; assert chunk.respond_to?(:each); chunk.each{|ta,t,r| e << [ta,t,r]}; events_from_chunk << [:write, e] }
@i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|ta,t,r| e << [ta,t,r]}; events_from_chunk << [:try_write, e] }
@i.start
@i.after_start

events = [
[event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}],
[event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}],
]

@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))
@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))

waiting(5){ sleep 0.1 until events_from_chunk.size == 2 }

assert_equal 2, events_from_chunk.size
2.times.each do |i|
assert_equal :try_write, events_from_chunk[i][0]
each_pushed = events_from_chunk[i][1]
assert_equal 2, each_pushed.size
assert_equal 'test.tag', each_pushed[0][0]
assert_equal 'test.tag', each_pushed[1][0]
assert_equal events, each_pushed.map{|tag,time,record| [time,record]}
end
end
end

sub_test_case 'buffered output configured with many chunk keys' do
setup do
@stored_global_logger = $log
Expand Down

0 comments on commit 0eb12ba

Please sign in to comment.