Skip to content

Commit

Permalink
Merge pull request #2516 from ganmacs/ack_handler
Browse files Browse the repository at this point in the history
Ack handler
  • Loading branch information
ganmacs authored Jul 31, 2019
2 parents 5662a39 + 07a85d3 commit 9b17037
Show file tree
Hide file tree
Showing 6 changed files with 344 additions and 159 deletions.
147 changes: 36 additions & 111 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
require 'fluent/plugin/out_forward/failure_detector'
require 'fluent/plugin/out_forward/error'
require 'fluent/plugin/out_forward/connection_manager'
require 'fluent/plugin/out_forward/ack_handler'

module Fluent::Plugin
class ForwardOutput < Output
Expand Down Expand Up @@ -156,8 +157,6 @@ def initialize
@thread = nil

@usock = nil
@sock_ack_waiting = nil
@sock_ack_waiting_mutex = nil
@keep_alive_watcher_interval = 5 # TODO
end

Expand Down Expand Up @@ -201,6 +200,7 @@ def configure(conf)
end
end

@ack_handler = @require_ack_response ? AckHandler.new(timeout: @ack_response_timeout, log: @log, read_length: @read_length) : nil
socket_cache = @keepalive ? SocketCache.new(@keepalive_timeout, @log) : nil
@connection_manager = ConnectionManager.new(
log: @log,
Expand All @@ -215,9 +215,9 @@ def configure(conf)

log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id
if @heartbeat_type == :none
@nodes << NoneHeartbeatNode.new(self, server, failure: failure, connection_manager: @connection_manager)
@nodes << NoneHeartbeatNode.new(self, server, failure: failure, connection_manager: @connection_manager, ack_handler: @ack_handler)
else
node = Node.new(self, server, failure: failure, connection_manager: @connection_manager)
node = Node.new(self, server, failure: failure, connection_manager: @connection_manager, ack_handler: @ack_handler)
begin
node.validate_host_resolution!
rescue => e
Expand Down Expand Up @@ -259,13 +259,6 @@ def prefer_delayed_commit
def start
super

# Output#start sets @delayed_commit_timeout by @buffer_config.delayed_commit_timeout
# But it should be overwritten by ack_response_timeout to rollback chunks after timeout
if @ack_response_timeout && @delayed_commit_timeout != @ack_response_timeout
log.info "delayed_commit_timeout is overwritten by ack_response_timeout"
@delayed_commit_timeout = @ack_response_timeout + 2 # minimum ack_reader IO.select interval is 1s
end

@load_balancer = LoadBalancer.new(log)
@load_balancer.rebuild_weight_array(@nodes)

Expand All @@ -278,8 +271,13 @@ def start
end

if @require_ack_response
@sock_ack_waiting_mutex = Mutex.new
@sock_ack_waiting = []
# Output#start sets @delayed_commit_timeout by @buffer_config.delayed_commit_timeout
# But it should be overwritten by ack_response_timeout to rollback chunks after timeout
if @delayed_commit_timeout != @ack_response_timeout
log.info "delayed_commit_timeout is overwritten by ack_response_timeout"
@delayed_commit_timeout = @ack_response_timeout + 2 # minimum ack_reader IO.select interval is 1s
end

thread_create(:out_forward_receiving_ack, &method(:ack_reader))
end

Expand Down Expand Up @@ -323,26 +321,14 @@ def write(chunk)
@load_balancer.select_healthy_node { |node| node.send_data(tag, chunk) }
end

ACKWaitingSockInfo = Struct.new(:sock, :chunk_id, :chunk_id_base64, :node, :time, :timeout) do
def expired?(now)
time + timeout < now
end
end

def try_write(chunk)
log.trace "writing a chunk to destination", chunk_id: dump_unique_id_hex(chunk.unique_id)
if chunk.empty?
commit_write(chunk.unique_id)
return
end
tag = chunk.metadata.tag
sock, node = @load_balancer.select_healthy_node { |n| n.send_data(tag, chunk) }
chunk_id_base64 = Base64.encode64(chunk.unique_id)
current_time = Fluent::Clock.now
info = ACKWaitingSockInfo.new(sock, chunk.unique_id, chunk_id_base64, node, current_time, @ack_response_timeout)
@sock_ack_waiting_mutex.synchronize do
@sock_ack_waiting << info
end
@load_balancer.select_healthy_node { |n| n.send_data(tag, chunk) }
end

def create_transfer_socket(host, port, hostname, &block)
Expand Down Expand Up @@ -428,95 +414,40 @@ def on_purge_obsolete_socks
@connection_manager.purge_obsolete_socks
end

# return chunk id to be committed
def read_ack_from_sock(sock, unpacker)
begin
raw_data = sock.instance_of?(Fluent::PluginHelper::Socket::WrappedSocket::TLS) ? sock.readpartial(@read_length) : sock.recv(@read_length)
rescue Errno::ECONNRESET, EOFError # ECONNRESET for #recv, #EOFError for #readpartial
raw_data = ""
end
info = @sock_ack_waiting_mutex.synchronize{ @sock_ack_waiting.find{|i| i.sock == sock } }

# When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF.
# If this happens we assume the data wasn't delivered and retry it.
if raw_data.empty?
log.warn "destination node closed the connection. regard it as unavailable.", host: info.node.host, port: info.node.port
info.node.disable!
rollback_write(info.chunk_id, update_retry: false)
return nil
else
unpacker.feed(raw_data)
res = unpacker.read
log.trace "getting response from destination", host: info.node.host, port: info.node.port, chunk_id: dump_unique_id_hex(info.chunk_id), response: res
if res['ack'] != info.chunk_id_base64
# Some errors may have occurred when ack and chunk id is different, so send the chunk again.
log.warn "ack in response and chunk id in sent data are different", chunk_id: dump_unique_id_hex(info.chunk_id), ack: res['ack']
rollback_write(info.chunk_id, update_retry: false)
return nil
else
log.trace "got a correct ack response", chunk_id: dump_unique_id_hex(info.chunk_id)
end
return info.chunk_id
end
rescue => e
log.error "unexpected error while receiving ack message", error: e
log.error_backtrace
ensure
info.node.close(info.sock)
@sock_ack_waiting_mutex.synchronize do
@sock_ack_waiting.delete(info)
end
end

def ack_reader
select_interval = if @delayed_commit_timeout > 3
1
else
@delayed_commit_timeout / 3.0
end

unpacker = Fluent::Engine.msgpack_unpacker

while thread_current_running?
now = Fluent::Clock.now
sockets = []
begin
@sock_ack_waiting_mutex.synchronize do
new_list = []
@sock_ack_waiting.each do |info|
if info.expired?(now)
# There are 2 types of cases when no response has been received from socket:
# (1) the node does not support sending responses
# (2) the node does support sending response but responses have not arrived for some reasons.
log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port
info.node.disable!
info.node.close(info.sock)
rollback_write(info.chunk_id, update_retry: false)
else
sockets << info.sock
new_list << info
end
end
@sock_ack_waiting = new_list
end

readable_sockets, _, _ = IO.select(sockets, nil, nil, select_interval)
next unless readable_sockets
@ack_handler.collect_response(select_interval) do |chunk_id, node, sock, result|
@connection_manager.close(sock)

readable_sockets.each do |sock|
chunk_id = read_ack_from_sock(sock, unpacker)
commit_write(chunk_id) if chunk_id
case result
when AckHandler::Result::SUCCESS
commit_write(chunk_id)
when AckHandler::Result::FAILED
node.disable!
rollback_write(chunk_id, update_retry: false)
when AckHandler::Result::CHUNKID_UNMATCHED
rollback_write(chunk_id, update_retry: false)
else
log.warn("BUG: invalid status #{result} #{chunk_id}")

if chunk_id
rollback_write(chunk_id, update_retry: false)
end
end
rescue => e
log.error "unexpected error while receiving ack", error: e
log.error_backtrace
end
end
end

class Node
# @param connection_manager [Fluent::Plugin::ForwardOutput::ConnectionManager]
def initialize(sender, server, failure:, connection_manager:)
# @param ack_handler [Fluent::Plugin::ForwardOutput::AckHandler]
def initialize(sender, server, failure:, connection_manager:, ack_handler:)
@sender = sender
@log = sender.log
@compress = sender.compress
Expand Down Expand Up @@ -554,6 +485,7 @@ def initialize(sender, server, failure:, connection_manager:)
@resolved_once = false

@connection_manager = connection_manager
@ack_handler = ack_handler
end

attr_accessor :usock
Expand Down Expand Up @@ -631,7 +563,7 @@ def establish_connection(sock, ri)

def send_data_actual(sock, tag, chunk)
option = { 'size' => chunk.size, 'compressed' => @compress }
option['chunk'] = Base64.encode64(chunk.unique_id) if @sender.require_ack_response
option['chunk'] = Base64.encode64(chunk.unique_id) if @ack_handler

# https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#packedforward-mode
# out_forward always uses str32 type for entries.
Expand All @@ -652,7 +584,8 @@ def send_data_actual(sock, tag, chunk)
end

def send_data(tag, chunk)
connect(nil, require_ack: @sender.require_ack_response) do |sock, ri|
ack = @ack_handler && @ack_handler.create_ack(chunk.unique_id, self)
connect(nil, ack: ack) do |sock, ri|
if ri.state != :established
establish_connection(sock, ri)

Expand All @@ -662,20 +595,12 @@ def send_data(tag, chunk)
end

send_data_actual(sock, tag, chunk)

if @sender.require_ack_response
return sock # to read ACK from socket
end
end

heartbeat(false)
nil
end

def close(sock)
@connection_manager.close(sock)
end

# FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack
#
# @return [Boolean] return true if it needs to rebuild nodes
Expand Down Expand Up @@ -785,8 +710,8 @@ def heartbeat(detect=true)

private

def connect(host = nil, require_ack: false, &block)
@connection_manager.connect(host: host || resolved_host, port: port, hostname: @hostname, require_ack: require_ack, &block)
def connect(host = nil, ack: false, &block)
@connection_manager.connect(host: host || resolved_host, port: port, hostname: @hostname, ack: ack, &block)
end
end

Expand Down
Loading

0 comments on commit 9b17037

Please sign in to comment.