Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support data compression in buffer plugins #1172

Merged
merged 21 commits into from
Sep 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions example/in_dummy_with_compression.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<source>
@type dummy
@label @main
tag "test.data"
size 2
rate 10
dummy {"message":"yaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaay"}
auto_increment_key number
</source>

<label @main>
<match test.data>
@type buffered_stdout
<buffer>
@type file
path "#{Dir.pwd}/compressed_buffers"
flush_at_shutdown false
chunk_limit_size 1m
flush_interval 10s
compress gzip
</buffer>
</match>
</label>
7 changes: 7 additions & 0 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
#

require 'fluent/msgpack_factory'
require 'fluent/plugin/compressable'

module Fluent
class EventStream
include Enumerable
include MessagePackFactory::Mixin
include Fluent::Plugin::Compressable

# dup does deep copy for event stream
def dup
Expand Down Expand Up @@ -61,6 +63,11 @@ def to_msgpack_stream(time_int: false)
out.to_s
end

# Serializes the event stream to a msgpack byte string and gzips it in
# one step. `time_int` is forwarded to #to_msgpack_stream unchanged.
def to_compressed_msgpack_stream(time_int: false)
  compress(to_msgpack_stream(time_int: time_int))
end

def to_msgpack_stream_forced_integer
out = msgpack_packer
each {|time,record|
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ def resume
# Builds a new file-backed chunk for +metadata+.
# Defect fixed: the scraped diff left both the pre-change and post-change
# constructor calls in each branch; only the post-change call (which
# propagates the buffer's configured compression mode) is kept.
def generate_chunk(metadata)
  # FileChunk generates real path with unique_id
  if @file_permission
    Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, perm: @file_permission, compress: @compress)
  else
    Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, compress: @compress)
  end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buf_memory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def resume
end

# Builds a new in-memory chunk for +metadata+, propagating the buffer's
# configured compression mode (@compress: :text or :gzip).
# Defect fixed: the scraped diff left both the pre-change and post-change
# constructor lines; only the post-change call is kept.
def generate_chunk(metadata)
  Fluent::Plugin::Buffer::MemoryChunk.new(metadata, compress: @compress)
end
end
end
Expand Down
7 changes: 5 additions & 2 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
# if chunk size (or records) is 95% or more after #write, then that chunk will be enqueued
config_param :chunk_full_threshold, :float, default: DEFAULT_CHUNK_FULL_THRESHOLD

desc 'Compress buffered data.'
config_param :compress, :enum, list: [:text, :gzip], default: :text

Metadata = Struct.new(:timekey, :tag, :variables) do
def empty?
timekey.nil? && tag.nil? && variables.nil?
Expand Down Expand Up @@ -458,7 +461,7 @@ def write_once(metadata, data, format: nil, size: nil, &block)
serialized = format.call(data)
chunk.concat(serialized, size ? size.call : data.size)
else
chunk.append(data)
chunk.append(data, compress: @compress)
end
adding_bytesize = chunk.bytesize - original_bytesize

Expand Down Expand Up @@ -558,7 +561,7 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
if format
chunk.concat(format.call(split), split.size)
else
chunk.append(split)
chunk.append(split, compress: @compress)
end

if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
Expand Down
72 changes: 67 additions & 5 deletions lib/fluent/plugin/buffer/chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
#

require 'fluent/plugin/buffer'
require 'fluent/plugin/compressable'
require 'fluent/unique_id'
require 'fluent/event'

require 'monitor'
require 'tempfile'
require 'zlib'

module Fluent
module Plugin
Expand Down Expand Up @@ -46,7 +49,7 @@ class Chunk

# TODO: CompressedPackedMessage of forward protocol?

def initialize(metadata)
def initialize(metadata, compress: :text)
super()
@unique_id = generate_unique_id
@metadata = metadata
Expand All @@ -57,12 +60,15 @@ def initialize(metadata)
@size = 0
@created_at = Time.now
@modified_at = Time.now

extend Decompressable if compress == :gzip
end

attr_reader :unique_id, :metadata, :created_at, :modified_at, :state

# data is array of formatted record string
def append(data)
def append(data, **kwargs)
raise ArgumentError, '`compress: gzip` can be used for Compressable module' if kwargs[:compress] == :gzip
adding = ''.b
data.each do |d|
adding << d.b
Expand Down Expand Up @@ -141,19 +147,75 @@ def purge
self
end

def read
# Abstract base implementation: subclasses must override.
# Requesting `compressed: :gzip` is only valid once the Decompressable
# module has been mixed in (see Chunk#initialize), so it is rejected here.
def read(**kwargs)
  if kwargs[:compressed] == :gzip
    raise ArgumentError, '`compressed: gzip` can be used for Compressable module'
  end
  raise NotImplementedError, "Implement this method in child class"
end

def open(&block)
# Abstract base implementation: subclasses must override.
# As with #read, `compressed: :gzip` requires the Decompressable mixin.
def open(**kwargs, &block)
  if kwargs[:compressed] == :gzip
    raise ArgumentError, '`compressed: gzip` can be used for Compressable module'
  end
  raise NotImplementedError, "Implement this method in child class"
end

def write_to(io)
# Streams the chunk's content into +io+ via #open (which subclasses
# implement). `compressed: :gzip` requires the Decompressable mixin.
def write_to(io, **kwargs)
  if kwargs[:compressed] == :gzip
    raise ArgumentError, '`compressed: gzip` can be used for Compressable module'
  end
  open do |source|
    IO.copy_stream(source, io)
  end
end

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

chunk.read is not handled here, so that call returns the raw (compressed) value.
Is that intended?
I think chunk.read (without any optional arguments) should return decompressed data.

module Decompressable
include Fluent::Plugin::Compressable

# Gzip-aware append: when called with `compress: :gzip`, the formatted
# records in +data+ are gzipped into one in-memory stream and handed to
# #concat with the record count; otherwise falls through to the plain
# (uncompressed) Chunk#append.
def append(data, **kwargs)
  return super unless kwargs[:compress] == :gzip

  buf = StringIO.new
  Zlib::GzipWriter.wrap(buf) do |writer|
    data.each { |record| writer.write(record) }
  end
  concat(buf.string, data.size)
end

# Gzip-aware open: with `compressed: :gzip` the caller gets the raw
# compressed IO (plain super); otherwise the chunk is decompressed first
# and the block receives an IO over the plain data.
def open(**kwargs, &block)
  return super if kwargs[:compressed] == :gzip

  super(kwargs) do |raw_io|
    # Decompress in memory for StringIO-backed chunks; spill to a
    # Tempfile for anything else (e.g. file chunks) to bound memory use.
    # NOTE(review): the Tempfile is never explicitly closed/unlinked here,
    # leaving cleanup to GC finalizers — confirm that is acceptable.
    plain_io = raw_io.is_a?(StringIO) ? StringIO.new : Tempfile.new('decompressed-data')
    decompress(input_io: raw_io, output_io: plain_io)
    plain_io.seek(0, IO::SEEK_SET)
    yield plain_io
  end
end

# Gzip-aware read: `compressed: :gzip` returns the raw compressed bytes
# as stored; otherwise the stored bytes are decompressed before return.
def read(**kwargs)
  raw = super
  return raw if kwargs[:compressed] == :gzip
  decompress(raw)
end

# Gzip-aware write_to: always opens the chunk in its compressed form,
# then either streams the compressed bytes straight into +io+
# (`compressed: :gzip`) or decompresses while streaming.
def write_to(io, **kwargs)
  keep_compressed = (kwargs[:compressed] == :gzip)
  open(compressed: :gzip) do |source_io|
    if keep_compressed
      IO.copy_stream(source_io, io)
    else
      decompress(input_io: source_io, output_io: io)
    end
  end
end
end
end
end
end
Expand Down
8 changes: 4 additions & 4 deletions lib/fluent/plugin/buffer/file_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class FileChunk < Chunk

attr_reader :path, :permission

def initialize(metadata, path, mode, perm: system_config.file_permission || FILE_PERMISSION)
super(metadata)
def initialize(metadata, path, mode, perm: system_config.file_permission || FILE_PERMISSION, compress: :text)
super(metadata, compress: compress)
@permission = perm
@bytesize = @size = @adding_bytes = @adding_size = 0
@meta = nil
Expand Down Expand Up @@ -133,12 +133,12 @@ def purge
File.unlink(@path, @meta_path)
end

def read
# Reads the entire on-disk chunk from the beginning and returns its raw
# bytes. **kwargs is accepted only for interface compatibility with the
# Decompressable override; this implementation ignores it.
def read(**kwargs)
  @chunk.rewind
  @chunk.read
end

def open(&block)
def open(**kwargs, &block)
@chunk.seek(0, IO::SEEK_SET)
val = yield @chunk
@chunk.seek(0, IO::SEEK_END) if self.staged?
Expand Down
8 changes: 4 additions & 4 deletions lib/fluent/plugin/buffer/memory_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ module Fluent
module Plugin
class Buffer
class MemoryChunk < Chunk
def initialize(metadata)
def initialize(metadata, compress: :text)
super
@chunk = ''.force_encoding(Encoding::ASCII_8BIT)
@chunk_bytes = 0
Expand Down Expand Up @@ -72,15 +72,15 @@ def purge
true
end

def read
# Returns the whole in-memory buffer as-is (raw bytes).
# **kwargs is accepted only for interface compatibility; this base
# implementation ignores it (the Decompressable mixin handles `compressed:`).
def read(**kwargs)
  @chunk
end

def open(&block)
# Exposes the in-memory buffer as an IO by wrapping it in a StringIO;
# yields that IO to the block and returns the block's result.
# **kwargs is ignored here; the Decompressable mixin interprets it.
def open(**kwargs, &block)
  StringIO.open(@chunk, &block)
end

def write_to(io)
# Writes the buffered bytes straight into +io+, skipping the StringIO
# wrapper the generic Chunk#write_to would allocate (re-implemented here
# purely as an optimization).
def write_to(io, **kwargs)
  io.write(@chunk)
end
Expand Down
91 changes: 91 additions & 0 deletions lib/fluent/plugin/compressable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'zlib'

module Fluent
module Plugin
module Compressable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO these methods should be able to handle IO objects as output destination (by specifying it as optional keyword arguments).
It reduces overhead of StringIO -> String -> IO (file chunks).

Copy link
Member

@tagomoris tagomoris Aug 29, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote "as output destination". Of course, IO objects as input data source are useful.
On the other hand, #decompress still returns 2560MB string for file buffer chunks configured as chunk_limit_size 256m(default value of file buffer). It's a large overhead.

# Gzips +data+. When kwargs[:output_io] is given, the compressed bytes
# are written into that IO and the IO itself is returned; otherwise a
# new String with the compressed bytes is returned.
def compress(data, **kwargs)
  sink = kwargs[:output_io]
  buffer = sink || StringIO.new
  Zlib::GzipWriter.wrap(buffer) { |writer| writer.write(data) }
  sink || buffer.string
end

# compressed_data is String like `compress(data1) + compress(data2) + ... + compress(dataN)`
# https://www.ruby-forum.com/topic/971591#979503
# Decompresses gzip data that may consist of several concatenated gzip
# members, i.e. `compress(a) + compress(b) + ... + compress(n)`.
# https://www.ruby-forum.com/topic/971591#979503
#
# Input comes either from +compressed_data+ (String) or +input_io+ (IO);
# output goes to +output_io+ when given, otherwise a String is returned.
def decompress(compressed_data = nil, output_io: nil, input_io: nil)
  if input_io && output_io
    io_decompress(input_io, output_io)
  elsif input_io
    buffer = StringIO.new
    io_decompress(input_io, buffer).string
  elsif compressed_data.nil? || compressed_data.empty?
    # nil or zero-length String: nothing to decompress, return as-is
    compressed_data
  elsif output_io
    # executed only after the empty-input check above
    io_decompress(StringIO.new(compressed_data), output_io)
  else
    string_decompress(compressed_data)
  end
end

private

# Decompresses a String of one or more concatenated gzip members and
# returns the joined plain data.
#
# Zlib::GzipReader stops at the end of the first gzip member; the bytes
# it over-read from the underlying IO are exposed via #unused, so we
# rewind by that amount and start a fresh reader until input runs out.
#
# Defect fixed: `out += gz.read` copied the whole accumulator on every
# member (quadratic for many members); `<<` appends in place.
def string_decompress(compressed_data)
  io = StringIO.new(compressed_data)

  out = ''
  loop do
    gz = Zlib::GzipReader.new(io)
    out << gz.read
    unused = gz.unused
    gz.finish

    break if unused.nil?
    io.pos -= unused.length
  end

  out
end

# Streams gzip data from +input+ to +output+, handling multiple
# concatenated gzip members: after each member, GzipReader#unused tells
# how many over-read bytes to rewind before starting the next reader.
# Returns +output+.
def io_decompress(input, output)
  loop do
    reader = Zlib::GzipReader.new(input)
    output.write(reader.read)
    trailing = reader.unused
    reader.finish

    break if trailing.nil?
    input.pos -= trailing.length
  end

  output
end
end
end
end
6 changes: 4 additions & 2 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,10 @@ def send_data(tag, chunk)

sock.write @sender.forward_header # beginArray(3)
sock.write tag.to_msgpack # 1. writeRaw(tag)
sock.write [0xdb, chunk.size].pack('CN') # 2. beginRaw(size) raw32
chunk.write_to(sock) # writeRawBody(packed_es)
chunk.open(compressed: :text) do |chunk_io|
sock.write [0xdb, chunk_io.size].pack('CN') # 2. beginRaw(size) raw32
IO.copy_stream(chunk_io, sock) # writeRawBody(packed_es)
end
sock.write option.to_msgpack # 3. writeOption(option)

if @sender.require_ack_response
Expand Down
Loading