diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 24f790cc7a..2f5a936906 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -433,7 +433,7 @@ def read_ack_from_sock(sock, unpacker) if raw_data.empty? log.warn "destination node closed the connection. regard it as unavailable.", host: info.node.host, port: info.node.port info.node.disable! - rollback_write(info.chunk_id) + rollback_write(info.chunk_id, update_retry: false) return nil else unpacker.feed(raw_data) @@ -442,7 +442,7 @@ def read_ack_from_sock(sock, unpacker) if res['ack'] != info.chunk_id_base64 # Some errors may have occured when ack and chunk id is different, so send the chunk again. log.warn "ack in response and chunk id in sent data are different", chunk_id: dump_unique_id_hex(info.chunk_id), ack: res['ack'] - rollback_write(info.chunk_id) + rollback_write(info.chunk_id, update_retry: false) return nil else log.trace "got a correct ack response", chunk_id: dump_unique_id_hex(info.chunk_id) @@ -483,7 +483,7 @@ def ack_reader log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port info.node.disable! 
info.sock.close rescue nil - rollback_write(info.chunk_id) + rollback_write(info.chunk_id, update_retry: false) else sockets << info.sock new_list << info diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 8a085af301..b90c1d04c2 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -226,7 +226,7 @@ def acts_as_secondary(primary) singleton_class.module_eval do define_method(:commit_write){ |chunk_id| @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true) } - define_method(:rollback_write){ |chunk_id| @primary_instance.rollback_write(chunk_id) } + define_method(:rollback_write){ |chunk_id, update_retry: true| @primary_instance.rollback_write(chunk_id, update_retry: update_retry) } end end @@ -991,7 +991,9 @@ def commit_write(chunk_id, delayed: @delayed_commit, secondary: false) end end - def rollback_write(chunk_id) + # update_retry parameter is for preventing busy loop by async write + # We will remove this parameter by re-design retry_state management between threads. + def rollback_write(chunk_id, update_retry: true) # This API is to rollback chunks explicitly from plugins. # 3rd party plugins can depend it on automatic rollback of #try_rollback_write @dequeued_chunks_mutex.synchronize do @@ -1002,8 +1004,10 @@ def rollback_write(chunk_id) # in many cases, false can be just ignored if @buffer.takeback_chunk(chunk_id) @counters_monitor.synchronize{ @rollback_count += 1 } - primary = @as_secondary ? @primary_instance : self - primary.update_retry_state(chunk_id, @as_secondary) + if update_retry + primary = @as_secondary ? @primary_instance : self + primary.update_retry_state(chunk_id, @as_secondary) + end true else false