Skip to content

Commit

Permalink
Merge pull request #1547 from fluent/make-formatted_to_msgpack_binary…
Browse files Browse the repository at this point in the history
…-rubyish

Add formatted_to_msgpack_binary? to Output plugin API. fix #1538
  • Loading branch information
repeatedly authored Apr 19, 2017
2 parents 2ac070c + e8aa739 commit 4dddd63
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 3 deletions.
7 changes: 6 additions & 1 deletion lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,17 @@ def format(tag, time, record)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end

def formatted_to_msgpack_binary
def formatted_to_msgpack_binary?
# To indicate custom format method (#format) returns msgpack binary or not.
# If #format returns msgpack binary, override this method to return true.
false
end

# Compatibility for existing plugins
def formatted_to_msgpack_binary
formatted_to_msgpack_binary?
end

def prefer_buffered_processing
# override this method to return false only when all of these are true:
# * plugin has both implementation for buffered and non-buffered methods
Expand Down
32 changes: 30 additions & 2 deletions test/plugin/test_output_as_buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,31 @@ def try_write(chunk)
end
end
class DummyCustomFormatBufferedOutput < DummyBareOutput
def initialize
super
@format_type_is_msgpack = nil
@prefer_delayed_commit = nil
@write = nil
@try_write = nil
end
def format(tag, time, record)
@format ? @format.call(tag, time, record) : [tag, time, record].to_json
end
def formatted_to_msgpack_binary?
@format_type_is_msgpack ? @format_type_is_msgpack.call : false
end
def prefer_delayed_commit
@prefer_delayed_commit ? @prefer_delayed_commit.call : false
end
def write(chunk)
@write ? @write.call(chunk) : nil
end
def try_write(chunk)
@try_write ? @try_write.call(chunk) : nil
end
end
# check for formatted_to_msgpack_binary compatibility
class DummyOldCustomFormatBufferedOutput < DummyBareOutput
def initialize
super
@format_type_is_msgpack = nil
Expand Down Expand Up @@ -163,6 +188,7 @@ def create_output(type=:full)
when :full then FluentPluginOutputAsBufferedTest::DummyFullFeatureOutput.new
when :old_buf then FluentPluginOutputAsBufferedTest::DummyOldBufferedOutput.new
when :old_obj then FluentPluginOutputAsBufferedTest::DummyOldObjectBufferedOutput.new
when :old_custom then FluentPluginOutputAsBufferedTest::DummyOldCustomFormatBufferedOutput.new
else
raise ArgumentError, "unknown type: #{type}"
end
Expand Down Expand Up @@ -306,9 +332,11 @@ def waiting(seconds)
assert_equal 0, events_from_chunk.size
end

test 'plugin using custom format can iterate chunk in #write if #format returns msgpack' do
data('formatted_to_msgpack_binary?' => :custom,
'formatted_to_msgpack_binary' => :old_custom)
test 'plugin using custom format can iterate chunk in #write if #format returns msgpack' do |out_type|
events_from_chunk = []
@i = create_output(:custom)
@i = create_output(out_type)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)]))
@i.register(:prefer_delayed_commit){ false }
@i.register(:format){ |tag, time, record| [tag,time,record].to_msgpack }
Expand Down

0 comments on commit 4dddd63

Please sign in to comment.