Skip to content

Commit

Permalink
Don't update retry state when failing to get an ack response. Fixes #1665
Browse files Browse the repository at this point in the history
An ack failure is not a buffer flush error, so it should not update the retry state.
The problematic node is disabled, so it is excluded at the next flush after the rollback.

To resolve this problem, we add an update_retry parameter to Output#rollback_write.
This parameter controls whether the rollback also updates the retry state.
  • Loading branch information
repeatedly committed Sep 6, 2017
1 parent d14ea82 commit 2f40abb
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
6 changes: 3 additions & 3 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ def read_ack_from_sock(sock, unpacker)
if raw_data.empty?
log.warn "destination node closed the connection. regard it as unavailable.", host: info.node.host, port: info.node.port
info.node.disable!
rollback_write(info.chunk_id)
rollback_write(info.chunk_id, false)
return nil
else
unpacker.feed(raw_data)
Expand All @@ -442,7 +442,7 @@ def read_ack_from_sock(sock, unpacker)
if res['ack'] != info.chunk_id_base64
# Some errors may have occurred when the ack and the chunk id are different, so send the chunk again.
log.warn "ack in response and chunk id in sent data are different", chunk_id: dump_unique_id_hex(info.chunk_id), ack: res['ack']
rollback_write(info.chunk_id)
rollback_write(info.chunk_id, false)
return nil
else
log.trace "got a correct ack response", chunk_id: dump_unique_id_hex(info.chunk_id)
Expand Down Expand Up @@ -483,7 +483,7 @@ def ack_reader
log.warn "no response from node. regard it as unavailable.", host: info.node.host, port: info.node.port
info.node.disable!
info.sock.close rescue nil
rollback_write(info.chunk_id)
rollback_write(info.chunk_id, false)
else
sockets << info.sock
new_list << info
Expand Down
10 changes: 6 additions & 4 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def acts_as_secondary(primary)

(class << self; self; end).module_eval do
define_method(:commit_write){ |chunk_id| @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true) }
define_method(:rollback_write){ |chunk_id| @primary_instance.rollback_write(chunk_id) }
define_method(:rollback_write){ |chunk_id, update_retry = true| @primary_instance.rollback_write(chunk_id, update_retry) }
end
end

Expand Down Expand Up @@ -963,7 +963,7 @@ def commit_write(chunk_id, delayed: @delayed_commit, secondary: false)
end
end

def rollback_write(chunk_id)
def rollback_write(chunk_id, update_retry = true)
# This API is to rollback chunks explicitly from plugins.
# 3rd party plugins can depend on the automatic rollback of #try_rollback_write
@dequeued_chunks_mutex.synchronize do
Expand All @@ -974,8 +974,10 @@ def rollback_write(chunk_id)
# in many cases, false can be just ignored
if @buffer.takeback_chunk(chunk_id)
@counters_monitor.synchronize{ @rollback_count += 1 }
primary = @as_secondary ? @primary_instance : self
primary.update_retry_state(chunk_id, @as_secondary)
if update_retry
primary = @as_secondary ? @primary_instance : self
primary.update_retry_state(chunk_id, @as_secondary)
end
true
else
false
Expand Down

0 comments on commit 2f40abb

Please sign in to comment.