Skip to content

Commit

Permalink
Fix timeout handling
Browse files Browse the repository at this point in the history
Add 2 states to control timeout.

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom authored and ashie committed Mar 24, 2022
1 parent 09f0aa5 commit 4819ccf
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 36 deletions.
16 changes: 8 additions & 8 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1290,16 +1290,17 @@ def update_retry_state(chunk_id, using_secondary, error = nil)

# @retry exists

# Ensure that the current time is greater than or equal to @retry.next_time to avoid the situation when
# @retry.step is called almost as many times as the number of flush threads in a short time.
if Time.now >= @retry.next_time
@retry.step
else
@retry.recalc_next_time # to prevent all flush threads from retrying at the same time
end

if @retry.limit?
handle_limit_reached(error)
else
# Ensure that the current time is greater than or equal to @retry.next_time to avoid the situation when
# @retry.step is called almost as many times as the number of flush threads in a short time.
if Time.now >= @retry.next_time
@retry.step
else
@retry.recalc_next_time # to prevent all flush threads from retrying at the same time
end
if error
if using_secondary
msg = "failed to flush the buffer with secondary output."
Expand All @@ -1311,7 +1312,6 @@ def update_retry_state(chunk_id, using_secondary, error = nil)
log.warn_backtrace error.backtrace
end
end
handle_limit_reached(error) if @retry.limit_step?
end
end
end
Expand Down
27 changes: 12 additions & 15 deletions lib/fluent/plugin_helper/retry_state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ def initialize(title, wait, timeout, forever, max_steps, randomize, randomize_wi

@timeout = timeout
@timeout_at = @start + timeout
@has_reached_timeout = false
@has_timeouted = false
@current = :primary

if randomize_width < 0 || randomize_width > 0.5
Expand Down Expand Up @@ -123,32 +125,27 @@ def step
@current = :secondary
@secondary_transition_steps = @steps
end

@next_time = calc_next_time

unless @has_reached_timeout
@has_reached_timeout = @next_time >= @timeout_at
else
@has_timeouted = @next_time >= @timeout_at
end

nil
end

def recalc_next_time
@next_time = calc_next_time
end

# Use @next_time for time by default to keep backward compatibility
def limit?(time: @next_time, steps: @steps)
timeout?(time) || limit_step?(steps)
end

def timeout?(time = current_time)
if @forever
false
else
time >= @timeout_at
end
end

def limit_step?(steps = @steps)
def limit?
if @forever
false
else
!!(@max_steps && steps >= @max_steps)
@has_timeouted || !!(@max_steps && @steps >= @max_steps)
end
end
end
Expand Down
23 changes: 10 additions & 13 deletions test/plugin_helper/test_retry_state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def ==(obj)
override_current_time(s, s.next_time)
s.step
assert_equal s.timeout_at, s.next_time
s.step
assert s.limit?
end

Expand All @@ -115,7 +116,6 @@ def ==(obj)
assert_equal 5, i
override_current_time(s, s.next_time)
s.step
assert_equal (s.current_time + 3), s.next_time
assert s.limit?
end

Expand Down Expand Up @@ -179,7 +179,9 @@ def ==(obj)
assert s.secondary?

s.step
assert_equal s.timeout_at, s.next_time
assert_equal s.timeout_at, s.next_time # 100

s.step
assert s.limit?
end

Expand Down Expand Up @@ -285,6 +287,7 @@ def ==(obj)
assert_equal 3, s.steps
assert_equal s.timeout_at, s.next_time

s.step
assert s.limit?
end

Expand Down Expand Up @@ -334,8 +337,6 @@ def ==(obj)
override_current_time(s, s.next_time)
s.step
assert_equal 6, s.steps
assert_equal (s.current_time + 10), s.next_time

assert s.limit?
end

Expand Down Expand Up @@ -405,7 +406,9 @@ def ==(obj)
assert s.secondary?

s.step
assert_equal s.timeout_at, s.next_time
assert_equal s.timeout_at, s.next_time # 100

s.step
assert s.limit?
end

Expand Down Expand Up @@ -988,15 +991,9 @@ def ==(obj)
msg << "[#{next_elapsed}s elapsed point] #{retry_count}th-Retry(#{s.secondary? ? "SEC" : "PRI"}) is triggered.\n"

# Update retry statement
if s.limit?
msg << "--- Reach limit of timeout. ---\n"
break
end

s.step

if s.limit_step?
msg << "--- Reach limit of max step. ---\n"
if s.limit?
msg << "--- Reach limit. ---\n"
break
end
end
Expand Down

0 comments on commit 4819ccf

Please sign in to comment.