Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ack handler #2516

Merged
merged 29 commits into from
Jul 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f467eda
Merge condtion
ganmacs Jul 19, 2019
d452d50
Wrap mutex and waiting queue
ganmacs Jul 19, 2019
1bc39a4
create ACKWaitingSockInfo in enqueue
ganmacs Jul 19, 2019
69af0cb
share ack_handler between nodes
ganmacs Jul 19, 2019
a94646d
Copy read_ack_from_sock
ganmacs Jul 22, 2019
77488a1
log
ganmacs Jul 22, 2019
a21e94c
trim receiver
ganmacs Jul 22, 2019
f22490a
make find and delete private
ganmacs Jul 22, 2019
f6d0765
move code and return chunk_id directly
ganmacs Jul 22, 2019
a1d87f0
Return the value success or not and info
ganmacs Jul 22, 2019
c9f4e9a
copy ack_reader
ganmacs Jul 22, 2019
5d6e700
same class
ganmacs Jul 22, 2019
0d4435b
log
ganmacs Jul 22, 2019
41e4b50
pass success or not to caller
ganmacs Jul 22, 2019
f6259c0
readable_sockets can be nil
ganmacs Jul 22, 2019
2623750
Move method position and remove unused code
ganmacs Jul 25, 2019
f8aecda
cosmetic change
ganmacs Jul 25, 2019
c4b7629
extract as file
ganmacs Jul 22, 2019
3c637ac
move unapcker into ack_handler
ganmacs Jul 22, 2019
08a3954
cosmetic change
ganmacs Jul 22, 2019
1dcd3dc
Add result module
ganmacs Jul 22, 2019
3bac793
chunk_id is better
ganmacs Jul 25, 2019
11a8232
Fix failed test
ganmacs Jul 25, 2019
f6e4dda
Add test for ack_handler
ganmacs Jul 26, 2019
8940b4d
Add require for
ganmacs Jul 26, 2019
134070d
Change method name to proper name
ganmacs Jul 26, 2019
cce3112
Change timeout to keyword arg
ganmacs Jul 26, 2019
bf149c8
Calculate expired_time in advance
ganmacs Jul 30, 2019
07a85d3
add BUG prefix
ganmacs Jul 31, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 36 additions & 111 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
require 'fluent/plugin/out_forward/failure_detector'
require 'fluent/plugin/out_forward/error'
require 'fluent/plugin/out_forward/connection_manager'
require 'fluent/plugin/out_forward/ack_handler'

module Fluent::Plugin
class ForwardOutput < Output
Expand Down Expand Up @@ -156,8 +157,6 @@ def initialize
@thread = nil

@usock = nil
@sock_ack_waiting = nil
@sock_ack_waiting_mutex = nil
@keep_alive_watcher_interval = 5 # TODO
end

Expand Down Expand Up @@ -201,6 +200,7 @@ def configure(conf)
end
end

@ack_handler = @require_ack_response ? AckHandler.new(timeout: @ack_response_timeout, log: @log, read_length: @read_length) : nil
socket_cache = @keepalive ? SocketCache.new(@keepalive_timeout, @log) : nil
@connection_manager = ConnectionManager.new(
log: @log,
Expand All @@ -215,9 +215,9 @@ def configure(conf)

log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id
if @heartbeat_type == :none
@nodes << NoneHeartbeatNode.new(self, server, failure: failure, connection_manager: @connection_manager)
@nodes << NoneHeartbeatNode.new(self, server, failure: failure, connection_manager: @connection_manager, ack_handler: @ack_handler)
else
node = Node.new(self, server, failure: failure, connection_manager: @connection_manager)
node = Node.new(self, server, failure: failure, connection_manager: @connection_manager, ack_handler: @ack_handler)
begin
node.validate_host_resolution!
rescue => e
Expand Down Expand Up @@ -259,13 +259,6 @@ def prefer_delayed_commit
def start
super

# Output#start sets @delayed_commit_timeout by @buffer_config.delayed_commit_timeout
# But it should be overwritten by ack_response_timeout to rollback chunks after timeout
if @ack_response_timeout && @delayed_commit_timeout != @ack_response_timeout
log.info "delayed_commit_timeout is overwritten by ack_response_timeout"
@delayed_commit_timeout = @ack_response_timeout + 2 # minimum ack_reader IO.select interval is 1s
end

@load_balancer = LoadBalancer.new(log)
@load_balancer.rebuild_weight_array(@nodes)

Expand All @@ -278,8 +271,13 @@ def start
end

if @require_ack_response
@sock_ack_waiting_mutex = Mutex.new
@sock_ack_waiting = []
# Output#start sets @delayed_commit_timeout by @buffer_config.delayed_commit_timeout
# But it should be overwritten by ack_response_timeout to rollback chunks after timeout
if @delayed_commit_timeout != @ack_response_timeout
log.info "delayed_commit_timeout is overwritten by ack_response_timeout"
@delayed_commit_timeout = @ack_response_timeout + 2 # minimum ack_reader IO.select interval is 1s
end

thread_create(:out_forward_receiving_ack, &method(:ack_reader))
end

Expand Down Expand Up @@ -323,26 +321,14 @@ def write(chunk)
@load_balancer.select_healthy_node { |node| node.send_data(tag, chunk) }
end

ACKWaitingSockInfo = Struct.new(:sock, :chunk_id, :chunk_id_base64, :node, :time, :timeout) do
def expired?(now)
time + timeout < now
end
end

def try_write(chunk)
log.trace "writing a chunk to destination", chunk_id: dump_unique_id_hex(chunk.unique_id)
if chunk.empty?
commit_write(chunk.unique_id)
return
end
tag = chunk.metadata.tag
sock, node = @load_balancer.select_healthy_node { |n| n.send_data(tag, chunk) }
chunk_id_base64 = Base64.encode64(chunk.unique_id)
current_time = Fluent::Clock.now
info = ACKWaitingSockInfo.new(sock, chunk.unique_id, chunk_id_base64, node, current_time, @ack_response_timeout)
@sock_ack_waiting_mutex.synchronize do
@sock_ack_waiting << info
end
@load_balancer.select_healthy_node { |n| n.send_data(tag, chunk) }
end

def create_transfer_socket(host, port, hostname, &block)
Expand Down Expand Up @@ -428,95 +414,40 @@ def on_purge_obsolete_socks
@connection_manager.purge_obsolete_socks
end

# return chunk id to be committed
def read_ack_from_sock(sock, unpacker)
begin
raw_data = sock.instance_of?(Fluent::PluginHelper::Socket::WrappedSocket::TLS) ? sock.readpartial(@read_length) : sock.recv(@read_length)
rescue Errno::ECONNRESET, EOFError # ECONNRESET for #recv, #EOFError for #readpartial
raw_data = ""
end
info = @sock_ack_waiting_mutex.synchronize{ @sock_ack_waiting.find{|i| i.sock == sock } }

# When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF.
# If this happens we assume the data wasn't delivered and retry it.
if raw_data.empty?
log.warn "destination node closed the connection. regard it as unavailable.", host: info.node.host, port: info.node.port
info.node.disable!
rollback_write(info.chunk_id, update_retry: false)
return nil
else
unpacker.feed(raw_data)
res = unpacker.read
log.trace "getting response from destination", host: info.node.host, port: info.node.port, chunk_id: dump_unique_id_hex(info.chunk_id), response: res
if res['ack'] != info.chunk_id_base64
# Some errors may have occurred when ack and chunk id is different, so send the chunk again.
log.warn "ack in response and chunk id in sent data are different", chunk_id: dump_unique_id_hex(info.chunk_id), ack: res['ack']
rollback_write(info.chunk_id, update_retry: false)
return nil
else
log.trace "got a correct ack response", chunk_id: dump_unique_id_hex(info.chunk_id)
end
return info.chunk_id
end
rescue => e
log.error "unexpected error while receiving ack message", error: e
log.error_backtrace
ensure
info.node.close(info.sock)
@sock_ack_waiting_mutex.synchronize do
@sock_ack_waiting.delete(info)
end
end

def ack_reader
select_interval = if @delayed_commit_timeout > 3
1
else
@delayed_commit_timeout / 3.0
end

unpacker = Fluent::Engine.msgpack_unpacker

while thread_current_running?
now = Fluent::Clock.now
sockets = []
begin
@sock_ack_waiting_mutex.synchronize do
new_list = []
@sock_ack_waiting.each do |info|
if info.expired?(now)
# There are 2 types of cases when no response has been received from socket:
# (1) the node does not support sending responses
# (2) the node does support sending response but responses have not arrived for some reasons.
log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port
info.node.disable!
info.node.close(info.sock)
rollback_write(info.chunk_id, update_retry: false)
else
sockets << info.sock
new_list << info
end
end
@sock_ack_waiting = new_list
end

readable_sockets, _, _ = IO.select(sockets, nil, nil, select_interval)
next unless readable_sockets
@ack_handler.collect_response(select_interval) do |chunk_id, node, sock, result|
@connection_manager.close(sock)

readable_sockets.each do |sock|
chunk_id = read_ack_from_sock(sock, unpacker)
commit_write(chunk_id) if chunk_id
case result
when AckHandler::Result::SUCCESS
commit_write(chunk_id)
when AckHandler::Result::FAILED
node.disable!
rollback_write(chunk_id, update_retry: false)
when AckHandler::Result::CHUNKID_UNMATCHED
rollback_write(chunk_id, update_retry: false)
else
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

result is 3 types so this else flow is for catching implementation bug, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. this condition is for the case I don't expect now happen.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, add BUG: to log message is better for notification.

log.warn "BUG: failed to verify certification while connecting (but not raised, why?)", host: host, fqdn: fqdn, error: err_name

log.warn("BUG: invalid status #{result} #{chunk_id}")

if chunk_id
rollback_write(chunk_id, update_retry: false)
end
end
rescue => e
log.error "unexpected error while receiving ack", error: e
log.error_backtrace
end
end
end

class Node
# @param connection_manager [Fluent::Plugin::ForwardOutput::ConnectionManager]
def initialize(sender, server, failure:, connection_manager:)
# @param ack_handler [Fluent::Plugin::ForwardOutput::AckHandler]
def initialize(sender, server, failure:, connection_manager:, ack_handler:)
@sender = sender
@log = sender.log
@compress = sender.compress
Expand Down Expand Up @@ -554,6 +485,7 @@ def initialize(sender, server, failure:, connection_manager:)
@resolved_once = false

@connection_manager = connection_manager
@ack_handler = ack_handler
end

attr_accessor :usock
Expand Down Expand Up @@ -635,7 +567,7 @@ def send_data_actual(sock, tag, chunk)
end

option = { 'size' => chunk.size, 'compressed' => @compress }
option['chunk'] = Base64.encode64(chunk.unique_id) if @sender.require_ack_response
option['chunk'] = Base64.encode64(chunk.unique_id) if @ack_handler

# https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#packedforward-mode
# out_forward always uses str32 type for entries.
Expand All @@ -656,26 +588,19 @@ def send_data_actual(sock, tag, chunk)
end

def send_data(tag, chunk)
connect(nil, require_ack: @sender.require_ack_response) do |sock, ri|
ack = @ack_handler && @ack_handler.create_ack(chunk.unique_id, self)
connect(nil, ack: ack) do |sock, ri|
if ri.state != :established
establish_connection(sock, ri)
end

send_data_actual(sock, tag, chunk)

if @sender.require_ack_response
return sock # to read ACK from socket
end
end

heartbeat(false)
nil
end

def close(sock)
@connection_manager.close(sock)
end

# FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack
#
# @return [Boolean] return true if it needs to rebuild nodes
Expand Down Expand Up @@ -785,8 +710,8 @@ def heartbeat(detect=true)

private

def connect(host = nil, require_ack: false, &block)
@connection_manager.connect(host: host || resolved_host, port: port, hostname: @hostname, require_ack: require_ack, &block)
def connect(host = nil, ack: false, &block)
@connection_manager.connect(host: host || resolved_host, port: port, hostname: @hostname, ack: ack, &block)
end
end

Expand Down
Loading