Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_forward: fix error of ack handling conflict on stopping with require_ack_response enabled #4030

Merged
merged 7 commits into from
Feb 10, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,8 @@ def ack_check(select_interval)
when AckHandler::Result::SUCCESS
commit_write(chunk_id)
when AckHandler::Result::FAILED
node.disable!
rollback_write(chunk_id, update_retry: false)
node.disable! if node
daipom marked this conversation as resolved.
Show resolved Hide resolved
rollback_write(chunk_id, update_retry: false) if chunk_id
when AckHandler::Result::CHUNKID_UNMATCHED
rollback_write(chunk_id, update_retry: false)
else
Expand Down
23 changes: 19 additions & 4 deletions lib/fluent/plugin/out_forward/ack_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ def collect_response(select_interval)
@ack_waitings = new_list
end

readable_sockets, _, _ = IO.select(sockets, nil, nil, select_interval)
begin
readable_sockets, _, _ = IO.select(sockets, nil, nil, select_interval)
rescue IOError
@log.info "connection closed while waiting for readable sockets"
readable_sockets = nil
end

if readable_sockets
readable_sockets.each do |sock|
results << read_ack_from_sock(sock)
Expand Down Expand Up @@ -109,13 +115,22 @@ def read_ack_from_sock(sock)
raw_data = sock.instance_of?(Fluent::PluginHelper::Socket::WrappedSocket::TLS) ? sock.readpartial(@read_length) : sock.recv(@read_length)
rescue Errno::ECONNRESET, EOFError # ECONNRESET for #recv, #EOFError for #readpartial
raw_data = ''
rescue IOError
@log.info "socket closed while receiving ack response"
return nil, Result::FAILED
end

info = find(sock)

# When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF.
# If this happens we assume the data wasn't delivered and retry it.
if raw_data.empty?
if info.nil?
# The info can be deleted by another thread during `sock.recv()` and `find()`.
# This is OK since another thread has completed to process the ack, so we can skip this.
# Note: exclusion mechanism about `collect_response()` may need to be considered.
@log.debug "could not find the ack info. this ack may be processed by another thread."
return nil, Result::FAILED
elsif raw_data.empty?
# When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF.
# If this happens we assume the data wasn't delivered and retry it.
@log.warn 'destination node closed the connection. regard it as unavailable.', host: info.node.host, port: info.node.port
# info.node.disable!
return info, Result::FAILED
Expand Down
39 changes: 39 additions & 0 deletions test/plugin/out_forward/test_ack_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,43 @@ class AckHandlerTest < Test::Unit::TestCase
w.close rescue nil
end
end

# ForwardOutput uses AckHandler in multiple threads, so we need to assume this case.
# If exclusive control for this case is implemented, this test may not be necessary.
test 'raises no error when another thread closes a socket' do
ack_handler = Fluent::Plugin::ForwardOutput::AckHandler.new(timeout: 10, log: $log, read_length: 100)

node = flexmock('node', host: '127.0.0.1', port: '1000') # for log
chunk_id = 'chunk_id 111'
ack = ack_handler.create_ack(chunk_id, node)

r, w = IO.pipe
begin
w.write(chunk_id)
stub(r).recv { |_|
sleep(1) # To ensure that multiple threads select the socket before closing.
raise IOError, 'stream closed in another thread' if r.closed?
MessagePack.pack({ 'ack' => Base64.encode64('chunk_id 111') })
}
ack.enqueue(r)

threads = []
2.times do
threads << Thread.new do
ack_handler.collect_response(1) do |cid, n, s, ret|
s&.close
end
end
end

assert_true threads.map{ |t| t.join(10) }.all?
assert_false(
$log.out.logs.any? { |log| log.include?('[error]') },
$log.out.logs.select{ |log| log.include?('[error]') }.join('\n')
)
ensure
r.close rescue nil
w.close rescue nil
end
end
end