Skip to content

Commit

Permalink
Merge pull request #1686 from fluent/fix-busy-flush-in-out_forward
Browse files Browse the repository at this point in the history
Don't update retry state when failed to get ack response. fix #1665
  • Loading branch information
repeatedly authored Nov 29, 2017
2 parents 5342dd2 + 99d19a4 commit 1b9cddd
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
6 changes: 3 additions & 3 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ def read_ack_from_sock(sock, unpacker)
if raw_data.empty?
log.warn "destination node closed the connection. regard it as unavailable.", host: info.node.host, port: info.node.port
info.node.disable!
rollback_write(info.chunk_id)
rollback_write(info.chunk_id, update_retry: false)
return nil
else
unpacker.feed(raw_data)
Expand All @@ -442,7 +442,7 @@ def read_ack_from_sock(sock, unpacker)
if res['ack'] != info.chunk_id_base64
# Some errors may have occured when ack and chunk id is different, so send the chunk again.
log.warn "ack in response and chunk id in sent data are different", chunk_id: dump_unique_id_hex(info.chunk_id), ack: res['ack']
rollback_write(info.chunk_id)
rollback_write(info.chunk_id, update_retry: false)
return nil
else
log.trace "got a correct ack response", chunk_id: dump_unique_id_hex(info.chunk_id)
Expand Down Expand Up @@ -483,7 +483,7 @@ def ack_reader
log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port
info.node.disable!
info.sock.close rescue nil
rollback_write(info.chunk_id)
rollback_write(info.chunk_id, update_retry: false)
else
sockets << info.sock
new_list << info
Expand Down
12 changes: 8 additions & 4 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def acts_as_secondary(primary)

singleton_class.module_eval do
define_method(:commit_write){ |chunk_id| @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true) }
define_method(:rollback_write){ |chunk_id| @primary_instance.rollback_write(chunk_id) }
define_method(:rollback_write){ |chunk_id, update_retry: true| @primary_instance.rollback_write(chunk_id, update_retry) }
end
end

Expand Down Expand Up @@ -991,7 +991,9 @@ def commit_write(chunk_id, delayed: @delayed_commit, secondary: false)
end
end

def rollback_write(chunk_id)
# update_retry parameter is for preventing busy loop by async write
# We will remove this parameter by re-design retry_state management between threads.
def rollback_write(chunk_id, update_retry: true)
# This API is to rollback chunks explicitly from plugins.
# 3rd party plugins can depend it on automatic rollback of #try_rollback_write
@dequeued_chunks_mutex.synchronize do
Expand All @@ -1002,8 +1004,10 @@ def rollback_write(chunk_id)
# in many cases, false can be just ignored
if @buffer.takeback_chunk(chunk_id)
@counters_monitor.synchronize{ @rollback_count += 1 }
primary = @as_secondary ? @primary_instance : self
primary.update_retry_state(chunk_id, @as_secondary)
if update_retry
primary = @as_secondary ? @primary_instance : self
primary.update_retry_state(chunk_id, @as_secondary)
end
true
else
false
Expand Down

0 comments on commit 1b9cddd

Please sign in to comment.