diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb
index 97711f4621..d1216fe148 100644
--- a/lib/fluent/event.rb
+++ b/lib/fluent/event.rb
@@ -294,7 +294,7 @@ def to_msgpack_stream(time_int: false, packer: nil)
       super
     end
 
-    def to_compressed_msgpack_stream(time_int: false)
+    def to_compressed_msgpack_stream(time_int: false, packer: nil)
       # time_int is always ignored because @data is always packed binary in this class
       @compressed_data
     end
diff --git a/test/plugin/test_output_as_buffered_compress.rb b/test/plugin/test_output_as_buffered_compress.rb
index b698aeafee..1a78332d47 100644
--- a/test/plugin/test_output_as_buffered_compress.rb
+++ b/test/plugin/test_output_as_buffered_compress.rb
@@ -35,6 +35,16 @@ def format(tag, time, record)
       @format ? @format.call(tag, time, record) : [tag, time, record].to_json
     end
   end
+
+  def self.dummy_event_stream
+    Fluent::ArrayEventStream.new(
+      [
+        [event_time('2016-04-13 18:33:00'), { 'name' => 'moris', 'age' => 36, 'message' => 'data1' }],
+        [event_time('2016-04-13 18:33:13'), { 'name' => 'moris', 'age' => 36, 'message' => 'data2' }],
+        [event_time('2016-04-13 18:33:32'), { 'name' => 'moris', 'age' => 36, 'message' => 'data3' }],
+      ]
+    )
+  end
 end
 
 class BufferedOutputCompressTest < Test::Unit::TestCase
@@ -60,16 +70,6 @@ def waiting(seconds)
     end
   end
 
-  def dummy_event_stream
-    Fluent::ArrayEventStream.new(
-      [
-        [event_time('2016-04-13 18:33:00'), { 'name' => 'moris', 'age' => 36, 'message' => 'data1' }],
-        [event_time('2016-04-13 18:33:13'), { 'name' => 'moris', 'age' => 36, 'message' => 'data2' }],
-        [event_time('2016-04-13 18:33:32'), { 'name' => 'moris', 'age' => 36, 'message' => 'data3' }],
-      ]
-    )
-  end
-
   TMP_DIR = File.expand_path('../../tmp/test_output_as_buffered_compress', __FILE__)
 
   setup do
@@ -89,20 +89,34 @@ def dummy_event_stream
   end
 
   data(
-    handle_simple_stream: config_element('buffer', '', { 'flush_interval' => 1, 'compress' => 'gzip' }),
-    handle_stream_with_standard_format: config_element('buffer', 'tag', { 'flush_interval' => 1, 'compress' => 'gzip' }),
-    handle_simple_stream_and_file_chunk: config_element('buffer', '', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
-    handle_stream_with_standard_format_and_file_chunk: config_element('buffer', 'tag', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
+    :buffer_config,
+    [
+      config_element('buffer', '', { 'flush_interval' => 1, 'compress' => 'gzip' }),
+      config_element('buffer', 'tag', { 'flush_interval' => 1, 'compress' => 'gzip' }),
+      config_element('buffer', '', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
+      config_element('buffer', 'tag', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
+    ],
   )
-  test 'call a standard format when output plugin adds data to chunk' do |buffer_config|
+  data(
+    :input_es,
+    [
+      FluentPluginOutputAsBufferedCompressTest.dummy_event_stream,
+      # If already-compressed data comes in, it must be written as is (i.e. without being decompressed).
+      # https://github.com/fluent/fluentd/issues/4146
+      Fluent::CompressedMessagePackEventStream.new(FluentPluginOutputAsBufferedCompressTest.dummy_event_stream.to_compressed_msgpack_stream),
+    ],
+  )
+  test 'call a standard format when output plugin adds data to chunk' do |data|
+    buffer_config = data[:buffer_config]
+    es = data[:input_es].dup # Note: the data matrix is shared in all patterns, so we need `dup` here.
+
     @i = create_output(:async)
     @i.configure(config_element('ROOT','', {}, [buffer_config]))
     @i.start
     @i.after_start
     io = StringIO.new
 
-    es = dummy_event_stream
-    expected = es.map { |e| e }
+    expected = es.dup.map { |t, r| [t, r] }
     compressed_data = ''
 
     assert_equal :gzip, @i.buffer.compress
@@ -138,7 +152,7 @@ def dummy_event_stream
     @i.after_start
     io = StringIO.new
 
-    es = dummy_event_stream
+    es = FluentPluginOutputAsBufferedCompressTest.dummy_event_stream
     expected = es.map { |e| "#{e[1]}\n" }.join # e[1] is record
     compressed_data = ''
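
Note (not part of the patch): a minimal sketch of the regression this diff fixes, per https://github.com/fluent/fluentd/issues/4146. It assumes the fluentd gem is installed; the class and method names come from lib/fluent/event.rb above, and the event payload is illustrative only.

  require 'fluent/event'
  require 'fluent/time'

  # Build a plain event stream and compress it once, the way pre-compressed
  # payloads reach an output plugin's buffer.
  es = Fluent::ArrayEventStream.new(
    [[Fluent::EventTime.now, { 'message' => 'data1' }]]
  )
  compressed = Fluent::CompressedMessagePackEventStream.new(es.to_compressed_msgpack_stream)

  # Before this patch the override's signature was (time_int: false), so a
  # caller passing `packer:` (which the sibling #to_msgpack_stream accepts,
  # as shown in the hunk header above) raised an ArgumentError
  # (unknown keyword: :packer). With the patch the keyword is accepted and,
  # like time_int, ignored: the already-compressed bytes are returned
  # untouched instead of being decompressed and re-compressed.
  compressed.to_compressed_msgpack_stream(packer: nil)

Matching the superclass keyword list keeps the override substitutable for any EventStream, which is why the new test matrix feeds both a plain ArrayEventStream and a CompressedMessagePackEventStream through the same code path.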