Skip to content

Commit

Permalink
chunk.each (chunk.msgpack_each) should be available only for chunks c…
Browse files Browse the repository at this point in the history
…ontains msgpack binary data.

It's original design, but not implemented when v0.14 output plugins were introduced.
Currently there are not so many output plugins using v0.14 buffered output API, so it's time to change.
chunk.each (chunk.msgpack_each) will be available only for:
* output plugins using standard format (doesn't implement #format method)
* output plugins marked to have #format method returning msgpack binary data
  • Loading branch information
tagomoris committed Oct 6, 2016
1 parent 36b547d commit c290962
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
1 change: 0 additions & 1 deletion lib/fluent/plugin/buffer/chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class Buffer # fluent/plugin/buffer is already loaded
class Chunk
include MonitorMixin
include UniqueId::Mixin
include ChunkMessagePackEventStreamer

# Chunks has 2 part:
# * metadata: contains metadata which should be restored after resume (if possible)
Expand Down
18 changes: 15 additions & 3 deletions test/plugin/test_buffer_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ class BufferChunkTest < Test::Unit::TestCase
assert chunk.respond_to?(:read)
assert chunk.respond_to?(:open)
assert chunk.respond_to?(:write_to)
assert chunk.respond_to?(:msgpack_each)
assert_raise(NotImplementedError){ chunk.append([]) }
assert_raise(NotImplementedError){ chunk.concat(nil, 0) }
assert_raise(NotImplementedError){ chunk.commit }
Expand All @@ -43,7 +42,19 @@ class BufferChunkTest < Test::Unit::TestCase
assert_raise(NotImplementedError){ chunk.read }
assert_raise(NotImplementedError){ chunk.open(){} }
assert_raise(NotImplementedError){ chunk.write_to(nil) }
assert_raise(NotImplementedError){ chunk.msgpack_each(){|v| v} }
assert !chunk.respond_to?(:msgpack_each)
end

test 'has method #each and #msgpack_each only when extended by ChunkMessagePackEventStreamer' do
meta = Object.new
chunk = Fluent::Plugin::Buffer::Chunk.new(meta)

assert !chunk.respond_to?(:each)
assert !chunk.respond_to?(:msgpack_each)

chunk.extend Fluent::ChunkMessagePackEventStreamer
assert chunk.respond_to?(:each)
assert chunk.respond_to?(:msgpack_each)
end

test 'some methods raise ArgumentError with an option of `compressed: :gzip` and without extending Compressble`' do
Expand Down Expand Up @@ -162,9 +173,10 @@ def open(**kwargs)
assert "my data\nyour data\n", io.to_s
end

test 'can feed objects into blocks with unpacking msgpack' do
test 'can feed objects into blocks with unpacking msgpack if ChunkMessagePackEventStreamer is included' do
require 'msgpack'
c = TestChunk.new(Object.new)
c.extend Fluent::ChunkMessagePackEventStreamer
c.data << MessagePack.pack(['my data', 1])
c.data << MessagePack.pack(['your data', 2])
ary = []
Expand Down

0 comments on commit c290962

Please sign in to comment.