From 5121054857c3d0f0fae6b778864db42f5ea13285 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Tue, 11 Jul 2023 08:09:16 +0900 Subject: [PATCH] Fix ChunkMessagePackEventStreamer's invalid argument The argument `unpacker` of `ChunkMessagePackEventStreamer.each` seems to have been added in order to match the feature with `EventStream` at c6c6c038c5cacab6fcb9aa89a714d9366ac4902e. However, that previous implementation at that point does not work as expected. It has never causes any issues just because the argument was not used at all. It could be implemented so that this argument is used, but given that it has not been used so far, there is little need for it. Signed-off-by: Daijiro Fukuda --- lib/fluent/event.rb | 8 ++++++-- test/plugin/test_buffer_chunk.rb | 11 +++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb index db644ccf4d..bcb06998ed 100644 --- a/lib/fluent/event.rb +++ b/lib/fluent/event.rb @@ -308,11 +308,15 @@ def ensure_decompressed! end module ChunkMessagePackEventStreamer - # chunk.extend(ChunkEventStreamer) + # chunk.extend(ChunkMessagePackEventStreamer) # => chunk.each{|time, record| ... } def each(unpacker: nil, &block) + # Note: If need to use `unpacker`, then implement it, + # e.g., `unpacker.feed_each(io.read, &block)` (Not tested) + raise NotImplementedError, "'unpacker' argument is not implemented." if unpacker + open do |io| - (unpacker || Fluent::MessagePackFactory.msgpack_unpacker(io)).each(&block) + Fluent::MessagePackFactory.msgpack_unpacker(io).each(&block) end nil end diff --git a/test/plugin/test_buffer_chunk.rb b/test/plugin/test_buffer_chunk.rb index 5a1bfeb9da..535b0acd1a 100644 --- a/test/plugin/test_buffer_chunk.rb +++ b/test/plugin/test_buffer_chunk.rb @@ -57,6 +57,17 @@ class BufferChunkTest < Test::Unit::TestCase assert chunk.respond_to?(:msgpack_each) end + test 'unpacker arg is not implemented for ChunkMessagePackEventStreamer' do + meta = Object.new + chunk = Fluent::Plugin::Buffer::Chunk.new(meta) + chunk.extend Fluent::ChunkMessagePackEventStreamer + + unpacker = Fluent::MessagePackFactory.thread_local_msgpack_unpacker + + assert_raise(NotImplementedError){ chunk.each(unpacker: unpacker) } + assert_raise(NotImplementedError){ chunk.msgpack_each(unpacker: unpacker) } + end + test 'some methods raise ArgumentError with an option of `compressed: :gzip` and without extending Compressble`' do meta = Object.new chunk = Fluent::Plugin::Buffer::Chunk.new(meta)