Skip to content

Commit

Permalink
Add method #formatted_to_msgpack_binary to mark that #format of this …
Browse files Browse the repository at this point in the history
…plugin returns msgpack binary

to enable chunk.each to iterate content objects.
  • Loading branch information
tagomoris committed Oct 6, 2016
1 parent c290962 commit 446d2dc
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 1 deletion.
10 changes: 9 additions & 1 deletion lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ def format(tag, time, record)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end

def formatted_to_msgpack_binary
# To indicate custom format method (#format) returns msgpack binary or not.
# If #format returns msgpack binary, override this method to return true.
false
end

def prefer_buffered_processing
# override this method to return false only when all of these are true:
# * plugin has both implementation for buffered and non-buffered methods
Expand Down Expand Up @@ -176,6 +182,7 @@ def initialize
@buffering = true
end
@custom_format = implement?(:custom_format)
@enable_msgpack_streamer = false # decided later

@buffer = nil
@secondary = nil
Expand Down Expand Up @@ -340,6 +347,7 @@ def start
end

@custom_format = implement?(:custom_format)
@enable_msgpack_streamer = @custom_format ? formatted_to_msgpack_binary : true
@delayed_commit = if implement?(:buffered) && implement?(:delayed_commit)
prefer_delayed_commit
else
Expand Down Expand Up @@ -955,7 +963,7 @@ def try_flush
using_secondary = true
end

unless @custom_format
if @enable_msgpack_streamer
chunk.extend ChunkMessagePackEventStreamer
end

Expand Down
113 changes: 113 additions & 0 deletions test/plugin/test_output_as_buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,47 @@ def shutdown
super
end
end
class DummyStandardBufferedOutput < DummyBareOutput
def initialize
super
@prefer_delayed_commit = nil
@write = nil
@try_write = nil
end
def prefer_delayed_commit
@prefer_delayed_commit ? @prefer_delayed_commit.call : false
end
def write(chunk)
@write ? @write.call(chunk) : nil
end
def try_write(chunk)
@try_write ? @try_write.call(chunk) : nil
end
end
class DummyCustomFormatBufferedOutput < DummyBareOutput
def initialize
super
@format_type_is_msgpack = nil
@prefer_delayed_commit = nil
@write = nil
@try_write = nil
end
def format(tag, time, record)
@format ? @format.call(tag, time, record) : [tag, time, record].to_json
end
def formatted_to_msgpack_binary
@format_type_is_msgpack ? @format_type_is_msgpack.call : false
end
def prefer_delayed_commit
@prefer_delayed_commit ? @prefer_delayed_commit.call : false
end
def write(chunk)
@write ? @write.call(chunk) : nil
end
def try_write(chunk)
@try_write ? @try_write.call(chunk) : nil
end
end
class DummyFullFeatureOutput < DummyBareOutput
def initialize
super
Expand Down Expand Up @@ -94,6 +135,8 @@ def create_output(type=:full)
when :sync then FluentPluginOutputAsBufferedTest::DummySyncOutput.new
when :buffered then FluentPluginOutputAsBufferedTest::DummyAsyncOutput.new
when :delayed then FluentPluginOutputAsBufferedTest::DummyDelayedOutput.new
when :standard then FluentPluginOutputAsBufferedTest::DummyStandardBufferedOutput.new
when :custom then FluentPluginOutputAsBufferedTest::DummyCustomFormatBufferedOutput.new
when :full then FluentPluginOutputAsBufferedTest::DummyFullFeatureOutput.new
else
raise ArgumentError, "unknown type: #{type}"
Expand Down Expand Up @@ -125,6 +168,76 @@ def waiting(seconds)
Timecop.return
end

sub_test_case 'chunk feature in #write for output plugins' do
setup do
@stored_global_logger = $log
$log = Fluent::Test::TestLogger.new
@hash = {
'flush_mode' => 'immediate',
'flush_thread_interval' => '0.01',
'flush_thread_burst_interval' => '0.01',
}
end

teardown do
$log = @stored_global_logger
end

test 'plugin using standard format can iterate chunk for time, record in #write' do
events_from_chunk = []
@i = create_output(:standard)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)]))
@i.register(:prefer_delayed_commit){ false }
@i.register(:write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:write, e] }
@i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:try_write, e] }
@i.start
@i.after_start

events = [
[event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}],
[event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}],
]

@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))
@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))

waiting(5){ sleep 0.1 until events_from_chunk.size == 2 }

assert_equal 2, events_from_chunk.size
2.times.each do |i|
assert_equal :write, events_from_chunk[i][0]
assert_equal events, events_from_chunk[i][1]
end
end

test 'plugin using standard format can iterate chunk for time, record in #try_write' do
events_from_chunk = []
@i = create_output(:standard)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)]))
@i.register(:prefer_delayed_commit){ true }
@i.register(:write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:write, e] }
@i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:try_write, e] }
@i.start
@i.after_start

events = [
[event_time('2016-10-05 16:16:16 -0700'), {"message" => "yaaaaaaaaay!"}],
[event_time('2016-10-05 16:16:17 -0700'), {"message" => "yoooooooooy!"}],
]

@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))
@i.emit_events("test.tag", Fluent::ArrayEventStream.new(events))

waiting(5){ sleep 0.1 until events_from_chunk.size == 2 }

assert_equal 2, events_from_chunk.size
2.times.each do |i|
assert_equal :try_write, events_from_chunk[i][0]
assert_equal events, events_from_chunk[i][1]
end
end
end

sub_test_case 'buffered output configured with many chunk keys' do
setup do
@stored_global_logger = $log
Expand Down

0 comments on commit 446d2dc

Please sign in to comment.