From 2f40abb9b8a60d3dce555b23e89617feca5d7f9d Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Thu, 7 Sep 2017 07:06:43 +0900 Subject: [PATCH 1/3] Don't update retry state when failed to get ack response. fix #1665 Ack failure is not buffer flush error so it should not update retry state. The problematic node is disabled and its node is excluded at next flush after rollback. To resolve this problem, we add update_retry parameter to Output#rollback_write. This parameter controls rollback behaviour considers retry or not. --- lib/fluent/plugin/out_forward.rb | 6 +++--- lib/fluent/plugin/output.rb | 10 ++++++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index b99cd5843b..bf36dc7963 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -433,7 +433,7 @@ def read_ack_from_sock(sock, unpacker) if raw_data.empty? log.warn "destination node closed the connection. regard it as unavailable.", host: info.node.host, port: info.node.port info.node.disable! - rollback_write(info.chunk_id) + rollback_write(info.chunk_id, false) return nil else unpacker.feed(raw_data) @@ -442,7 +442,7 @@ def read_ack_from_sock(sock, unpacker) if res['ack'] != info.chunk_id_base64 # Some errors may have occured when ack and chunk id is different, so send the chunk again. log.warn "ack in response and chunk id in sent data are different", chunk_id: dump_unique_id_hex(info.chunk_id), ack: res['ack'] - rollback_write(info.chunk_id) + rollback_write(info.chunk_id, false) return nil else log.trace "got a correct ack response", chunk_id: dump_unique_id_hex(info.chunk_id) @@ -483,7 +483,7 @@ def ack_reader log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port info.node.disable! info.sock.close rescue nil - rollback_write(info.chunk_id) + rollback_write(info.chunk_id, false) else sockets << info.sock new_list << info diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index b622720b37..df6689ccec 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -225,7 +225,7 @@ def acts_as_secondary(primary) (class << self; self; end).module_eval do define_method(:commit_write){ |chunk_id| @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true) } - define_method(:rollback_write){ |chunk_id| @primary_instance.rollback_write(chunk_id) } + define_method(:rollback_write){ |chunk_id, update_retry = true| @primary_instance.rollback_write(chunk_id, update_retry) } end end @@ -963,7 +963,7 @@ def commit_write(chunk_id, delayed: @delayed_commit, secondary: false) end end - def rollback_write(chunk_id) + def rollback_write(chunk_id, update_retry = true) # This API is to rollback chunks explicitly from plugins. # 3rd party plugins can depend it on automatic rollback of #try_rollback_write @dequeued_chunks_mutex.synchronize do @@ -974,8 +974,10 @@ def rollback_write(chunk_id) # in many cases, false can be just ignored if @buffer.takeback_chunk(chunk_id) @counters_monitor.synchronize{ @rollback_count += 1 } - primary = @as_secondary ? @primary_instance : self - primary.update_retry_state(chunk_id, @as_secondary) + if update_retry + primary = @as_secondary ? @primary_instance : self + primary.update_retry_state(chunk_id, @as_secondary) + end true else false From 59a47a194aca49d1eccbc95cdfa3fec77209cd6d Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Tue, 28 Nov 2017 17:14:49 +0900 Subject: [PATCH 2/3] Use keyword argument instead of default argument --- lib/fluent/plugin/out_forward.rb | 6 +++--- lib/fluent/plugin/output.rb | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index bf36dc7963..f6fe8a6329 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -433,7 +433,7 @@ def read_ack_from_sock(sock, unpacker) if raw_data.empty? log.warn "destination node closed the connection. regard it as unavailable.", host: info.node.host, port: info.node.port info.node.disable! - rollback_write(info.chunk_id, false) + rollback_write(info.chunk_id, update_retry: false) return nil else unpacker.feed(raw_data) @@ -442,7 +442,7 @@ def read_ack_from_sock(sock, unpacker) if res['ack'] != info.chunk_id_base64 # Some errors may have occured when ack and chunk id is different, so send the chunk again. log.warn "ack in response and chunk id in sent data are different", chunk_id: dump_unique_id_hex(info.chunk_id), ack: res['ack'] - rollback_write(info.chunk_id, false) + rollback_write(info.chunk_id, update_retry: false) return nil else log.trace "got a correct ack response", chunk_id: dump_unique_id_hex(info.chunk_id) @@ -483,7 +483,7 @@ def ack_reader log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port info.node.disable! info.sock.close rescue nil - rollback_write(info.chunk_id, false) + rollback_write(info.chunk_id, update_retry: false) else sockets << info.sock new_list << info diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index df6689ccec..881d6c11ff 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -225,7 +225,7 @@ def acts_as_secondary(primary) (class << self; self; end).module_eval do define_method(:commit_write){ |chunk_id| @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true) } - define_method(:rollback_write){ |chunk_id, update_retry = true| @primary_instance.rollback_write(chunk_id, update_retry) } + define_method(:rollback_write){ |chunk_id, update_retry: true| @primary_instance.rollback_write(chunk_id, update_retry) } end end @@ -963,7 +963,7 @@ def commit_write(chunk_id, delayed: @delayed_commit, secondary: false) end end - def rollback_write(chunk_id, update_retry = true) + def rollback_write(chunk_id, update_retry: true) # This API is to rollback chunks explicitly from plugins. # 3rd party plugins can depend it on automatic rollback of #try_rollback_write @dequeued_chunks_mutex.synchronize do From 99d19a4e5ca563779d04e309201b0bc60c6b2ea3 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Wed, 29 Nov 2017 17:46:45 +0900 Subject: [PATCH 3/3] Add note for update_retry parameter --- lib/fluent/plugin/output.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 881d6c11ff..045319e012 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -963,6 +963,8 @@ def commit_write(chunk_id, delayed: @delayed_commit, secondary: false) end end + # update_retry parameter is for preventing busy loop by async write + # We will remove this parameter by re-design retry_state management between threads. def rollback_write(chunk_id, update_retry: true) # This API is to rollback chunks explicitly from plugins. # 3rd party plugins can depend it on automatic rollback of #try_rollback_write