From c2909621e06e21773aad785cf51b3a3e5060075f Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 6 Oct 2016 18:44:08 +0900 Subject: [PATCH] chunk.each (chunk.msgpack_each) should be available only for chunks contains msgpack binary data. It's original design, but not implemented when v0.14 output plugins were introduced. Currently there are not so many output plugins using v0.14 buffered output API, so it's time to change. chunk.each (chunk.msgpack_each) will be available only for: * output plugins using standard format (doesn't implement #format method) * output plugins marked to have #format method returning msgpack binary data --- lib/fluent/plugin/buffer/chunk.rb | 1 - test/plugin/test_buffer_chunk.rb | 18 +++++++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb index 2f6b3bcb09..dc6b9f40bd 100644 --- a/lib/fluent/plugin/buffer/chunk.rb +++ b/lib/fluent/plugin/buffer/chunk.rb @@ -29,7 +29,6 @@ class Buffer # fluent/plugin/buffer is already loaded class Chunk include MonitorMixin include UniqueId::Mixin - include ChunkMessagePackEventStreamer # Chunks has 2 part: # * metadata: contains metadata which should be restored after resume (if possible) diff --git a/test/plugin/test_buffer_chunk.rb b/test/plugin/test_buffer_chunk.rb index ceb39d891d..5a1bfeb9da 100644 --- a/test/plugin/test_buffer_chunk.rb +++ b/test/plugin/test_buffer_chunk.rb @@ -31,7 +31,6 @@ class BufferChunkTest < Test::Unit::TestCase assert chunk.respond_to?(:read) assert chunk.respond_to?(:open) assert chunk.respond_to?(:write_to) - assert chunk.respond_to?(:msgpack_each) assert_raise(NotImplementedError){ chunk.append([]) } assert_raise(NotImplementedError){ chunk.concat(nil, 0) } assert_raise(NotImplementedError){ chunk.commit } @@ -43,7 +42,19 @@ class BufferChunkTest < Test::Unit::TestCase assert_raise(NotImplementedError){ chunk.read } assert_raise(NotImplementedError){ chunk.open(){} } assert_raise(NotImplementedError){ chunk.write_to(nil) } - assert_raise(NotImplementedError){ chunk.msgpack_each(){|v| v} } + assert !chunk.respond_to?(:msgpack_each) + end + + test 'has method #each and #msgpack_each only when extended by ChunkMessagePackEventStreamer' do + meta = Object.new + chunk = Fluent::Plugin::Buffer::Chunk.new(meta) + + assert !chunk.respond_to?(:each) + assert !chunk.respond_to?(:msgpack_each) + + chunk.extend Fluent::ChunkMessagePackEventStreamer + assert chunk.respond_to?(:each) + assert chunk.respond_to?(:msgpack_each) end test 'some methods raise ArgumentError with an option of `compressed: :gzip` and without extending Compressble`' do @@ -162,9 +173,10 @@ def open(**kwargs) assert "my data\nyour data\n", io.to_s end - test 'can feed objects into blocks with unpacking msgpack' do + test 'can feed objects into blocks with unpacking msgpack if ChunkMessagePackEventStreamer is included' do require 'msgpack' c = TestChunk.new(Object.new) + c.extend Fluent::ChunkMessagePackEventStreamer c.data << MessagePack.pack(['my data', 1]) c.data << MessagePack.pack(['your data', 2]) ary = []