diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb index db644ccf4d..bcb06998ed 100644 --- a/lib/fluent/event.rb +++ b/lib/fluent/event.rb @@ -308,11 +308,15 @@ def ensure_decompressed! end module ChunkMessagePackEventStreamer - # chunk.extend(ChunkEventStreamer) + # chunk.extend(ChunkMessagePackEventStreamer) # => chunk.each{|time, record| ... } def each(unpacker: nil, &block) + # Note: If need to use `unpacker`, then implement it, + # e.g., `unpacker.feed_each(io.read, &block)` (Not tested) + raise NotImplementedError, "'unpacker' argument is not implemented." if unpacker + open do |io| - (unpacker || Fluent::MessagePackFactory.msgpack_unpacker(io)).each(&block) + Fluent::MessagePackFactory.msgpack_unpacker(io).each(&block) end nil end diff --git a/test/plugin/test_buffer_chunk.rb b/test/plugin/test_buffer_chunk.rb index 5a1bfeb9da..535b0acd1a 100644 --- a/test/plugin/test_buffer_chunk.rb +++ b/test/plugin/test_buffer_chunk.rb @@ -57,6 +57,17 @@ class BufferChunkTest < Test::Unit::TestCase assert chunk.respond_to?(:msgpack_each) end + test 'unpacker arg is not implemented for ChunkMessagePackEventStreamer' do + meta = Object.new + chunk = Fluent::Plugin::Buffer::Chunk.new(meta) + chunk.extend Fluent::ChunkMessagePackEventStreamer + + unpacker = Fluent::MessagePackFactory.thread_local_msgpack_unpacker + + assert_raise(NotImplementedError){ chunk.each(unpacker: unpacker) } + assert_raise(NotImplementedError){ chunk.msgpack_each(unpacker: unpacker) } + end + test 'some methods raise ArgumentError with an option of `compressed: :gzip` and without extending Compressble`' do meta = Object.new chunk = Fluent::Plugin::Buffer::Chunk.new(meta)