Skip to content

Commit

Permalink
Add size size key in option at out_forward
Browse files Browse the repository at this point in the history
Sending array's length with msgpack is always 3, so remove FORWARD_HEADER_EXT
  • Loading branch information
ganmacs committed Aug 4, 2016
1 parent 3213d10 commit 8b3f97a
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 13 deletions.
4 changes: 4 additions & 0 deletions lib/fluent/compat/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ module ChunkSizeCompatMixin
def size
self.bytesize
end

def size_of_events
@size + @adding_size
end
end

module BufferedChunkMixin
Expand Down
17 changes: 7 additions & 10 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -276,16 +276,10 @@ def rebuild_weight_array
@weight_array = weight_array
end

# MessagePack FixArray length = 3 (if @extend_internal_protocol)
# = 2 (else)
FORWARD_HEADER = [0x92].pack('C').freeze
FORWARD_HEADER_EXT = [0x93].pack('C').freeze
# MessagePack FixArray length is 3
FORWARD_HEADER = [0x93].pack('C').freeze
def forward_header
if @extend_internal_protocol
FORWARD_HEADER_EXT
else
FORWARD_HEADER
end
FORWARD_HEADER
end

#FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack
Expand Down Expand Up @@ -336,8 +330,9 @@ def send_data(node, tag, chunk)
# writeRawBody(packed_es)
chunk.write_to(sock)

option = { 'size' => chunk.size_of_events }

if @extend_internal_protocol
option = {}
option['chunk'] = Base64.encode64(chunk.unique_id) if @require_ack_response
sock.write option.to_msgpack

Expand Down Expand Up @@ -374,6 +369,8 @@ def send_data(node, tag, chunk)
raise ForwardOutputACKTimeoutError, "node #{node.host}:#{node.port} does not return ACK"
end
end
else
sock.write option.to_msgpack
end

node.heartbeat(false)
Expand Down
94 changes: 91 additions & 3 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,70 @@ def test_wait_response_timeout_config
assert_equal 2, d.instance.ack_response_timeout
end

def test_sending_contains_with_ack
target_input_driver = create_target_input_driver(true)

d = create_driver(CONFIG + %[
ack_response_timeout 1s
])

time = Time.parse("2011-01-02 13:14:15 UTC").to_i

records = [
{"a" => 1},
{"a" => 2}
]
d.register_run_post_condition do
d.instance.responses.length == 1
end

target_input_driver.run do
d.run do
records.each do |record|
d.emit record, time
end
end
end

emits = target_input_driver.emits
assert_equal ['test', time, records[0]], emits[0]
assert_equal ['test', time, records[1]], emits[1]

assert_equal target_input_driver.instance.received_options[0]['size'], 2
end

def test_sending_contains_without_ack
target_input_driver = create_target_input_driver(true)

d = create_driver(CONFIG + %[
ack_response_timeout 1s
])

time = Time.parse("2011-01-02 13:14:15 UTC").to_i

records = [
{"a" => 1},
{"a" => 2}
]
d.register_run_post_condition do
d.instance.responses.length == 1
end

target_input_driver.run do
d.run do
records.each do |record|
d.emit record, time
end
end
end

emits = target_input_driver.emits
assert_equal ['test', time, records[0]], emits[0]
assert_equal ['test', time, records[1]], emits[1]

assert_equal target_input_driver.instance.received_options[0]['size'], 2
end

def test_send_with_time_as_integer
target_input_driver = create_target_input_driver

Expand Down Expand Up @@ -379,12 +443,18 @@ def create_target_input_driver(do_respond=false, disconnect=false, conf=TARGET_C
DummyEngineDriver.new(Fluent::ForwardInput) {
handler_class = Class.new(Fluent::ForwardInput::Handler) { |klass|
attr_reader :chunk_counter # for checking if received data is successfully deserialized
attr_reader :received_options

def initialize(sock, log, on_message)
@sock = sock
@log = log
@chunk_counter = 0
@on_message = on_message
@received_options = []
@on_message = ->(msg, chunk_size, source) {
option = on_message.call(msg, chunk_size, source)
@received_options << option
option
}
@source = nil
end

Expand All @@ -409,7 +479,7 @@ def close
end
}

define_method(:start) do
define_method(:_start) do
@thread = Thread.new do
Socket.tcp_server_loop(@bind, @port) do |sock, client_addrinfo|
begin
Expand All @@ -431,12 +501,15 @@ def close
sock.close_write
sock.close
end
@received_options = handler.received_options
end
end
end
end

def shutdown
attr_reader :received_options

def _shutdown
@thread.kill
@thread.join
end
Expand All @@ -457,9 +530,24 @@ def test_heartbeat_type_none
assert_equal node.available, true
end

module SuppressCallSuperMixin
def start
_start
end

def before_shutdown
# nothing
end

def shutdown
_shutdown
end
end

class DummyEngineDriver < Fluent::Test::TestDriver
def initialize(klass, &block)
super(klass, &block)
@instance.class.prepend(SuppressCallSuperMixin)
@engine = DummyEngineClass.new
end

Expand Down

0 comments on commit 8b3f97a

Please sign in to comment.