From ecbdf8d027960af3fd73bb3ceba41c43edcc0181 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Wed, 24 May 2017 17:57:25 +0900 Subject: [PATCH 1/3] Close socket before delete waiting socket from list. fix #1484 --- lib/fluent/plugin/out_forward.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 00bc2cc125..f34a267692 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -450,6 +450,8 @@ def read_ack_from_sock(sock, unpacker) log.error "unexpected error while receiving ack message", error: e log.error_backtrace ensure + info.sock.close_write rescue nil + info.sock.close rescue nil @sock_ack_waiting_mutex.synchronize do @sock_ack_waiting.delete(info) end From f8c4554fc517dabc5e31a0058be76c2b3bcda229 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Wed, 24 May 2017 18:04:08 +0900 Subject: [PATCH 2/3] Rollback chunk when sent node closed the conneciton --- lib/fluent/plugin/out_forward.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index f34a267692..b81f681c9d 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -431,6 +431,7 @@ def read_ack_from_sock(sock, unpacker) if raw_data.empty? log.warn "destination node closed the connection. regard it as unavailable.", host: info.node.host, port: info.node.port info.node.disable! + rollback_write(info.chunk_id) return nil else unpacker.feed(raw_data) From ae69f72b71b34504dbd74cd1d57658b7096a672e Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Wed, 24 May 2017 18:06:45 +0900 Subject: [PATCH 3/3] Don't call commit_write when can't get ack --- lib/fluent/plugin/out_forward.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index b81f681c9d..712447b29f 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -495,7 +495,7 @@ def ack_reader readable_sockets.each do |sock| chunk_id = read_ack_from_sock(sock, unpacker) - commit_write(chunk_id) + commit_write(chunk_id) if chunk_id end rescue => e log.error "unexpected error while receiving ack", error: e