Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add size key to option in forward protocol #1137

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/fluent/compat/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ module ChunkSizeCompatMixin
# Compatibility alias: reports the chunk's size as its byte size.
def size
  bytesize
end

# Returns the sum of @size and @adding_size.
# NOTE(review): presumably these are the counts of already-stored and
# currently-being-added events of the chunk; confirm against the chunk class.
def size_of_events
  stored = @size
  adding = @adding_size
  stored + adding
end
end

module BufferedChunkMixin
Expand Down
7 changes: 4 additions & 3 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,11 @@ def on_message(msg, chunk_size, source)

if entries.class == String
# PackedForward
es = MessagePackEventStream.new(entries)
option = msg[2]
size = (option && option['size']) || 0
es = MessagePackEventStream.new(entries, nil, size.to_i)
es = check_and_skip_invalid_event(tag, es, source) if @skip_invalid_event
router.emit_stream(tag, es)
option = msg[2]

elsif entries.class == Array
# Forward
Expand Down Expand Up @@ -265,7 +266,7 @@ def on_read(data)
@y = Yajl::Parser.new
@y.on_parse_complete = lambda { |obj|
option = @on_message.call(obj, @chunk_counter, @source)
respond option if option
respond option
@chunk_counter = 0
}
else
Expand Down
95 changes: 38 additions & 57 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ def initialize
desc 'Use the "Phi accrual failure detector" to detect server failure.'
config_param :phi_failure_detector, :bool, default: true

# if any options added that requires extended forward api, fix @extend_internal_protocol

desc 'Change the protocol to at-least-once.'
config_param :require_ack_response, :bool, default: false # require in_forward to respond with ack
desc 'This option is used when require_ack_response is true.'
Expand All @@ -95,8 +93,6 @@ def initialize
config_param :port, :integer, default: LISTEN_PORT
config_param :host, :string, default: nil

attr_accessor :extend_internal_protocol

def configure(conf)
super

Expand All @@ -112,13 +108,6 @@ def configure(conf)

recover_sample_size = @recover_wait / @heartbeat_interval

# add options here if any options are added which use the extended protocol
@extend_internal_protocol = if @require_ack_response
true
else
false
end

if @dns_round_robin
if @heartbeat_type == :udp
raise ConfigError, "forward output heartbeat type must be 'tcp' or 'none' to use dns_round_robin option"
Expand Down Expand Up @@ -276,16 +265,10 @@ def rebuild_weight_array
@weight_array = weight_array
end

# MessagePack FixArray length = 3 (if @extend_internal_protocol)
# = 2 (else)
FORWARD_HEADER = [0x92].pack('C').freeze
FORWARD_HEADER_EXT = [0x93].pack('C').freeze
# MessagePack FixArray length is 3
FORWARD_HEADER = [0x93].pack('C').freeze
def forward_header
if @extend_internal_protocol
FORWARD_HEADER_EXT
else
FORWARD_HEADER
end
FORWARD_HEADER
end

#FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack
Expand Down Expand Up @@ -314,7 +297,7 @@ def send_data(node, tag, chunk)
opt = [@send_timeout.to_i, 0].pack('L!L!') # struct timeval
sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)

# beginArray(2)
# beginArray(3)
sock.write forward_header

# writeRaw(tag)
Expand All @@ -336,48 +319,46 @@ def send_data(node, tag, chunk)
# writeRawBody(packed_es)
chunk.write_to(sock)

if @extend_internal_protocol
option = {}
option['chunk'] = Base64.encode64(chunk.unique_id) if @require_ack_response
sock.write option.to_msgpack

if @require_ack_response && @ack_response_timeout > 0
# Waiting for a response here results in a decrease of throughput because a chunk queue is locked.
# To avoid a decrease of throughput, it is necessary to prepare a list of chunks that wait for responses
# and process them asynchronously.
if IO.select([sock], nil, nil, @ack_response_timeout)
raw_data = sock.recv(1024)

# When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF.
# If this happens we assume the data wasn't delivered and retry it.
if raw_data.empty?
@log.warn "node #{node.host}:#{node.port} closed the connection. regard it as unavailable."
node.disable!
raise ForwardOutputConnectionClosedError, "node #{node.host}:#{node.port} closed connection"
else
# Serialization type of the response is same as sent data.
res = MessagePack.unpack(raw_data)

if res['ack'] != option['chunk']
# Some errors may have occurred when ack and chunk id are different, so send the chunk again.
raise ForwardOutputResponseError, "ack in response and chunk id in sent data are different"
end
end

else
# IO.select returns nil on timeout.
# There are 2 types of cases when no response has been received:
# (1) the node does not support sending responses
# (2) the node does support sending response but responses have not arrived for some reasons.
@log.warn "no response from #{node.host}:#{node.port}. regard it as unavailable."
option = { 'size' => chunk.size_of_events }
option['chunk'] = Base64.encode64(chunk.unique_id) if @require_ack_response
sock.write option.to_msgpack

if @require_ack_response && @ack_response_timeout > 0
# Waiting for a response here results in a decrease of throughput because a chunk queue is locked.
# To avoid a decrease of throughput, it is necessary to prepare a list of chunks that wait for responses
# and process them asynchronously.
if IO.select([sock], nil, nil, @ack_response_timeout)
raw_data = sock.recv(1024)

# When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF.
# If this happens we assume the data wasn't delivered and retry it.
if raw_data.empty?
@log.warn "node #{node.host}:#{node.port} closed the connection. regard it as unavailable."
node.disable!
raise ForwardOutputACKTimeoutError, "node #{node.host}:#{node.port} does not return ACK"
raise ForwardOutputConnectionClosedError, "node #{node.host}:#{node.port} closed connection"
else
# Serialization type of the response is same as sent data.
res = MessagePack.unpack(raw_data)

if res['ack'] != option['chunk']
# Some errors may have occurred when ack and chunk id are different, so send the chunk again.
raise ForwardOutputResponseError, "ack in response and chunk id in sent data are different"
end
end

else
# IO.select returns nil on timeout.
# There are 2 types of cases when no response has been received:
# (1) the node does not support sending responses
# (2) the node does support sending response but responses have not arrived for some reasons.
@log.warn "no response from #{node.host}:#{node.port}. regard it as unavailable."
node.disable!
raise ForwardOutputACKTimeoutError, "node #{node.host}:#{node.port} does not return ACK"
end
end

node.heartbeat(false)
return res # for test
res # for test
ensure
sock.close_write
sock.close
Expand Down
24 changes: 24 additions & 0 deletions test/plugin/test_in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,30 @@ def test_message_json
end
end

# Feeds a PackedForward message whose option hash carries a 'size' key into
# ForwardInput#on_message and checks that the option returned by on_message
# still reports that size.
def test_set_size_to_option
  d = create_driver

  time = Time.parse("2011-01-02 13:14:15 UTC").to_i
  events = [
    ["tag1", time, {"a"=>1}],
    ["tag1", time, {"a"=>2}]
  ]

  # PackedForward body: the msgpack-encoded [time, record] pairs concatenated
  # into a single string.
  entries = ''
  events.each {|_tag, _time, record|
    [_time, record].to_msgpack(entries)
  }

  chunk = ["tag1", entries, { 'size' => events.length }].to_msgpack

  d.run do
    Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
      option = d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000")
      # assert_equal takes (expected, actual); keeping this order makes the
      # failure message report the right value as "expected".
      assert_equal events.length, option['size']
    end
  end
end

def test_send_large_chunk_warning
d = create_driver(CONFIG + %[
chunk_size_warn_limit 16M
Expand Down
98 changes: 93 additions & 5 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,81 @@ def test_phi_failure_detector

def test_wait_response_timeout_config
d = create_driver(CONFIG)
assert_equal false, d.instance.extend_internal_protocol
assert_equal false, d.instance.require_ack_response
assert_equal 190, d.instance.ack_response_timeout

d = create_driver(CONFIG + %[
require_ack_response true
ack_response_timeout 2s
])
assert d.instance.extend_internal_protocol
assert d.instance.require_ack_response
assert_equal 2, d.instance.ack_response_timeout
end

# Sends two records to a target input that responds, with ack responses
# required, and checks that both events arrive and that the received option
# hash reports a 'size' of 2.
def test_sending_contains_with_ack
  target_input_driver = create_target_input_driver(true)

  # require_ack_response must be enabled here; otherwise this test is
  # identical to test_sending_contains_without_ack and never exercises the
  # ack path.
  d = create_driver(CONFIG + %[
    require_ack_response true
    ack_response_timeout 1s
  ])

  time = Time.parse("2011-01-02 13:14:15 UTC").to_i

  records = [
    {"a" => 1},
    {"a" => 2}
  ]
  d.register_run_post_condition do
    d.instance.responses.length == 1
  end

  target_input_driver.run do
    d.run do
      records.each do |record|
        d.emit record, time
      end
    end
  end

  emits = target_input_driver.emits
  assert_equal ['test', time, records[0]], emits[0]
  assert_equal ['test', time, records[1]], emits[1]

  # assert_equal takes (expected, actual).
  assert_equal 2, target_input_driver.instance.received_options[0]['size']
end

# Sends two records without requiring ack responses and checks that both
# events arrive and that the received option hash reports a 'size' of 2.
def test_sending_contains_without_ack
  target_input_driver = create_target_input_driver(true)

  d = create_driver(CONFIG + %[
    ack_response_timeout 1s
  ])

  time = Time.parse("2011-01-02 13:14:15 UTC").to_i

  records = [
    {"a" => 1},
    {"a" => 2}
  ]
  d.register_run_post_condition do
    d.instance.responses.length == 1
  end

  target_input_driver.run do
    d.run do
      records.each do |record|
        d.emit record, time
      end
    end
  end

  emits = target_input_driver.emits
  assert_equal ['test', time, records[0]], emits[0]
  assert_equal ['test', time, records[1]], emits[1]

  # assert_equal takes (expected, actual).
  assert_equal 2, target_input_driver.instance.received_options[0]['size']
end

def test_send_with_time_as_integer
target_input_driver = create_target_input_driver

Expand Down Expand Up @@ -379,12 +441,18 @@ def create_target_input_driver(do_respond=false, disconnect=false, conf=TARGET_C
DummyEngineDriver.new(Fluent::ForwardInput) {
handler_class = Class.new(Fluent::ForwardInput::Handler) { |klass|
attr_reader :chunk_counter # for checking if received data is successfully deserialized
attr_reader :received_options

def initialize(sock, log, on_message)
@sock = sock
@log = log
@chunk_counter = 0
@on_message = on_message
@received_options = []
@on_message = ->(msg, chunk_size, source) {
option = on_message.call(msg, chunk_size, source)
@received_options << option
option
}
@source = nil
end

Expand All @@ -409,7 +477,7 @@ def close
end
}

define_method(:start) do
define_method(:_start) do
@thread = Thread.new do
Socket.tcp_server_loop(@bind, @port) do |sock, client_addrinfo|
begin
Expand All @@ -431,12 +499,15 @@ def close
sock.close_write
sock.close
end
@received_options = handler.received_options
end
end
end
end

def shutdown
attr_reader :received_options

def _shutdown
@thread.kill
@thread.join
end
Expand All @@ -457,9 +528,26 @@ def test_heartbeat_type_none
assert_equal node.available, true
end

# To suppress calling `ForwardInput#start` in `DummyEngineDriver`,
# `DummyEngineDriver` should avoid calling CallSuperMixin.prepend at `Fluent::Compat::Input#initialize`.
module SuppressCallSuperMixin
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add code comment why this module is needed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a comment.
f43c084

# Delegates to `_start`. Because this module is prepended to the instance's
# class, this takes precedence over the class's own `start`.
def start
_start
end

# Intentionally empty override: does nothing and returns nil.
def before_shutdown
# nothing
end

# Delegates to `_shutdown`. Because this module is prepended to the instance's
# class, this takes precedence over the class's own `shutdown`.
def shutdown
_shutdown
end
end

class DummyEngineDriver < Fluent::Test::TestDriver
def initialize(klass, &block)
super(klass, &block)
@instance.class.prepend(SuppressCallSuperMixin)
@engine = DummyEngineClass.new
end

Expand Down