diff --git a/example/in_dummy_with_compression.conf b/example/in_dummy_with_compression.conf
new file mode 100644
index 0000000000..55dda796e2
--- /dev/null
+++ b/example/in_dummy_with_compression.conf
@@ -0,0 +1,23 @@
+
+
+
diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb
index 91aecfa505..8ea514c9d8 100644
--- a/lib/fluent/event.rb
+++ b/lib/fluent/event.rb
@@ -15,11 +15,13 @@
#
require 'fluent/msgpack_factory'
+require 'fluent/plugin/compressable'
module Fluent
class EventStream
include Enumerable
include MessagePackFactory::Mixin
+ include Fluent::Plugin::Compressable
# dup does deep copy for event stream
def dup
@@ -61,6 +63,11 @@ def to_msgpack_stream(time_int: false)
out.to_s
end
+ def to_compressed_msgpack_stream(time_int: false)
+ packed = to_msgpack_stream(time_int: time_int)
+ compress(packed)
+ end
+
def to_msgpack_stream_forced_integer
out = msgpack_packer
each {|time,record|
diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb
index 945b9682f0..eb547d8e29 100644
--- a/lib/fluent/plugin/buf_file.rb
+++ b/lib/fluent/plugin/buf_file.rb
@@ -144,9 +144,9 @@ def resume
def generate_chunk(metadata)
# FileChunk generates real path with unique_id
if @file_permission
- Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, perm: @file_permission)
+ Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, perm: @file_permission, compress: @compress)
else
- Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create)
+ Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, compress: @compress)
end
end
end
diff --git a/lib/fluent/plugin/buf_memory.rb b/lib/fluent/plugin/buf_memory.rb
index 59f11c535c..c6c9554306 100644
--- a/lib/fluent/plugin/buf_memory.rb
+++ b/lib/fluent/plugin/buf_memory.rb
@@ -27,7 +27,7 @@ def resume
end
def generate_chunk(metadata)
- Fluent::Plugin::Buffer::MemoryChunk.new(metadata)
+ Fluent::Plugin::Buffer::MemoryChunk.new(metadata, compress: @compress)
end
end
end
diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb
index b41ec29ae0..c427bd4c5a 100644
--- a/lib/fluent/plugin/buffer.rb
+++ b/lib/fluent/plugin/buffer.rb
@@ -55,6 +55,9 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
# if chunk size (or records) is 95% or more after #write, then that chunk will be enqueued
config_param :chunk_full_threshold, :float, default: DEFAULT_CHUNK_FULL_THRESHOLD
+ desc 'Compress buffered data.'
+ config_param :compress, :enum, list: [:text, :gzip], default: :text
+
Metadata = Struct.new(:timekey, :tag, :variables) do
def empty?
timekey.nil? && tag.nil? && variables.nil?
@@ -458,7 +461,7 @@ def write_once(metadata, data, format: nil, size: nil, &block)
serialized = format.call(data)
chunk.concat(serialized, size ? size.call : data.size)
else
- chunk.append(data)
+ chunk.append(data, compress: @compress)
end
adding_bytesize = chunk.bytesize - original_bytesize
@@ -558,7 +561,7 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
if format
chunk.concat(format.call(split), split.size)
else
- chunk.append(split)
+ chunk.append(split, compress: @compress)
end
if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb
index 157c3e809c..672bf3fe98 100644
--- a/lib/fluent/plugin/buffer/chunk.rb
+++ b/lib/fluent/plugin/buffer/chunk.rb
@@ -15,10 +15,13 @@
#
require 'fluent/plugin/buffer'
+require 'fluent/plugin/compressable'
require 'fluent/unique_id'
require 'fluent/event'
require 'monitor'
+require 'tempfile'
+require 'zlib'
module Fluent
module Plugin
@@ -46,7 +49,7 @@ class Chunk
# TODO: CompressedPackedMessage of forward protocol?
- def initialize(metadata)
+ def initialize(metadata, compress: :text)
super()
@unique_id = generate_unique_id
@metadata = metadata
@@ -57,12 +60,15 @@ def initialize(metadata)
@size = 0
@created_at = Time.now
@modified_at = Time.now
+
+ extend Decompressable if compress == :gzip
end
attr_reader :unique_id, :metadata, :created_at, :modified_at, :state
# data is array of formatted record string
- def append(data)
+ def append(data, **kwargs)
+ raise ArgumentError, '`compress: gzip` can be used for Compressable module' if kwargs[:compress] == :gzip
adding = ''.b
data.each do |d|
adding << d.b
@@ -141,19 +147,75 @@ def purge
self
end
- def read
+ def read(**kwargs)
+ raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
raise NotImplementedError, "Implement this method in child class"
end
- def open(&block)
+ def open(**kwargs, &block)
+ raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
raise NotImplementedError, "Implement this method in child class"
end
- def write_to(io)
+ def write_to(io, **kwargs)
+ raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
open do |i|
IO.copy_stream(i, io)
end
end
+
+ module Decompressable
+ include Fluent::Plugin::Compressable
+
+ def append(data, **kwargs)
+ if kwargs[:compress] == :gzip
+ io = StringIO.new
+ Zlib::GzipWriter.wrap(io) do |gz|
+ data.each do |d|
+ gz.write d
+ end
+ end
+ concat(io.string, data.size)
+ else
+ super
+ end
+ end
+
+ def open(**kwargs, &block)
+ if kwargs[:compressed] == :gzip
+ super
+ else
+ super(kwargs) do |chunk_io|
+ output_io = if chunk_io.is_a?(StringIO)
+ StringIO.new
+ else
+ Tempfile.new('decompressed-data')
+ end
+ decompress(input_io: chunk_io, output_io: output_io)
+ output_io.seek(0, IO::SEEK_SET)
+ yield output_io
+ end
+ end
+ end
+
+ def read(**kwargs)
+ if kwargs[:compressed] == :gzip
+ super
+ else
+ decompress(super)
+ end
+ end
+
+ def write_to(io, **kwargs)
+ open(compressed: :gzip) do |chunk_io|
+ if kwargs[:compressed] == :gzip
+ IO.copy_stream(chunk_io, io)
+ else
+ decompress(input_io: chunk_io, output_io: io)
+ end
+ end
+ end
+ end
end
end
end
diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb
index 6fda86da16..3232bf2d46 100644
--- a/lib/fluent/plugin/buffer/file_chunk.rb
+++ b/lib/fluent/plugin/buffer/file_chunk.rb
@@ -40,8 +40,8 @@ class FileChunk < Chunk
attr_reader :path, :permission
- def initialize(metadata, path, mode, perm: system_config.file_permission || FILE_PERMISSION)
- super(metadata)
+ def initialize(metadata, path, mode, perm: system_config.file_permission || FILE_PERMISSION, compress: :text)
+ super(metadata, compress: compress)
@permission = perm
@bytesize = @size = @adding_bytes = @adding_size = 0
@meta = nil
@@ -133,12 +133,12 @@ def purge
File.unlink(@path, @meta_path)
end
- def read
+ def read(**kwargs)
@chunk.seek(0, IO::SEEK_SET)
@chunk.read
end
- def open(&block)
+ def open(**kwargs, &block)
@chunk.seek(0, IO::SEEK_SET)
val = yield @chunk
@chunk.seek(0, IO::SEEK_END) if self.staged?
diff --git a/lib/fluent/plugin/buffer/memory_chunk.rb b/lib/fluent/plugin/buffer/memory_chunk.rb
index 6ad5cc8073..ddb16e4a65 100644
--- a/lib/fluent/plugin/buffer/memory_chunk.rb
+++ b/lib/fluent/plugin/buffer/memory_chunk.rb
@@ -20,7 +20,7 @@ module Fluent
module Plugin
class Buffer
class MemoryChunk < Chunk
- def initialize(metadata)
+ def initialize(metadata, compress: :text)
super
@chunk = ''.force_encoding(Encoding::ASCII_8BIT)
@chunk_bytes = 0
@@ -72,15 +72,15 @@ def purge
true
end
- def read
+ def read(**kwargs)
@chunk
end
- def open(&block)
+ def open(**kwargs, &block)
StringIO.open(@chunk, &block)
end
- def write_to(io)
+ def write_to(io, **kwargs)
# re-implementation to optimize not to create StringIO
io.write @chunk
end
diff --git a/lib/fluent/plugin/compressable.rb b/lib/fluent/plugin/compressable.rb
new file mode 100644
index 0000000000..e508be1e3c
--- /dev/null
+++ b/lib/fluent/plugin/compressable.rb
@@ -0,0 +1,91 @@
+#
+# Fluentd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'zlib'
+
+module Fluent
+ module Plugin
+ module Compressable
+ def compress(data, **kwargs)
+ output_io = kwargs[:output_io]
+ io = output_io || StringIO.new
+ Zlib::GzipWriter.wrap(io) do |gz|
+ gz.write data
+ end
+
+ output_io || io.string
+ end
+
+ # compressed_data is String like `compress(data1) + compress(data2) + ... + compress(dataN)`
+ # https://www.ruby-forum.com/topic/971591#979503
+ def decompress(compressed_data = nil, output_io: nil, input_io: nil)
+ case
+ when input_io && output_io
+ io_decompress(input_io, output_io)
+ when input_io
+ output_io = StringIO.new
+ io = io_decompress(input_io, output_io)
+ io.string
+ when compressed_data.nil? || compressed_data.empty?
+        # return compressed_data as-is when it is nil or an empty String
+ compressed_data
+ when output_io
+        # execute after checking compressed_data is empty or not
+ io = StringIO.new(compressed_data)
+ io_decompress(io, output_io)
+ else
+ string_decompress(compressed_data)
+ end
+ end
+
+ private
+
+ def string_decompress(compressed_data)
+ io = StringIO.new(compressed_data)
+
+ out = ''
+ loop do
+ gz = Zlib::GzipReader.new(io)
+ out += gz.read
+ unused = gz.unused
+ gz.finish
+
+ break if unused.nil?
+ adjust = unused.length
+ io.pos -= adjust
+ end
+
+ out
+ end
+
+ def io_decompress(input, output)
+ loop do
+ gz = Zlib::GzipReader.new(input)
+ v = gz.read
+ output.write(v)
+ unused = gz.unused
+ gz.finish
+
+ break if unused.nil?
+ adjust = unused.length
+ input.pos -= adjust
+ end
+
+ output
+ end
+ end
+ end
+end
diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb
index 9417bc0848..4466abbb4c 100644
--- a/lib/fluent/plugin/out_forward.rb
+++ b/lib/fluent/plugin/out_forward.rb
@@ -440,8 +440,10 @@ def send_data(tag, chunk)
sock.write @sender.forward_header # beginArray(3)
sock.write tag.to_msgpack # 1. writeRaw(tag)
- sock.write [0xdb, chunk.size].pack('CN') # 2. beginRaw(size) raw32
- chunk.write_to(sock) # writeRawBody(packed_es)
+ chunk.open(compressed: :text) do |chunk_io|
+ sock.write [0xdb, chunk_io.size].pack('CN') # 2. beginRaw(size) raw32
+ IO.copy_stream(chunk_io, sock) # writeRawBody(packed_es)
+ end
sock.write option.to_msgpack # 3. writeOption(option)
if @sender.require_ack_response
diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb
index 7829460ffa..8070183b71 100644
--- a/lib/fluent/plugin/output.rb
+++ b/lib/fluent/plugin/output.rb
@@ -641,7 +641,17 @@ def write_guard(&block)
end
FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream }
+ FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream }
FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true) }
+ FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true) }
+
+ def generate_format_proc
+ if @buffer && @buffer.compress == :gzip
+ @time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT : FORMAT_COMPRESSED_MSGPACK_STREAM
+ else
+ @time_as_integer ? FORMAT_MSGPACK_STREAM_TIME_INT : FORMAT_MSGPACK_STREAM
+ end
+ end
# metadata_and_data is a Hash of:
# (standard format) metadata => event stream
@@ -669,7 +679,7 @@ def handle_stream_with_custom_format(tag, es, enqueue: false)
end
def handle_stream_with_standard_format(tag, es, enqueue: false)
- format_proc = @time_as_integer ? FORMAT_MSGPACK_STREAM_TIME_INT : FORMAT_MSGPACK_STREAM
+ format_proc = generate_format_proc
meta_and_data = {}
records = 0
es.each do |time, record|
@@ -697,7 +707,7 @@ def handle_stream_simple(tag, es, enqueue: false)
records += 1
end
else
- format_proc = @time_as_integer ? FORMAT_MSGPACK_STREAM_TIME_INT : FORMAT_MSGPACK_STREAM
+ format_proc = generate_format_proc
data = es
end
write_guard do
diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb
index 6cf5b4264d..91b0642df9 100644
--- a/test/plugin/test_buffer.rb
+++ b/test/plugin/test_buffer.rb
@@ -1,6 +1,8 @@
require_relative '../helper'
require 'fluent/plugin/buffer'
require 'fluent/plugin/buffer/memory_chunk'
+require 'fluent/plugin/compressable'
+require 'fluent/plugin/buffer/chunk'
require 'fluent/event'
require 'flexmock/test_unit'
@@ -18,7 +20,7 @@ class DummyMemoryChunkError < StandardError; end
class DummyMemoryChunk < Fluent::Plugin::Buffer::MemoryChunk
attr_reader :append_count, :rollbacked, :closed, :purged
attr_accessor :failing
- def initialize(metadata)
+ def initialize(metadata, compress: :text)
super
@append_count = 0
@rollbacked = false
@@ -77,7 +79,7 @@ def resume
return staged, queued
end
def generate_chunk(metadata)
- DummyMemoryChunk.new(metadata)
+ DummyMemoryChunk.new(metadata, compress: @compress)
end
end
end
@@ -865,6 +867,10 @@ def create_chunk_es(metadata, es)
assert{ @p.stage[@dm1].size == 2 }
assert @p.stage[@dm1].rollbacked
end
+
+ test '#compress returns :text' do
+ assert_equal :text, @p.compress
+ end
end
sub_test_case 'standard format with configuration for test with lower chunk limit size' do
@@ -1197,4 +1203,18 @@ def create_chunk_es(metadata, es)
end
end
+ sub_test_case 'when compress is gzip' do
+ setup do
+ @p = create_buffer({'compress' => 'gzip'})
+ end
+
+ test '#compress returns :gzip' do
+ assert_equal :gzip, @p.compress
+ end
+
+ test 'create decompressable chunk' do
+ chunk = @p.generate_chunk(create_metadata)
+ assert chunk.singleton_class.ancestors.include?(Fluent::Plugin::Buffer::Chunk::Decompressable)
+ end
+ end
end
diff --git a/test/plugin/test_buffer_chunk.rb b/test/plugin/test_buffer_chunk.rb
index 78e386cbc1..ceb39d891d 100644
--- a/test/plugin/test_buffer_chunk.rb
+++ b/test/plugin/test_buffer_chunk.rb
@@ -45,6 +45,16 @@ class BufferChunkTest < Test::Unit::TestCase
assert_raise(NotImplementedError){ chunk.write_to(nil) }
assert_raise(NotImplementedError){ chunk.msgpack_each(){|v| v} }
end
+
+  test 'some methods raise ArgumentError with an option of `compressed: :gzip` and without extending Compressable' do
+ meta = Object.new
+ chunk = Fluent::Plugin::Buffer::Chunk.new(meta)
+
+ assert_raise(ArgumentError){ chunk.read(compressed: :gzip) }
+ assert_raise(ArgumentError){ chunk.open(compressed: :gzip){} }
+ assert_raise(ArgumentError){ chunk.write_to(nil, compressed: :gzip) }
+ assert_raise(ArgumentError){ chunk.append(nil, compress: :gzip) }
+ end
end
class TestChunk < Fluent::Plugin::Buffer::Chunk
@@ -56,7 +66,7 @@ def initialize(meta)
def size
@data.size
end
- def open
+ def open(**kwargs)
require 'stringio'
io = StringIO.new(@data)
yield io
@@ -165,4 +175,12 @@ def open
assert_equal ['your data', 2], ary[1]
end
end
+
+ sub_test_case 'when compress is gzip' do
+ test 'create decompressable chunk' do
+ meta = Object.new
+ chunk = Fluent::Plugin::Buffer::Chunk.new(meta, compress: :gzip)
+ assert chunk.singleton_class.ancestors.include?(Fluent::Plugin::Buffer::Chunk::Decompressable)
+ end
+ end
end
diff --git a/test/plugin/test_buffer_file_chunk.rb b/test/plugin/test_buffer_file_chunk.rb
index 13e72f8b68..c1d059c72a 100644
--- a/test/plugin/test_buffer_file_chunk.rb
+++ b/test/plugin/test_buffer_file_chunk.rb
@@ -1,5 +1,6 @@
require_relative '../helper'
require 'fluent/plugin/buffer/file_chunk'
+require 'fluent/plugin/compressable'
require 'fluent/unique_id'
require 'fileutils'
@@ -8,6 +9,8 @@
require 'timecop'
class BufferFileChunkTest < Test::Unit::TestCase
+ include Fluent::Plugin::Compressable
+
setup do
@klass = Fluent::Plugin::Buffer::FileChunk
@chunkdir = File.expand_path('../../tmp/buffer_file_chunk', __FILE__)
@@ -768,4 +771,74 @@ def gen_chunk_path(prefix, unique_id)
end
end
end
+
+ sub_test_case 'compressed buffer' do
+ setup do
+ @src = 'text data for compressing' * 5
+ @gzipped_src = compress(@src)
+ end
+
+ test '#append with compress option writes compressed data to chunk when compress is gzip' do
+ c = @klass.new(gen_metadata, File.join(@chunkdir,'test.*.log'), :create, compress: :gzip)
+ c.append([@src, @src], compress: :gzip)
+ c.commit
+
+ # check chunk is compressed
+ assert c.read(compressed: :gzip).size < [@src, @src].join("").size
+
+ assert_equal @src + @src, c.read
+ end
+
+ test '#open passes io object having decompressed data to a block when compress is gzip' do
+ c = @klass.new(gen_metadata, File.join(@chunkdir,'test.*.log'), :create, compress: :gzip)
+ c.concat(@gzipped_src, @src.size)
+ c.commit
+
+    decompressed_data = c.open do |io|
+      v = io.read
+      assert_equal @src, v
+      v
+    end
+    assert_equal @src, decompressed_data
+ end
+
+ test '#open with compressed option passes io object having decompressed data to a block when compress is gzip' do
+ c = @klass.new(gen_metadata, File.join(@chunkdir,'test.*.log'), :create, compress: :gzip)
+ c.concat(@gzipped_src, @src.size)
+ c.commit
+
+    compressed_data = c.open(compressed: :gzip) do |io|
+      v = io.read
+      assert_equal @gzipped_src, v
+      v
+    end
+    assert_equal @gzipped_src, compressed_data
+ end
+
+ test '#write_to writes decompressed data when compress is gzip' do
+ c = @klass.new(gen_metadata, File.join(@chunkdir,'test.*.log'), :create, compress: :gzip)
+ c.concat(@gzipped_src, @src.size)
+ c.commit
+
+ assert_equal @src, c.read
+ assert_equal @gzipped_src, c.read(compressed: :gzip)
+
+ io = StringIO.new
+ c.write_to(io)
+ assert_equal @src, io.string
+ end
+
+ test '#write_to with compressed option writes compressed data when compress is gzip' do
+ c = @klass.new(gen_metadata, File.join(@chunkdir,'test.*.log'), :create, compress: :gzip)
+ c.concat(@gzipped_src, @src.size)
+ c.commit
+
+ assert_equal @src, c.read
+ assert_equal @gzipped_src, c.read(compressed: :gzip)
+
+ io = StringIO.new
+ c.write_to(io, compressed: :gzip)
+ assert_equal @gzipped_src, io.string
+ end
+ end
end
diff --git a/test/plugin/test_buffer_memory_chunk.rb b/test/plugin/test_buffer_memory_chunk.rb
index 1c5d85d65c..b3646b5936 100644
--- a/test/plugin/test_buffer_memory_chunk.rb
+++ b/test/plugin/test_buffer_memory_chunk.rb
@@ -1,9 +1,12 @@
require_relative '../helper'
require 'fluent/plugin/buffer/memory_chunk'
+require 'fluent/plugin/compressable'
require 'json'
class BufferMemoryChunkTest < Test::Unit::TestCase
+ include Fluent::Plugin::Compressable
+
setup do
@c = Fluent::Plugin::Buffer::MemoryChunk.new(Object.new)
end
@@ -262,4 +265,74 @@ class BufferMemoryChunkTest < Test::Unit::TestCase
assert_equal d3.to_json + "\n", lines[2]
assert_equal d4.to_json + "\n", lines[3]
end
+
+ sub_test_case 'compressed buffer' do
+ setup do
+ @src = 'text data for compressing' * 5
+ @gzipped_src = compress(@src)
+ end
+
+ test '#append with compress option writes compressed data to chunk when compress is gzip' do
+ c = Fluent::Plugin::Buffer::MemoryChunk.new(Object.new, compress: :gzip)
+ c.append([@src, @src], compress: :gzip)
+ c.commit
+
+ # check chunk is compressed
+ assert c.read(compressed: :gzip).size < [@src, @src].join("").size
+
+ assert_equal @src + @src, c.read
+ end
+
+ test '#open passes io object having decompressed data to a block when compress is gzip' do
+ c = Fluent::Plugin::Buffer::MemoryChunk.new(Object.new, compress: :gzip)
+ c.concat(@gzipped_src, @src.size)
+ c.commit
+
+      decompressed_data = c.open do |io|
+        v = io.read
+        assert_equal @src, v
+        v
+      end
+      assert_equal @src, decompressed_data
+ end
+
+ test '#open with compressed option passes io object having decompressed data to a block when compress is gzip' do
+ c = Fluent::Plugin::Buffer::MemoryChunk.new(Object.new, compress: :gzip)
+ c.concat(@gzipped_src, @src.size)
+ c.commit
+
+      compressed_data = c.open(compressed: :gzip) do |io|
+        v = io.read
+        assert_equal @gzipped_src, v
+        v
+      end
+      assert_equal @gzipped_src, compressed_data
+ end
+
+ test '#write_to writes decompressed data when compress is gzip' do
+ c = Fluent::Plugin::Buffer::MemoryChunk.new(Object.new, compress: :gzip)
+ c.concat(@gzipped_src, @src.size)
+ c.commit
+
+ assert_equal @src, c.read
+ assert_equal @gzipped_src, c.read(compressed: :gzip)
+
+ io = StringIO.new
+ c.write_to(io)
+ assert_equal @src, io.string
+ end
+
+ test '#write_to with compressed option writes compressed data when compress is gzip' do
+ c = Fluent::Plugin::Buffer::MemoryChunk.new(Object.new, compress: :gzip)
+ c.concat(@gzipped_src, @src.size)
+ c.commit
+
+ assert_equal @src, c.read
+ assert_equal @gzipped_src, c.read(compressed: :gzip)
+
+ io = StringIO.new
+ c.write_to(io, compressed: :gzip)
+ assert_equal @gzipped_src, io.string
+ end
+ end
end
diff --git a/test/plugin/test_compressable.rb b/test/plugin/test_compressable.rb
new file mode 100644
index 0000000000..763d200ebd
--- /dev/null
+++ b/test/plugin/test_compressable.rb
@@ -0,0 +1,81 @@
+require_relative '../helper'
+require 'fluent/plugin/compressable'
+
+class CompressableTest < Test::Unit::TestCase
+ include Fluent::Plugin::Compressable
+
+ sub_test_case '#compress' do
+ setup do
+ @src = 'text data for compressing' * 5
+ @gzipped_src = compress(@src)
+ end
+
+ test 'compress data' do
+ assert compress(@src).size < @src.size
+ assert_not_equal @gzipped_src, @src
+ end
+
+ test 'write compressed data to IO with output_io option' do
+ io = StringIO.new
+ compress(@src, output_io: io)
+ assert_equal @gzipped_src, io.string
+ end
+ end
+
+ sub_test_case '#decompress' do
+ setup do
+ @src = 'text data for compressing' * 5
+ @gzipped_src = compress(@src)
+ end
+
+ test 'decompress compressed data' do
+ assert_equal @src, decompress(@gzipped_src)
+ end
+
+ test 'write decompressed data to IO with output_io option' do
+ io = StringIO.new
+ decompress(@gzipped_src, output_io: io)
+ assert_equal @src, io.string
+ end
+
+    test 'return decompressed string with input_io option' do
+ io = StringIO.new(@gzipped_src)
+ assert_equal @src, decompress(input_io: io)
+ end
+
+ test 'decompress multiple compressed data' do
+ src1 = 'text data'
+ src2 = 'text data2'
+ gzipped_src = compress(src1) + compress(src2)
+
+ assert_equal src1 + src2, decompress(gzipped_src)
+ end
+
+ test 'decompress with input_io and output_io' do
+ input_io = StringIO.new(@gzipped_src)
+ output_io = StringIO.new
+
+ decompress(input_io: input_io, output_io: output_io)
+ assert_equal @src, output_io.string
+ end
+
+ test 'decompress multiple compressed data with input_io and output_io' do
+ src1 = 'text data'
+ src2 = 'text data2'
+ gzipped_src = compress(src1) + compress(src2)
+
+ input_io = StringIO.new(gzipped_src)
+ output_io = StringIO.new
+
+ decompress(input_io: input_io, output_io: output_io)
+ assert_equal src1 + src2, output_io.string
+ end
+
+ test 'return the received value as it is with empty string or nil' do
+ assert_equal nil, decompress
+ assert_equal nil, decompress(nil)
+ assert_equal '', decompress('')
+ assert_equal '', decompress('', output_io: StringIO.new)
+ end
+ end
+end
diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb
index 5acc08e23a..039e92ae0c 100644
--- a/test/plugin/test_output.rb
+++ b/test/plugin/test_output.rb
@@ -565,4 +565,36 @@ def waiting(seconds)
@i.stop; @i.before_shutdown; @i.shutdown; @i.after_shutdown; @i.close; @i.terminate
end
end
+
+ sub_test_case '#generate_format_proc' do
+    test "when output doesn't have <buffer>" do
+ i = create_output(:sync)
+ i.configure(config_element('ROOT', '', {}, []))
+ assert_equal Fluent::Plugin::Output::FORMAT_MSGPACK_STREAM, i.generate_format_proc
+ end
+
+    test "when output doesn't have <buffer> and time_as_integer is true" do
+ i = create_output(:sync)
+ i.configure(config_element('ROOT', '', {'time_as_integer' => true}))
+ assert_equal Fluent::Plugin::Output::FORMAT_MSGPACK_STREAM_TIME_INT, i.generate_format_proc
+ end
+
+    test 'when output has <buffer> and compress is gzip' do
+ i = create_output(:buffered)
+ i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {'compress' => 'gzip'})]))
+ assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM, i.generate_format_proc
+ end
+
+    test 'when output has <buffer> and compress is gzip and time_as_integer is true' do
+ i = create_output(:buffered)
+ i.configure(config_element('ROOT', '', {'time_as_integer' => true}, [config_element('buffer', '', {'compress' => 'gzip'})]))
+ assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, i.generate_format_proc
+ end
+
+    test 'when output has <buffer> and compress is text' do
+ i = create_output(:buffered)
+ i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {'compress' => 'text'})]))
+ assert_equal Fluent::Plugin::Output::FORMAT_MSGPACK_STREAM, i.generate_format_proc
+ end
+ end
end
diff --git a/test/plugin/test_output_as_buffered_compress.rb b/test/plugin/test_output_as_buffered_compress.rb
new file mode 100644
index 0000000000..b7b2aee3e6
--- /dev/null
+++ b/test/plugin/test_output_as_buffered_compress.rb
@@ -0,0 +1,165 @@
+require_relative '../helper'
+require 'fluent/plugin/output'
+require 'fluent/plugin/buffer'
+require 'fluent/plugin/compressable'
+require 'fluent/event'
+
+require 'timeout'
+
+module FluentPluginOutputAsBufferedCompressTest
+ class DummyBareOutput < Fluent::Plugin::Output
+ def register(name, &block)
+ instance_variable_set("@#{name}", block)
+ end
+ end
+
+ class DummyAsyncOutput < DummyBareOutput
+ def initialize
+ super
+ @format = @write = nil
+ end
+ def write(chunk)
+ @write ? @write.call(chunk) : nil
+ end
+ end
+
+ class DummyAsyncOutputWithFormat < DummyBareOutput
+ def initialize
+ super
+ @format = nil
+ end
+ def write(chunk)
+ @write ? @write.call(chunk) : nil
+ end
+ def format(tag, time, record)
+ @format ? @format.call(tag, time, record) : [tag, time, record].to_json
+ end
+ end
+end
+
+class BufferedOutputCompressTest < Test::Unit::TestCase
+ include Fluent::Plugin::Compressable
+
+ def create_output(type=:async)
+ case type
+ when :async then FluentPluginOutputAsBufferedCompressTest::DummyAsyncOutput.new
+ when :async_with_format then FluentPluginOutputAsBufferedCompressTest::DummyAsyncOutputWithFormat.new
+ else
+ raise ArgumentError, "unknown type: #{type}"
+ end
+ end
+
+ def waiting(seconds)
+ begin
+ Timeout.timeout(seconds) do
+ yield
+ end
+ rescue Timeout::Error
+ STDERR.print(*@i.log.out.logs)
+ raise
+ end
+ end
+
+ def dummy_event_stream
+ Fluent::ArrayEventStream.new(
+ [
+ [event_time('2016-04-13 18:33:00'), { 'name' => 'moris', 'age' => 36, 'message' => 'data1' }],
+ [event_time('2016-04-13 18:33:13'), { 'name' => 'moris', 'age' => 36, 'message' => 'data2' }],
+ [event_time('2016-04-13 18:33:32'), { 'name' => 'moris', 'age' => 36, 'message' => 'data3' }],
+ ]
+ )
+ end
+
+ TMP_DIR = File.expand_path('../../tmp/test_output_as_buffered_compress', __FILE__)
+
+ setup do
+ FileUtils.rm_r TMP_DIR rescue nil
+ FileUtils.mkdir_p TMP_DIR
+ end
+
+ teardown do
+ if @i
+ @i.stop unless @i.stopped?
+ @i.before_shutdown unless @i.before_shutdown?
+ @i.shutdown unless @i.shutdown?
+ @i.after_shutdown unless @i.after_shutdown?
+ @i.close unless @i.closed?
+ @i.terminate unless @i.terminated?
+ end
+ end
+
+ data(
+ handle_simple_stream: config_element('buffer', '', { 'flush_interval' => 1, 'compress' => 'gzip' }),
+ handle_stream_with_standard_format: config_element('buffer', 'tag', { 'flush_interval' => 1, 'compress' => 'gzip' }),
+ handle_simple_stream_and_file_chunk: config_element('buffer', '', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
+ handle_stream_with_standard_format_and_file_chunk: config_element('buffer', 'tag', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
+ )
+ test 'call a standard format when output plugin adds data to chunk' do |buffer_config|
+ @i = create_output(:async)
+ @i.configure(config_element('ROOT','', {}, [buffer_config]))
+ @i.start
+ @i.after_start
+
+ io = StringIO.new
+ es = dummy_event_stream
+ expected = es.map { |e| e }
+ compressed_data = ''
+
+ assert_equal :gzip, @i.buffer.compress
+
+ @i.register(:write) do |c|
+ compressed_data = c.instance_variable_get(:@chunk)
+ if compressed_data.is_a?(File)
+ compressed_data.seek(0, IO::SEEK_SET)
+ compressed_data = compressed_data.read
+ end
+ c.write_to(io)
+ end
+
+ @i.emit_events('tag', es)
+ @i.enqueue_thread_wait
+ @i.flush_thread_wakeup
+ waiting(4) { Thread.pass until io.size > 0 }
+
+ assert_equal expected, Fluent::MessagePackEventStream.new(decompress(compressed_data)).map { |t, r| [t, r] }
+ assert_equal expected, Fluent::MessagePackEventStream.new(io.string).map { |t, r| [t, r] }
+ end
+
+ data(
+ handle_simple_stream: config_element('buffer', '', { 'flush_interval' => 1, 'compress' => 'gzip' }),
+ handle_stream_with_custom_format: config_element('buffer', 'tag', { 'flush_interval' => 1, 'compress' => 'gzip' }),
+ handle_simple_stream_and_file_chunk: config_element('buffer', '', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
+ handle_stream_with_custom_format_and_file_chunk: config_element('buffer', 'tag', { '@type' => 'file', 'path' => File.join(TMP_DIR,'test.*.log'), 'flush_interval' => 1, 'compress' => 'gzip' }),
+ )
+ test 'call a custom format when output plugin adds data to chunk' do |buffer_config|
+ @i = create_output(:async_with_format)
+ @i.configure(config_element('ROOT','', {}, [buffer_config]))
+ @i.start
+ @i.after_start
+
+ io = StringIO.new
+ es = dummy_event_stream
+ expected = es.map { |e| "#{e[1]}\n" }.join # e[1] is record
+ compressed_data = ''
+
+ assert_equal :gzip, @i.buffer.compress
+
+ @i.register(:format) { |tag, time, record| "#{record}\n" }
+ @i.register(:write) { |c|
+ compressed_data = c.instance_variable_get(:@chunk)
+ if compressed_data.is_a?(File)
+ compressed_data.seek(0, IO::SEEK_SET)
+ compressed_data = compressed_data.read
+ end
+ c.write_to(io)
+ }
+
+ @i.emit_events('tag', es)
+ @i.enqueue_thread_wait
+ @i.flush_thread_wakeup
+ waiting(4) { Thread.pass until io.size > 0 }
+
+ assert_equal expected, decompress(compressed_data)
+ assert_equal expected, io.string
+ end
+end
diff --git a/test/test_event.rb b/test/test_event.rb
index e07aa65439..d6e232c64f 100644
--- a/test/test_event.rb
+++ b/test/test_event.rb
@@ -1,6 +1,7 @@
require_relative 'helper'
require 'fluent/test'
require 'fluent/event'
+require 'fluent/plugin/compressable'
module EventTest
module DeepCopyAssertion
@@ -23,6 +24,7 @@ def assert_duplicated_records(es1, es2)
class OneEventStreamTest < ::Test::Unit::TestCase
include Fluent
include DeepCopyAssertion
+ include Fluent::Plugin::Compressable
def setup
@time = event_time()
@@ -85,11 +87,28 @@ def setup
assert_equal @record, record
}
end
+
+ test 'to_compressed_msgpack_stream' do
+ stream = @es.to_compressed_msgpack_stream
+ Fluent::Engine.msgpack_factory.unpacker.feed_each(decompress(stream)) { |time, record|
+ assert_equal @time, time
+ assert_equal @record, record
+ }
+ end
+
+ test 'to_compressed_msgpack_stream with time_int argument' do
+ stream = @es.to_compressed_msgpack_stream(time_int: true)
+ Fluent::Engine.msgpack_factory.unpacker.feed_each(decompress(stream)) { |time, record|
+ assert_equal @time.to_i, time
+ assert_equal @record, record
+ }
+ end
end
class ArrayEventStreamTest < ::Test::Unit::TestCase
include Fluent
include DeepCopyAssertion
+ include Fluent::Plugin::Compressable
def setup
time = Engine.now
@@ -161,11 +180,34 @@ def setup
i += 1
}
end
+
+ test 'to_compressed_msgpack_stream' do
+ i = 0
+ compressed_stream = @es.to_compressed_msgpack_stream
+ stream = decompress(compressed_stream)
+ Fluent::Engine.msgpack_factory.unpacker.feed_each(stream) { |time, record|
+ assert_equal @times[i], time
+ assert_equal @records[i], record
+ i += 1
+ }
+ end
+
+ test 'to_compressed_msgpack_stream with time_int argument' do
+ i = 0
+ compressed_stream = @es.to_compressed_msgpack_stream(time_int: true)
+ stream = decompress(compressed_stream)
+ Fluent::Engine.msgpack_factory.unpacker.feed_each(stream) { |time, record|
+ assert_equal @times[i].to_i, time
+ assert_equal @records[i], record
+ i += 1
+ }
+ end
end
class MultiEventStreamTest < ::Test::Unit::TestCase
include Fluent
include DeepCopyAssertion
+ include Fluent::Plugin::Compressable
def setup
time = Engine.now
@@ -240,11 +282,34 @@ def setup
i += 1
}
end
+
+ test 'to_compressed_msgpack_stream' do
+ i = 0
+ compressed_stream = @es.to_compressed_msgpack_stream
+ stream = decompress(compressed_stream)
+ Fluent::Engine.msgpack_factory.unpacker.feed_each(stream) { |time, record|
+ assert_equal @times[i], time
+ assert_equal @records[i], record
+ i += 1
+ }
+ end
+
+ test 'to_compressed_msgpack_stream with time_int argument' do
+ i = 0
+ compressed_stream = @es.to_compressed_msgpack_stream(time_int: true)
+ stream = decompress(compressed_stream)
+ Fluent::Engine.msgpack_factory.unpacker.feed_each(stream) { |time, record|
+ assert_equal @times[i].to_i, time
+ assert_equal @records[i], record
+ i += 1
+ }
+ end
end
class MessagePackEventStreamTest < ::Test::Unit::TestCase
include Fluent
include DeepCopyAssertion
+ include Fluent::Plugin::Compressable
def setup
pk = Fluent::Engine.msgpack_factory.packer
@@ -325,5 +390,16 @@ def setup
i += 1
}
end
+
+ test 'to_compressed_msgpack_stream' do
+ i = 0
+ compressed_stream = @es.to_compressed_msgpack_stream
+ stream = decompress(compressed_stream)
+ Fluent::Engine.msgpack_factory.unpacker.feed_each(stream) { |time, record|
+ assert_equal @times[i], time
+ assert_equal @records[i], record
+ i += 1
+ }
+ end
end
end