diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 0db896fd4c..45290fda64 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1290,16 +1290,17 @@ def update_retry_state(chunk_id, using_secondary, error = nil) # @retry exists + # Ensure that the current time is greater than or equal to @retry.next_time to avoid the situation when + # @retry.step is called almost as many times as the number of flush threads in a short time. + if Time.now >= @retry.next_time + @retry.step + else + @retry.recalc_next_time # to prevent all flush threads from retrying at the same time + end + if @retry.limit? handle_limit_reached(error) else - # Ensure that the current time is greater than or equal to @retry.next_time to avoid the situation when - # @retry.step is called almost as many times as the number of flush threads in a short time. - if Time.now >= @retry.next_time - @retry.step - else - @retry.recalc_next_time # to prevent all flush threads from retrying at the same time - end if error if using_secondary msg = "failed to flush the buffer with secondary output." @@ -1311,7 +1312,6 @@ def update_retry_state(chunk_id, using_secondary, error = nil) log.warn_backtrace error.backtrace end end - handle_limit_reached(error) if @retry.limit_step? end end end diff --git a/lib/fluent/plugin_helper/retry_state.rb b/lib/fluent/plugin_helper/retry_state.rb index 29bcf8c157..82e5689af3 100644 --- a/lib/fluent/plugin_helper/retry_state.rb +++ b/lib/fluent/plugin_helper/retry_state.rb @@ -44,6 +44,8 @@ def initialize(title, wait, timeout, forever, max_steps, randomize, randomize_wi @timeout = timeout @timeout_at = @start + timeout + @has_reached_timeout = false + @has_timed_out = false @current = :primary if randomize_width < 0 || randomize_width > 0.5 @@ -123,7 +125,15 @@ def step @current = :secondary @secondary_transition_steps = @steps end + @next_time = calc_next_time + + if @has_reached_timeout + @has_timed_out = @next_time >= @timeout_at + else + @has_reached_timeout = @next_time >= @timeout_at + end + nil end @@ -131,24 +141,11 @@ def recalc_next_time @next_time = calc_next_time end - # Use @next_time for time by default to keep backward compatibility - def limit?(time: @next_time, steps: @steps) - timeout?(time) || limit_step?(steps) - end - - def timeout?(time = current_time) - if @forever - false - else - time >= @timeout_at - end - end - - def limit_step?(steps = @steps) + def limit? if @forever false else - !!(@max_steps && steps >= @max_steps) + @has_timed_out || !!(@max_steps && @steps >= @max_steps) end end end diff --git a/test/plugin_helper/test_retry_state.rb b/test/plugin_helper/test_retry_state.rb index 824151c3a8..b61e5e35db 100644 --- a/test/plugin_helper/test_retry_state.rb +++ b/test/plugin_helper/test_retry_state.rb @@ -90,6 +90,7 @@ def ==(obj) override_current_time(s, s.next_time) s.step assert_equal s.timeout_at, s.next_time + s.step assert s.limit? end @@ -115,7 +116,6 @@ def ==(obj) assert_equal 5, i override_current_time(s, s.next_time) s.step - assert_equal (s.current_time + 3), s.next_time assert s.limit? end @@ -179,7 +179,9 @@ def ==(obj) assert s.secondary? s.step - assert_equal s.timeout_at, s.next_time + assert_equal s.timeout_at, s.next_time # 100 + + s.step assert s.limit? end @@ -285,6 +287,7 @@ def ==(obj) assert_equal 3, s.steps assert_equal s.timeout_at, s.next_time + s.step assert s.limit? end @@ -334,8 +337,6 @@ def ==(obj) override_current_time(s, s.next_time) s.step assert_equal 6, s.steps - assert_equal (s.current_time + 10), s.next_time - assert s.limit? end @@ -405,7 +406,9 @@ def ==(obj) assert s.secondary? s.step - assert_equal s.timeout_at, s.next_time + assert_equal s.timeout_at, s.next_time # 100 + + s.step assert s.limit? end @@ -988,15 +991,9 @@ def ==(obj) msg << "[#{next_elapsed}s elapsed point] #{retry_count}th-Retry(#{s.secondary? ? "SEC" : "PRI"}) is triggered.\n" # Update retry statement - if s.limit? - msg << "--- Reach limit of timeout. ---\n" - break - end - s.step - - if s.limit_step? - msg << "--- Reach limit of max step. ---\n" + if s.limit? + msg << "--- Reach limit. ---\n" break end end