Skip to content

Commit

Permalink
Merge pull request #3649 from fluent/issue-3609-2
Browse files Browse the repository at this point in the history
Fix wrong calculation of retry interval and incorrect detection of the retry limit
  • Loading branch information
ashie authored Mar 28, 2022
2 parents 6927305 + a4f15f9 commit 2da6e1b
Show file tree
Hide file tree
Showing 5 changed files with 664 additions and 85 deletions.
75 changes: 40 additions & 35 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1275,52 +1275,57 @@ def update_retry_state(chunk_id, using_secondary, error = nil)

unless @retry
@retry = retry_state(@buffer_config.retry_randomize)

if @retry.limit?
# @retry_max_times == 0, fail immediately by the following block
else
if error
log.warn "failed to flush the buffer.", retry_times: @retry.steps, next_retry_time: @retry.next_time.round, chunk: chunk_id_hex, error: error
log.warn_backtrace error.backtrace
end
return
handle_limit_reached(error)
elsif error
log_retry_error(error, chunk_id_hex, using_secondary)
end

return
end

# @retry exists

if @retry.limit?
if error
records = @buffer.queued_records
msg = "failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue."
log.error msg, retry_times: @retry.steps, records: records, error: error
log.error_backtrace error.backtrace
end
@buffer.clear_queue!
log.debug "buffer queue cleared"
@retry = nil
# Ensure that the current time is greater than or equal to @retry.next_time to avoid the situation when
# @retry.step is called almost as many times as the number of flush threads in a short time.
if Time.now >= @retry.next_time
@retry.step
else
# Ensure that the current time is greater than or equal to @retry.next_time to avoid the situation when
# @retry.step is called almost as many times as the number of flush threads in a short time.
if Time.now >= @retry.next_time
@retry.step
else
@retry.recalc_next_time # to prevent all flush threads from retrying at the same time
end
if error
if using_secondary
msg = "failed to flush the buffer with secondary output."
log.warn msg, retry_times: @retry.steps, next_retry_time: @retry.next_time.round, chunk: chunk_id_hex, error: error
log.warn_backtrace error.backtrace
else
msg = "failed to flush the buffer."
log.warn msg, retry_times: @retry.steps, next_retry_time: @retry.next_time.round, chunk: chunk_id_hex, error: error
log.warn_backtrace error.backtrace
end
end
@retry.recalc_next_time # to prevent all flush threads from retrying at the same time
end

if @retry.limit?
handle_limit_reached(error)
elsif error
log_retry_error(error, chunk_id_hex, using_secondary)
end
end
end

# Log a warning for a failed flush attempt that will be retried.
# Chooses the message based on whether the secondary output was in use;
# does nothing when no error is given.
def log_retry_error(error, chunk_id_hex, using_secondary)
  return unless error
  msg =
    if using_secondary
      "failed to flush the buffer with secondary output."
    else
      "failed to flush the buffer."
    end
  log.warn(msg, retry_times: @retry.steps, next_retry_time: @retry.next_time.round, chunk: chunk_id_hex, error: error)
  log.warn_backtrace(error.backtrace)
end

# Give up retrying: log an error (when one is given), drop every queued
# chunk from the buffer, and reset the retry state so a future failure
# starts a fresh retry sequence.
def handle_limit_reached(error)
  if error
    log.error "Hit limit for retries. dropping all chunks in the buffer queue.",
              retry_times: @retry.steps,
              records: @buffer.queued_records,
              error: error
    log.error_backtrace error.backtrace
  end
  @buffer.clear_queue!
  log.debug "buffer queue cleared"
  @retry = nil
end

def retry_state(randomize)
if @secondary
retry_state_create(
Expand Down
20 changes: 15 additions & 5 deletions lib/fluent/plugin_helper/retry_state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ def initialize(title, wait, timeout, forever, max_steps, randomize, randomize_wi

@timeout = timeout
@timeout_at = @start + timeout
@has_reached_timeout = false
@has_timed_out = false
@current = :primary

if randomize_width < 0 || randomize_width > 0.5
Expand Down Expand Up @@ -98,7 +100,7 @@ def calc_next_time
naive
end
elsif @current == :secondary
naive = naive_next_time(@steps - @secondary_transition_steps + 1)
naive = naive_next_time(@steps - @secondary_transition_steps)
if naive >= @timeout_at
@timeout_at
else
Expand All @@ -123,7 +125,15 @@ def step
@current = :secondary
@secondary_transition_steps = @steps
end

@next_time = calc_next_time

if @has_reached_timeout
@has_timed_out = @next_time >= @timeout_at
else
@has_reached_timeout = @next_time >= @timeout_at
end

nil
end

Expand All @@ -135,7 +145,7 @@ def limit?
if @forever
false
else
@next_time >= @timeout_at || !!(@max_steps && @steps >= @max_steps)
@has_timed_out || !!(@max_steps && @steps >= @max_steps)
end
end
end
Expand All @@ -159,13 +169,13 @@ def naive_next_time(retry_next_times)
def calc_max_retry_timeout(max_steps)
result = 0
max_steps.times { |i|
result += calc_interval(i + 1)
result += calc_interval(i)
}
result
end

def calc_interval(num)
interval = raw_interval(num - 1)
interval = raw_interval(num)
if @max_interval && interval > @max_interval
@max_interval
else
Expand All @@ -175,7 +185,7 @@ def calc_interval(num)
# Calculate previous finite value to avoid inf related errors. If this re-computing is heavy, use cache.
until interval.finite?
num -= 1
interval = raw_interval(num - 1)
interval = raw_interval(num)
end
interval
end
Expand Down
14 changes: 7 additions & 7 deletions test/plugin/test_output_as_buffered_retries.rb
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,13 @@ def get_log_time(msg, logs)

retry_state = @i.retry_state( @i.buffer_config.retry_randomize )
retry_state.step
assert_equal 1, (retry_state.next_time - now)
retry_state.step
assert_equal (1 * (2 ** 1)), (retry_state.next_time - now)
retry_state.step
assert_equal (1 * (2 ** 2)), (retry_state.next_time - now)
retry_state.step
assert_equal (1 * (2 ** 3)), (retry_state.next_time - now)
retry_state.step
assert_equal (1 * (2 ** 4)), (retry_state.next_time - now)
end

test 'does retries correctly when #write fails' do
Expand Down Expand Up @@ -332,7 +332,7 @@ def get_log_time(msg, logs)
@i.emit_events("test.tag.3", dummy_event_stream())

logs = @i.log.out.logs
assert{ logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.") } }
assert{ logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") } }
end

test 'output plugin give retries up by retry_max_times, and clear queue in buffer' do
Expand Down Expand Up @@ -409,7 +409,7 @@ def get_log_time(msg, logs)
@i.emit_events("test.tag.3", dummy_event_stream())

logs = @i.log.out.logs
assert{ logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=10") } }
assert{ logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=10") } }

assert{ @i.buffer.queue.size == 0 }
assert{ @i.buffer.stage.size == 1 }
Expand Down Expand Up @@ -607,7 +607,7 @@ def get_log_time(msg, logs)
logs = @i.log.out.logs

target_time = Time.parse("2016-04-13 18:35:31 -0700")
target_msg = "[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue."
target_msg = "[error]: Hit limit for retries. dropping all chunks in the buffer queue."
assert{ logs.any?{|l| l.include?(target_msg) } }

log_time = get_log_time(target_msg, logs)
Expand Down Expand Up @@ -695,7 +695,7 @@ def get_log_time(msg, logs)
@i.emit_events("test.tag.3", dummy_event_stream())

logs = @i.log.out.logs
assert{ logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=10") } }
assert{ logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=10") } }

assert{ @i.buffer.queue.size == 0 }
assert{ @i.buffer.stage.size == 1 }
Expand Down Expand Up @@ -743,7 +743,7 @@ def get_log_time(msg, logs)

assert(@i.write_count == 1)
assert(@i.num_errors == 1)
assert(@i.log.out.logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=0") })
assert(@i.log.out.logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=0") })
assert(@i.buffer.queue.size == 0)
assert(@i.buffer.stage.size == 1)
assert(@i.buffer.queue.all?{|c| c.empty? })
Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_output_as_buffered_secondary.rb
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ def dummy_event_stream
end

logs = @i.log.out.logs
assert{ logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.") } }
assert{ logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") } }

assert{ now >= first_failure + 60 }
end
Expand Down
Loading

0 comments on commit 2da6e1b

Please sign in to comment.