diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 00bc2cc125..712447b29f 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -431,6 +431,7 @@ def read_ack_from_sock(sock, unpacker) if raw_data.empty? log.warn "destination node closed the connection. regard it as unavailable.", host: info.node.host, port: info.node.port info.node.disable! + rollback_write(info.chunk_id) return nil else unpacker.feed(raw_data) @@ -450,6 +451,8 @@ def read_ack_from_sock(sock, unpacker) log.error "unexpected error while receiving ack message", error: e log.error_backtrace ensure + info.sock.close_write rescue nil + info.sock.close rescue nil @sock_ack_waiting_mutex.synchronize do @sock_ack_waiting.delete(info) end @@ -492,7 +495,7 @@ def ack_reader readable_sockets.each do |sock| chunk_id = read_ack_from_sock(sock, unpacker) - commit_write(chunk_id) + commit_write(chunk_id) if chunk_id end rescue => e log.error "unexpected error while receiving ack", error: e