Skip to content

Commit

Permalink
Support compression in forward transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
repeatedly committed Jul 27, 2015
1 parent 4e3a4ec commit a005481
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 17 deletions.
9 changes: 8 additions & 1 deletion lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class ForwardInput < Input

def initialize
super
require 'zlib'
require 'stringio'
require 'fluent/plugin/socket_util'
end

Expand Down Expand Up @@ -143,9 +145,14 @@ def on_message(msg, chunk_size, source)

if entries.class == String
# PackedForward
option = msg[2]
if option && option['compress']
Zlib::GzipReader.wrap(StringIO.new(entries)) { |gz|
entries = gz.read
}
end
es = MessagePackEventStream.new(entries)
router.emit_stream(tag, es)
option = msg[2]

elsif entries.class == Array
# Forward
Expand Down
54 changes: 38 additions & 16 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class ForwardOutput < ObjectBufferedOutput

def initialize
super
require 'zlib'
require "base64"
require 'socket'
require 'fileutils'
Expand All @@ -55,6 +56,7 @@ def initialize
config_param :expire_dns_cache, :time, :default => nil # 0 means disable cache
config_param :phi_threshold, :integer, :default => 16
config_param :phi_failure_detector, :bool, :default => true
config_param :compress, :bool, :default => false

# if any options added that requires extended forward api, fix @extend_internal_protocol

Expand Down Expand Up @@ -250,7 +252,7 @@ def rebuild_weight_array
FORWARD_HEADER = [0x92].pack('C').freeze
FORWARD_HEADER_EXT = [0x93].pack('C').freeze
def forward_header
if @extend_internal_protocol
if @extend_internal_protocol || @compress
FORWARD_HEADER_EXT
else
FORWARD_HEADER
Expand Down Expand Up @@ -288,24 +290,43 @@ def send_data(node, tag, chunk)
# writeRaw(tag)
sock.write tag.to_msgpack # tag

# beginRaw(size)
sz = chunk.size
#if sz < 32
# # FixRaw
# sock.write [0xa0 | sz].pack('C')
#elsif sz < 65536
# # raw 16
# sock.write [0xda, sz].pack('Cn')
#else
# raw 32
sock.write [0xdb, sz].pack('CN')
#end

# writeRawBody(packed_es)
chunk.write_to(sock)
option = nil
if @compress
tmp = Tempfile.new("forward-#{chunk.key}")
gz = Zlib::GzipWriter.new(tmp)
chunk.write_to(gz)
gz.finish
gz = nil

sock.write [0xdb, tmp.pos].pack('CN')

tmp.pos = 0
FileUtils.copy_stream(tmp, sock)

option = {'compress' => true}
unless @extend_internal_protocol
sock.write option.to_msgpack
end
else
# beginRaw(size)
sz = chunk.size
#if sz < 32
# # FixRaw
# sock.write [0xa0 | sz].pack('C')
#elsif sz < 65536
# # raw 16
# sock.write [0xda, sz].pack('Cn')
#else
# raw 32
sock.write [0xdb, sz].pack('CN')
#end

chunk.write_to(sock)
end

if @extend_internal_protocol
option = {}
option ||= {}
option['chunk'] = Base64.encode64(chunk.unique_id) if @require_ack_response
sock.write option.to_msgpack

Expand Down Expand Up @@ -347,6 +368,7 @@ def send_data(node, tag, chunk)
node.heartbeat(false)
return res # for test
ensure
tmp.close(true) if tmp
sock.close
end
end
Expand Down

0 comments on commit a005481

Please sign in to comment.