Skip to content

Commit

Permalink
Merge pull request #4159 from daipom/fix-ChunkMessagePackEventStreame…
Browse files Browse the repository at this point in the history
…r-invalid-argument

Fix broken argument `unpacker` of `ChunkMessagePackEventStreamer.each`
  • Loading branch information
ashie authored Jul 11, 2023
2 parents aaa40f7 + 5121054 commit e120693
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
8 changes: 6 additions & 2 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,15 @@ def ensure_decompressed!
end

module ChunkMessagePackEventStreamer
# chunk.extend(ChunkEventStreamer)
# chunk.extend(ChunkMessagePackEventStreamer)
# => chunk.each{|time, record| ... }
def each(unpacker: nil, &block)
# Note: If need to use `unpacker`, then implement it,
# e.g., `unpacker.feed_each(io.read, &block)` (Not tested)
raise NotImplementedError, "'unpacker' argument is not implemented." if unpacker

open do |io|
(unpacker || Fluent::MessagePackFactory.msgpack_unpacker(io)).each(&block)
Fluent::MessagePackFactory.msgpack_unpacker(io).each(&block)
end
nil
end
Expand Down
11 changes: 11 additions & 0 deletions test/plugin/test_buffer_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ class BufferChunkTest < Test::Unit::TestCase
assert chunk.respond_to?(:msgpack_each)
end

test 'unpacker arg is not implemented for ChunkMessagePackEventStreamer' do
meta = Object.new
chunk = Fluent::Plugin::Buffer::Chunk.new(meta)
chunk.extend Fluent::ChunkMessagePackEventStreamer

unpacker = Fluent::MessagePackFactory.thread_local_msgpack_unpacker

assert_raise(NotImplementedError){ chunk.each(unpacker: unpacker) }
assert_raise(NotImplementedError){ chunk.msgpack_each(unpacker: unpacker) }
end

test 'some methods raise ArgumentError with an option of `compressed: :gzip` and without extending Compressble`' do
meta = Object.new
chunk = Fluent::Plugin::Buffer::Chunk.new(meta)
Expand Down

0 comments on commit e120693

Please sign in to comment.