Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support compression in forward transfer #637

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class ForwardInput < Input

def initialize
super
require 'zlib'
require 'stringio'
require 'fluent/plugin/socket_util'
end

Expand Down Expand Up @@ -148,9 +150,14 @@ def on_message(msg, chunk_size, source)

if entries.class == String
# PackedForward
option = msg[2]
if option && option['compress']
Zlib::GzipReader.wrap(StringIO.new(entries)) { |gz|
entries = gz.read
}
end
es = MessagePackEventStream.new(entries)
router.emit_stream(tag, es)
option = msg[2]

elsif entries.class == Array
# Forward
Expand Down
54 changes: 38 additions & 16 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class ForwardOutput < ObjectBufferedOutput

def initialize
super
require 'zlib'
require "base64"
require 'socket'
require 'fileutils'
Expand Down Expand Up @@ -63,6 +64,7 @@ def initialize
config_param :phi_threshold, :integer, :default => 16
desc 'Use the "Phi accrual failure detector" to detect server failure.'
config_param :phi_failure_detector, :bool, :default => true
config_param :compress, :bool, :default => false

# if any options added that requires extended forward api, fix @extend_internal_protocol

Expand Down Expand Up @@ -265,7 +267,7 @@ def rebuild_weight_array
FORWARD_HEADER = [0x92].pack('C').freeze
FORWARD_HEADER_EXT = [0x93].pack('C').freeze
def forward_header
if @extend_internal_protocol
if @extend_internal_protocol || @compress
FORWARD_HEADER_EXT
else
FORWARD_HEADER
Expand Down Expand Up @@ -304,24 +306,43 @@ def send_data(node, tag, chunk)
# writeRaw(tag)
sock.write tag.to_msgpack # tag

# beginRaw(size)
sz = chunk.size
#if sz < 32
# # FixRaw
# sock.write [0xa0 | sz].pack('C')
#elsif sz < 65536
# # raw 16
# sock.write [0xda, sz].pack('Cn')
#else
# raw 32
sock.write [0xdb, sz].pack('CN')
#end

# writeRawBody(packed_es)
chunk.write_to(sock)
option = nil
if @compress
tmp = Tempfile.new("forward-#{chunk.key}")
gz = Zlib::GzipWriter.new(tmp)
chunk.write_to(gz)
gz.finish
gz = nil

sock.write [0xdb, tmp.pos].pack('CN')

tmp.pos = 0
FileUtils.copy_stream(tmp, sock)

option = {'compress' => true}
unless @extend_internal_protocol
sock.write option.to_msgpack
end
else
# beginRaw(size)
sz = chunk.size
#if sz < 32
# # FixRaw
# sock.write [0xa0 | sz].pack('C')
#elsif sz < 65536
# # raw 16
# sock.write [0xda, sz].pack('Cn')
#else
# raw 32
sock.write [0xdb, sz].pack('CN')
#end

chunk.write_to(sock)
end

if @extend_internal_protocol
option = {}
option ||= {}
option['chunk'] = Base64.encode64(chunk.unique_id) if @require_ack_response
sock.write option.to_msgpack

Expand Down Expand Up @@ -363,6 +384,7 @@ def send_data(node, tag, chunk)
node.heartbeat(false)
return res # for test
ensure
tmp.close(true) if tmp
sock.close_write
sock.close
end
Expand Down