diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb
index 54dbbeb5e0..976ff1002c 100644
--- a/lib/fluent/plugin/output.rb
+++ b/lib/fluent/plugin/output.rb
@@ -1275,52 +1275,57 @@ def update_retry_state(chunk_id, using_secondary, error = nil)
 
         unless @retry
           @retry = retry_state(@buffer_config.retry_randomize)
+
           if @retry.limit?
-            # @retry_max_times == 0, fail imediately by the following block
-          else
-            if error
-              log.warn "failed to flush the buffer.", retry_times: @retry.steps, next_retry_time: @retry.next_time.round, chunk: chunk_id_hex, error: error
-              log.warn_backtrace error.backtrace
-            end
-            return
+            handle_limit_reached(error)
+          elsif error
+            log_retry_error(error, chunk_id_hex, using_secondary)
           end
+
+          return
         end
 
         # @retry exists
 
-        if @retry.limit?
-          if error
-            records = @buffer.queued_records
-            msg = "failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue."
-            log.error msg, retry_times: @retry.steps, records: records, error: error
-            log.error_backtrace error.backtrace
-          end
-          @buffer.clear_queue!
-          log.debug "buffer queue cleared"
-          @retry = nil
+        # Ensure that the current time is greater than or equal to @retry.next_time to avoid the situation when
+        # @retry.step is called almost as many times as the number of flush threads in a short time.
+        if Time.now >= @retry.next_time
+          @retry.step
         else
-          # Ensure that the current time is greater than or equal to @retry.next_time to avoid the situation when
-          # @retry.step is called almost as many times as the number of flush threads in a short time.
-          if Time.now >= @retry.next_time
-            @retry.step
-          else
-            @retry.recalc_next_time # to prevent all flush threads from retrying at the same time
-          end
-          if error
-            if using_secondary
-              msg = "failed to flush the buffer with secondary output."
-              log.warn msg, retry_times: @retry.steps, next_retry_time: @retry.next_time.round, chunk: chunk_id_hex, error: error
-              log.warn_backtrace error.backtrace
-            else
-              msg = "failed to flush the buffer."
-              log.warn msg, retry_times: @retry.steps, next_retry_time: @retry.next_time.round, chunk: chunk_id_hex, error: error
-              log.warn_backtrace error.backtrace
-            end
-          end
+          @retry.recalc_next_time # to prevent all flush threads from retrying at the same time
+        end
+
+        if @retry.limit?
+          handle_limit_reached(error)
+        elsif error
+          log_retry_error(error, chunk_id_hex, using_secondary)
         end
       end
     end
 
+    def log_retry_error(error, chunk_id_hex, using_secondary)
+      return unless error
+      if using_secondary
+        msg = "failed to flush the buffer with secondary output."
+      else
+        msg = "failed to flush the buffer."
+      end
+      log.warn(msg, retry_times: @retry.steps, next_retry_time: @retry.next_time.round, chunk: chunk_id_hex, error: error)
+      log.warn_backtrace(error.backtrace)
+    end
+
+    def handle_limit_reached(error)
+      if error
+        records = @buffer.queued_records
+        msg = "Hit limit for retries. dropping all chunks in the buffer queue."
+        log.error msg, retry_times: @retry.steps, records: records, error: error
+        log.error_backtrace error.backtrace
+      end
+      @buffer.clear_queue!
+      log.debug "buffer queue cleared"
+      @retry = nil
+    end
+
     def retry_state(randomize)
       if @secondary
         retry_state_create(
diff --git a/lib/fluent/plugin_helper/retry_state.rb b/lib/fluent/plugin_helper/retry_state.rb
index 7851fd61ea..82e5689af3 100644
--- a/lib/fluent/plugin_helper/retry_state.rb
+++ b/lib/fluent/plugin_helper/retry_state.rb
@@ -44,6 +44,8 @@ def initialize(title, wait, timeout, forever, max_steps, randomize, randomize_wi
 
           @timeout = timeout
           @timeout_at = @start + timeout
+          @has_reached_timeout = false
+          @has_timed_out = false
           @current = :primary
 
           if randomize_width < 0 || randomize_width > 0.5
@@ -98,7 +100,7 @@ def calc_next_time
               naive
             end
           elsif @current == :secondary
-            naive = naive_next_time(@steps - @secondary_transition_steps + 1)
+            naive = naive_next_time(@steps - @secondary_transition_steps)
             if naive >= @timeout_at
               @timeout_at
             else
@@ -123,7 +125,15 @@ def step
             @current = :secondary
             @secondary_transition_steps = @steps
           end
+
           @next_time = calc_next_time
+
+          if @has_reached_timeout
+            @has_timed_out = @next_time >= @timeout_at
+          else
+            @has_reached_timeout = @next_time >= @timeout_at
+          end
+
           nil
         end
@@ -135,7 +145,7 @@ def limit?
           if @forever
             false
           else
-            @next_time >= @timeout_at || !!(@max_steps && @steps >= @max_steps)
+            @has_timed_out || !!(@max_steps && @steps >= @max_steps)
           end
         end
       end
@@ -159,13 +169,13 @@ def naive_next_time(retry_next_times)
         def calc_max_retry_timeout(max_steps)
           result = 0
           max_steps.times { |i|
-            result += calc_interval(i + 1)
+            result += calc_interval(i)
           }
           result
         end
 
         def calc_interval(num)
-          interval = raw_interval(num - 1)
+          interval = raw_interval(num)
           if @max_interval && interval > @max_interval
             @max_interval
           else
@@ -175,7 +185,7 @@ def calc_interval(num)
             # Calculate previous finite value to avoid inf related errors. If this re-computing is heavy, use cache.
             until interval.finite?
               num -= 1
-              interval = raw_interval(num - 1)
+              interval = raw_interval(num)
             end
             interval
           end
diff --git a/test/plugin/test_output_as_buffered_retries.rb b/test/plugin/test_output_as_buffered_retries.rb
index f3a08e877f..1fd0307500 100644
--- a/test/plugin/test_output_as_buffered_retries.rb
+++ b/test/plugin/test_output_as_buffered_retries.rb
@@ -140,13 +140,13 @@ def get_log_time(msg, logs)
       retry_state = @i.retry_state( @i.buffer_config.retry_randomize )
       retry_state.step
-      assert_equal 1, (retry_state.next_time - now)
-      retry_state.step
       assert_equal (1 * (2 ** 1)), (retry_state.next_time - now)
       retry_state.step
       assert_equal (1 * (2 ** 2)), (retry_state.next_time - now)
       retry_state.step
       assert_equal (1 * (2 ** 3)), (retry_state.next_time - now)
+      retry_state.step
+      assert_equal (1 * (2 ** 4)), (retry_state.next_time - now)
     end
 
     test 'does retries correctly when #write fails' do
@@ -332,7 +332,7 @@ def get_log_time(msg, logs)
       @i.emit_events("test.tag.3", dummy_event_stream())
 
       logs = @i.log.out.logs
-      assert{ logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.") } }
+      assert{ logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") } }
     end
 
     test 'output plugin give retries up by retry_max_times, and clear queue in buffer' do
@@ -409,7 +409,7 @@ def get_log_time(msg, logs)
       @i.emit_events("test.tag.3", dummy_event_stream())
 
       logs = @i.log.out.logs
-      assert{ logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=10") } }
+      assert{ logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=10") } }
       assert{ @i.buffer.queue.size == 0 }
       assert{ @i.buffer.stage.size == 1 }
@@ -607,7 +607,7 @@ def get_log_time(msg, logs)
       logs = @i.log.out.logs
 
       target_time = Time.parse("2016-04-13 18:35:31 -0700")
-      target_msg = "[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue."
+      target_msg = "[error]: Hit limit for retries. dropping all chunks in the buffer queue."
       assert{ logs.any?{|l| l.include?(target_msg) } }
 
       log_time = get_log_time(target_msg, logs)
@@ -695,7 +695,7 @@ def get_log_time(msg, logs)
       @i.emit_events("test.tag.3", dummy_event_stream())
 
       logs = @i.log.out.logs
-      assert{ logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=10") } }
+      assert{ logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=10") } }
       assert{ @i.buffer.queue.size == 0 }
       assert{ @i.buffer.stage.size == 1 }
@@ -743,7 +743,7 @@ def get_log_time(msg, logs)
       assert(@i.write_count == 1)
       assert(@i.num_errors == 1)
-      assert(@i.log.out.logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=0") })
+      assert(@i.log.out.logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=0") })
       assert(@i.buffer.queue.size == 0)
       assert(@i.buffer.stage.size == 1)
       assert(@i.buffer.queue.all?{|c| c.empty? })
diff --git a/test/plugin/test_output_as_buffered_secondary.rb b/test/plugin/test_output_as_buffered_secondary.rb
index ae774b7455..b12bb154a3 100644
--- a/test/plugin/test_output_as_buffered_secondary.rb
+++ b/test/plugin/test_output_as_buffered_secondary.rb
@@ -874,7 +874,7 @@ def dummy_event_stream
       end
 
       logs = @i.log.out.logs
-      assert{ logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.") } }
+      assert{ logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") } }
 
       assert{ now >= first_failure + 60 }
     end
diff --git a/test/plugin_helper/test_retry_state.rb b/test/plugin_helper/test_retry_state.rb
index 4edd41bdf9..b61e5e35db 100644
--- a/test/plugin_helper/test_retry_state.rb
+++ b/test/plugin_helper/test_retry_state.rb
@@ -18,6 +18,21 @@ class Dummy < Fluent::Plugin::TestBase
     helpers :retry_state
   end
 
+  class RetryRecord
+    attr_reader :retry_count, :elapsed_sec, :is_secondary
+    def initialize(retry_count, elapsed_sec, is_secondary)
+      @retry_count = retry_count # This is the Nth retry
+      @elapsed_sec = elapsed_sec
+      @is_secondary = is_secondary
+    end
+
+    def ==(obj)
+      @retry_count == obj.retry_count &&
+        @elapsed_sec == obj.elapsed_sec &&
+        @is_secondary == obj.is_secondary
+    end
+  end
+
   setup do
     @d = Dummy.new
   end
@@ -75,6 +90,7 @@ class Dummy < Fluent::Plugin::TestBase
     override_current_time(s, s.next_time)
     s.step
     assert_equal s.timeout_at, s.next_time
+    s.step
     assert s.limit?
   end
@@ -100,7 +116,6 @@ class Dummy < Fluent::Plugin::TestBase
     assert_equal 5, i
     override_current_time(s, s.next_time)
     s.step
-    assert_equal (s.current_time + 3), s.next_time
     assert s.limit?
   end
@@ -164,7 +179,9 @@ class Dummy < Fluent::Plugin::TestBase
     assert s.secondary?
 
     s.step
-    assert_equal s.timeout_at, s.next_time
+    assert_equal s.timeout_at, s.next_time # 100
+
+    s.step
     assert s.limit?
   end
@@ -202,7 +219,7 @@ class Dummy < Fluent::Plugin::TestBase
     while i < 300
       s.step
       assert_equal i, s.steps
-      assert_equal (dummy_current_time + 0.1 * (2 ** (i - 1))), s.next_time
+      assert_equal (dummy_current_time + 0.1 * (2 ** i)), s.next_time
       assert !s.limit?
       i += 1
     end
@@ -218,22 +235,22 @@ class Dummy < Fluent::Plugin::TestBase
     assert_equal 0, s.steps
     assert_equal (dummy_current_time + 0.1), s.next_time
 
-    # 0.1 * (2 ** (10 - 1)) == 0.1 * 2 ** 9 == 51.2
-    # 0.1 * (2 ** (11 - 1)) == 0.1 * 2 ** 10 == 102.4
+    # 0.1 * 2 ** 9 == 51.2
+    # 0.1 * 2 ** 10 == 102.4
     i = 1
-    while i < 11
+    while i < 10
       s.step
       assert_equal i, s.steps
-      assert_equal (dummy_current_time + 0.1 * (2 ** (i - 1))), s.next_time, "start:#{dummy_current_time}, i:#{i}"
+      assert_equal (dummy_current_time + 0.1 * (2 ** i)), s.next_time, "start:#{dummy_current_time}, i:#{i}"
       i += 1
     end
 
     s.step
-    assert_equal 11, s.steps
+    assert_equal 10, s.steps
     assert_equal (dummy_current_time + 100), s.next_time
 
     s.step
-    assert_equal 12, s.steps
+    assert_equal 11, s.steps
     assert_equal (dummy_current_time + 100), s.next_time
   end
@@ -249,32 +266,28 @@ class Dummy < Fluent::Plugin::TestBase
     assert_equal 0, s.steps
     assert_equal (dummy_current_time + 1), s.next_time
 
-    # 1 + 1 + 2 + 4 (=8)
+    # 1 + 2 + 4 (=7)
     override_current_time(s, s.next_time)
     s.step
     assert_equal 1, s.steps
-    assert_equal (s.current_time + 1), s.next_time
-
-    override_current_time(s, s.next_time)
-    s.step
-    assert_equal 2, s.steps
     assert_equal (s.current_time + 2), s.next_time
 
     override_current_time(s, s.next_time)
     s.step
-    assert_equal 3, s.steps
+    assert_equal 2, s.steps
     assert_equal (s.current_time + 4), s.next_time
 
     assert !s.limit?
 
-    # + 8 (=16) > 12
+    # + 8 (=15) > 12
     override_current_time(s, s.next_time)
     s.step
-    assert_equal 4, s.steps
+    assert_equal 3, s.steps
     assert_equal s.timeout_at, s.next_time
+    s.step
     assert s.limit?
   end
@@ -293,24 +306,24 @@ class Dummy < Fluent::Plugin::TestBase
     override_current_time(s, s.next_time)
     s.step
     assert_equal 1, s.steps
-    assert_equal (s.current_time + 1), s.next_time
+    assert_equal (s.current_time + 2), s.next_time
 
     override_current_time(s, s.next_time)
     s.step
     assert_equal 2, s.steps
-    assert_equal (s.current_time + 2), s.next_time
+    assert_equal (s.current_time + 4), s.next_time
 
     override_current_time(s, s.next_time)
     s.step
     assert_equal 3, s.steps
-    assert_equal (s.current_time + 4), s.next_time
+    assert_equal (s.current_time + 8), s.next_time
 
     assert !s.limit?
 
     override_current_time(s, s.next_time)
     s.step
     assert_equal 4, s.steps
-    assert_equal (s.current_time + 8), s.next_time
+    assert_equal (s.current_time + 10), s.next_time
 
     assert !s.limit?
@@ -324,8 +337,6 @@ class Dummy < Fluent::Plugin::TestBase
     override_current_time(s, s.next_time)
     s.step
     assert_equal 6, s.steps
-    assert_equal (s.current_time + 10), s.next_time
-
     assert s.limit?
   end
@@ -341,40 +352,42 @@ class Dummy < Fluent::Plugin::TestBase
     assert_equal (dummy_current_time + 1), s.next_time
     assert !s.secondary?
 
-    # 1, 1(2), 2(4), 4(8), 8(16), 16(32), 32(64), (80), (81), (83), (87), (95), (100)
+    # primary: 3, 7, 15, 31, 63, 80 (timeout * threshold)
+    # secondary: 81, 83, 87, 95, 100
     i = 1
-    while i < 7
+    while i < 6
       override_current_time(s, s.next_time)
       assert !s.secondary?
       s.step
       assert_equal i, s.steps
-      assert_equal (s.current_time + 1 * (2 ** (i - 1))), s.next_time
+      assert_equal (s.current_time + 1 * (2 ** i)), s.next_time
       assert !s.limit?
       i += 1
     end
 
-    assert_equal 7, i
-    override_current_time(s, s.next_time) # 64
+    assert_equal 6, i
+    override_current_time(s, s.next_time) # 63
     assert !s.secondary?
     s.step
-    assert_equal 7, s.steps
+    assert_equal 6, s.steps
     assert_equal s.secondary_transition_at, s.next_time
     assert !s.limit?
 
     i += 1
-    assert_equal 8, i
+    assert_equal 7, i
     override_current_time(s, s.next_time) # 80
     assert s.secondary?
     s.step
-    assert_equal 8, s.steps
+    assert_equal 7, s.steps
     assert_equal s.steps, s.secondary_transition_steps
-    assert_equal (s.secondary_transition_at + 1.0), s.next_time
+    assert_equal (s.secondary_transition_at + 1.0), s.next_time # 81
     assert !s.limit?
+    assert_equal :secondary, s.current
 
-    # 81, 82, 84, 88, 96, 100
+    # 83, 87, 95, 100
     j = 1
     while j < 4
       override_current_time(s, s.next_time)
@@ -382,18 +395,20 @@ class Dummy < Fluent::Plugin::TestBase
       assert_equal :secondary, s.current
 
       s.step
-      assert_equal (8 + j), s.steps
+      assert_equal (7 + j), s.steps
       assert_equal (s.current_time + (1 * (2 ** j))), s.next_time
       assert !s.limit?, "j:#{j}"
       j += 1
     end
 
     assert_equal 4, j
-    override_current_time(s, s.next_time) # 96
+    override_current_time(s, s.next_time) # 95
     assert s.secondary?
 
     s.step
-    assert_equal s.timeout_at, s.next_time
+    assert_equal s.timeout_at, s.next_time # 100
+
+    s.step
     assert s.limit?
   end
@@ -439,4 +454,553 @@ class Dummy < Fluent::Plugin::TestBase
       end
     end
   end
+
+  sub_test_case "ExponentialBackOff_ScenarioTests" do
+    data("Simple timeout", {
+      timeout: 100, max_steps: nil, max_interval: nil, use_sec: false, sec_thres: 0.8, wait: 1, backoff_base: 2,
+      expected: [
+        RetryRecord.new(1, 1, false),
+        RetryRecord.new(2, 3, false),
+        RetryRecord.new(3, 7, false),
+        RetryRecord.new(4, 15, false),
+        RetryRecord.new(5, 31, false),
+        RetryRecord.new(6, 63, false),
+        RetryRecord.new(7, 100, false),
+      ],
+    })
+    data("Simple timeout with secondary", {
+      timeout: 100, max_steps: nil, max_interval: nil, use_sec: true, sec_thres: 0.8, wait: 1, backoff_base: 2,
+      expected: [
+        RetryRecord.new(1, 1, false),
+        RetryRecord.new(2, 3, false),
+        RetryRecord.new(3, 7, false),
+        RetryRecord.new(4, 15, false),
+        RetryRecord.new(5, 31, false),
+        RetryRecord.new(6, 63, false),
+        RetryRecord.new(7, 80, true),
+        RetryRecord.new(8, 81, true),
+        RetryRecord.new(9, 83, true),
+        RetryRecord.new(10, 87, true),
+        RetryRecord.new(11, 95, true),
+        RetryRecord.new(12, 100, true),
+      ],
+    })
+    data("Simple timeout with custom wait and backoff_base", {
+      timeout: 1000, max_steps: nil, max_interval: nil, use_sec: false, sec_thres: 0.8, wait: 2, backoff_base: 3,
+      expected: [
+        RetryRecord.new(1, 2, false),
+        RetryRecord.new(2, 8, false),
+        RetryRecord.new(3, 26, false),
+        RetryRecord.new(4, 80, false),
+        RetryRecord.new(5, 242, false),
+        RetryRecord.new(6, 728, false),
+        RetryRecord.new(7, 1000, false),
+      ],
+    })
+    data("Simple timeout with custom wait and backoff_base and secondary", {
+      timeout: 1000, max_steps: nil, max_interval: nil, use_sec: true, sec_thres: 0.8, wait: 2, backoff_base: 3,
+      expected: [
+        RetryRecord.new(1, 2, false),
+        RetryRecord.new(2, 8, false),
+        RetryRecord.new(3, 26, false),
+        RetryRecord.new(4, 80, false),
+        RetryRecord.new(5, 242, false),
+        RetryRecord.new(6, 728, false),
+        RetryRecord.new(7, 800, true),
+        RetryRecord.new(8, 802, true),
+        RetryRecord.new(9, 808, true),
+        RetryRecord.new(10, 826, true),
+        RetryRecord.new(11, 880, true),
+        RetryRecord.new(12, 1000, true),
+      ],
+    })
+    data("Default timeout", {
+      timeout: 72*3600, max_steps: nil, max_interval: nil, use_sec: false, sec_thres: 0.8, wait: 1, backoff_base: 2,
+      expected: [
+        RetryRecord.new(1, 1, false),
+        RetryRecord.new(2, 3, false),
+        RetryRecord.new(3, 7, false),
+        RetryRecord.new(4, 15, false),
+        RetryRecord.new(5, 31, false),
+        RetryRecord.new(6, 63, false),
+        RetryRecord.new(7, 127, false),
+        RetryRecord.new(8, 255, false),
+        RetryRecord.new(9, 511, false),
+        RetryRecord.new(10, 1023, false),
+        RetryRecord.new(11, 2047, false),
+        RetryRecord.new(12, 4095, false),
+        RetryRecord.new(13, 8191, false),
+        RetryRecord.new(14, 16383, false),
+        RetryRecord.new(15, 32767, false),
+        RetryRecord.new(16, 65535, false),
+        RetryRecord.new(17, 131071, false),
+        RetryRecord.new(18, 259200, false),
+      ],
+    })
+    data("Default timeout with secondary", {
+      timeout: 72*3600, max_steps: nil, max_interval: nil, use_sec: true, sec_thres: 0.8, wait: 1, backoff_base: 2,
+      expected: [
+        RetryRecord.new(1, 1, false),
+        RetryRecord.new(2, 3, false),
+        RetryRecord.new(3, 7, false),
+        RetryRecord.new(4, 15, false),
+        RetryRecord.new(5, 31, false),
+        RetryRecord.new(6, 63, false),
+        RetryRecord.new(7, 127, false),
+        RetryRecord.new(8, 255, false),
+        RetryRecord.new(9, 511, false),
+        RetryRecord.new(10, 1023, false),
+        RetryRecord.new(11, 2047, false),
+        RetryRecord.new(12, 4095, false),
+        RetryRecord.new(13, 8191, false),
+        RetryRecord.new(14, 16383, false),
+        RetryRecord.new(15, 32767, false),
+        RetryRecord.new(16, 65535, false),
+        RetryRecord.new(17, 131071, false),
+        RetryRecord.new(18, 207360, true),
+        RetryRecord.new(19, 207361, true),
+        RetryRecord.new(20, 207363, true),
+        RetryRecord.new(21, 207367, true),
+        RetryRecord.new(22, 207375, true),
+        RetryRecord.new(23, 207391, true),
+        RetryRecord.new(24, 207423, true),
+        RetryRecord.new(25, 207487, true),
+        RetryRecord.new(26, 207615, true),
+        RetryRecord.new(27, 207871, true),
+        RetryRecord.new(28, 208383, true),
+        RetryRecord.new(29, 209407, true),
+        RetryRecord.new(30, 211455, true),
+        RetryRecord.new(31, 215551, true),
+        RetryRecord.new(32, 223743, true),
+        RetryRecord.new(33, 240127, true),
+        RetryRecord.new(34, 259200, true),
+      ],
+    })
+    data("Default timeout with secondary and custom threshold", {
+      timeout: 72*3600, max_steps: nil, max_interval: nil, use_sec: true, sec_thres: 0.5, wait: 1, backoff_base: 2,
+      expected: [
+        RetryRecord.new(1, 1, false),
+        RetryRecord.new(2, 3, false),
+        RetryRecord.new(3, 7, false),
+        RetryRecord.new(4, 15, false),
+        RetryRecord.new(5, 31, false),
+        RetryRecord.new(6, 63, false),
+        RetryRecord.new(7, 127, false),
+        RetryRecord.new(8, 255, false),
+        RetryRecord.new(9, 511, false),
+        RetryRecord.new(10, 1023, false),
+        RetryRecord.new(11, 2047, false),
+        RetryRecord.new(12, 4095, false),
+        RetryRecord.new(13, 8191, false),
+        RetryRecord.new(14, 16383, false),
+        RetryRecord.new(15, 32767, false),
+        RetryRecord.new(16, 65535, false),
+        RetryRecord.new(17, 129600, true),
+        RetryRecord.new(18, 129601, true),
+        RetryRecord.new(19, 129603, true),
+        RetryRecord.new(20, 129607, true),
+        RetryRecord.new(21, 129615, true),
+        RetryRecord.new(22, 129631, true),
+        RetryRecord.new(23, 129663, true),
+        RetryRecord.new(24, 129727, true),
+        RetryRecord.new(25, 129855, true),
+        RetryRecord.new(26, 130111, true),
+        RetryRecord.new(27, 130623, true),
+        RetryRecord.new(28, 131647, true),
+        RetryRecord.new(29, 133695, true),
+        RetryRecord.new(30, 137791, true),
+        RetryRecord.new(31, 145983, true),
+        RetryRecord.new(32, 162367, true),
+        RetryRecord.new(33, 195135, true),
+        RetryRecord.new(34, 259200, true),
+      ],
+    })
+    data("Simple max_steps", {
+      timeout: 72*3600, max_steps: 10, max_interval: nil, use_sec: false, sec_thres: 0.8, wait: 1, backoff_base: 2,
+      expected: [
+        RetryRecord.new(1, 1, false),
+        RetryRecord.new(2, 3, false),
+        RetryRecord.new(3, 7, false),
+        RetryRecord.new(4, 15, false),
+        RetryRecord.new(5, 31, false),
+        RetryRecord.new(6, 63, false),
+        RetryRecord.new(7, 127, false),
+        RetryRecord.new(8, 255, false),
+        RetryRecord.new(9, 511, false),
+        RetryRecord.new(10, 1023, false),
+      ],
+    })
+    data("Simple max_steps with secondary", {
+      timeout: 72*3600, max_steps: 10, max_interval: nil, use_sec: true, sec_thres: 0.8, wait: 1, backoff_base: 2,
+      expected: [
+        RetryRecord.new(1, 1, false),
+        RetryRecord.new(2, 3, false),
+        RetryRecord.new(3, 7, false),
+        RetryRecord.new(4, 15, false),
+        RetryRecord.new(5, 31, false),
+        RetryRecord.new(6, 63, false),
+        RetryRecord.new(7, 127, false),
+        RetryRecord.new(8, 255, false),
+        RetryRecord.new(9, 511, false),
+        RetryRecord.new(10, 818, true),
+      ],
+    })
+    data("Simple interval", {
+      timeout: 72*3600, max_steps: nil, max_interval: 3600, use_sec: false, sec_thres: 0.8, wait: 1, backoff_base: 2,
+      expected: [
+        RetryRecord.new(1, 1, false),
+        RetryRecord.new(2, 3, false),
+        RetryRecord.new(3, 7, false),
+        RetryRecord.new(4, 15, false),
+        RetryRecord.new(5, 31, false),
+        RetryRecord.new(6, 63, false),
+        RetryRecord.new(7, 127, false),
+        RetryRecord.new(8, 255, false),
+        RetryRecord.new(9, 511, false),
+        RetryRecord.new(10, 1023, false),
+        RetryRecord.new(11, 2047, false),
+        RetryRecord.new(12, 4095, false),
+        RetryRecord.new(13, 7695, false),
+        RetryRecord.new(14, 11295, false),
+        RetryRecord.new(15, 14895, false),
+        RetryRecord.new(16, 18495, false),
+        RetryRecord.new(17, 22095, false),
+        RetryRecord.new(18, 25695, false),
+        RetryRecord.new(19, 29295, false),
+        RetryRecord.new(20, 32895, false),
+        RetryRecord.new(21, 36495, false),
+        RetryRecord.new(22, 40095, false),
+        RetryRecord.new(23, 43695, false),
+        RetryRecord.new(24, 47295, false),
+        RetryRecord.new(25, 50895, false),
+        RetryRecord.new(26, 54495, false),
+        RetryRecord.new(27, 58095, false),
+        RetryRecord.new(28, 61695, false),
+        RetryRecord.new(29, 65295, false),
+        RetryRecord.new(30, 68895, false),
+        RetryRecord.new(31, 72495, false),
+        RetryRecord.new(32, 76095, false),
+        RetryRecord.new(33, 79695, false),
+        RetryRecord.new(34, 83295, false),
+        RetryRecord.new(35, 86895, false),
+        RetryRecord.new(36, 90495, false),
+        RetryRecord.new(37, 94095, false),
+        RetryRecord.new(38, 97695, false),
+        RetryRecord.new(39, 101295, false),
+        RetryRecord.new(40, 104895, false),
+        RetryRecord.new(41, 108495, false),
+        RetryRecord.new(42, 112095, false),
+        RetryRecord.new(43, 115695, false),
+        RetryRecord.new(44, 119295, false),
+        RetryRecord.new(45, 122895, false),
+        RetryRecord.new(46, 126495, false),
+        RetryRecord.new(47, 130095, false),
+        RetryRecord.new(48, 133695, false),
+        RetryRecord.new(49, 137295, false),
+        RetryRecord.new(50, 140895, false),
+        RetryRecord.new(51, 144495, false),
+        RetryRecord.new(52, 148095, false),
+        RetryRecord.new(53, 151695, false),
+        RetryRecord.new(54, 155295, false),
+        RetryRecord.new(55, 158895, false),
+        RetryRecord.new(56, 162495, false),
+        RetryRecord.new(57, 166095, false),
+        RetryRecord.new(58, 169695, false),
+        RetryRecord.new(59, 173295, false),
+        RetryRecord.new(60, 176895, false),
+        RetryRecord.new(61, 180495, false),
+        RetryRecord.new(62, 184095, false),
+        RetryRecord.new(63, 187695, false),
+        RetryRecord.new(64, 191295, false),
+        RetryRecord.new(65, 194895, false),
+        RetryRecord.new(66, 198495, false),
+        RetryRecord.new(67, 202095, false),
+        RetryRecord.new(68, 205695, false),
+        RetryRecord.new(69, 209295, false),
+        RetryRecord.new(70, 212895, false),
+        RetryRecord.new(71, 216495, false),
+        RetryRecord.new(72, 220095, false),
+        RetryRecord.new(73, 223695, false),
+        RetryRecord.new(74, 227295, false),
+        RetryRecord.new(75, 230895, false),
+        RetryRecord.new(76, 234495, false),
+        RetryRecord.new(77, 238095, false),
+        RetryRecord.new(78, 241695, false),
+        RetryRecord.new(79, 245295, false),
+        RetryRecord.new(80, 248895, false),
+        RetryRecord.new(81, 252495, false),
+        RetryRecord.new(82, 256095, false),
+        RetryRecord.new(83, 259200, false),
+      ],
+    })
+    data("Simple interval with secondary", {
+      timeout: 72*3600, max_steps: nil, max_interval: 3600, use_sec: true, sec_thres: 0.8, wait: 1, backoff_base: 2,
+      expected: [
+        RetryRecord.new(1, 1, false),
+        RetryRecord.new(2, 3, false),
+        RetryRecord.new(3, 7, false),
+        RetryRecord.new(4, 15, false),
+        RetryRecord.new(5, 31, false),
+        RetryRecord.new(6, 63, false),
+        RetryRecord.new(7, 127, false),
+        RetryRecord.new(8, 255, false),
+        RetryRecord.new(9, 511, false),
+        RetryRecord.new(10, 1023, false),
+        RetryRecord.new(11, 2047, false),
+        RetryRecord.new(12, 4095, false),
+        RetryRecord.new(13, 7695, false),
+        RetryRecord.new(14, 11295, false),
+        RetryRecord.new(15, 14895, false),
+        RetryRecord.new(16, 18495, false),
+        RetryRecord.new(17, 22095, false),
+        RetryRecord.new(18, 25695, false),
+        RetryRecord.new(19, 29295, false),
+        RetryRecord.new(20, 32895, false),
+        RetryRecord.new(21, 36495, false),
+        RetryRecord.new(22, 40095, false),
+        RetryRecord.new(23, 43695, false),
+        RetryRecord.new(24, 47295, false),
+        RetryRecord.new(25, 50895, false),
+        RetryRecord.new(26, 54495, false),
+        RetryRecord.new(27, 58095, false),
+        RetryRecord.new(28, 61695, false),
+        RetryRecord.new(29, 65295, false),
+        RetryRecord.new(30, 68895, false),
+        RetryRecord.new(31, 72495, false),
+        RetryRecord.new(32, 76095, false),
+        RetryRecord.new(33, 79695, false),
+        RetryRecord.new(34, 83295, false),
+        RetryRecord.new(35, 86895, false),
+        RetryRecord.new(36, 90495, false),
+        RetryRecord.new(37, 94095, false),
+        RetryRecord.new(38, 97695, false),
+        RetryRecord.new(39, 101295, false),
+        RetryRecord.new(40, 104895, false),
+        RetryRecord.new(41, 108495, false),
+        RetryRecord.new(42, 112095, false),
+        RetryRecord.new(43, 115695, false),
+        RetryRecord.new(44, 119295, false),
+        RetryRecord.new(45, 122895, false),
+        RetryRecord.new(46, 126495, false),
+        RetryRecord.new(47, 130095, false),
+        RetryRecord.new(48, 133695, false),
+        RetryRecord.new(49, 137295, false),
+        RetryRecord.new(50, 140895, false),
+        RetryRecord.new(51, 144495, false),
+        RetryRecord.new(52, 148095, false),
+        RetryRecord.new(53, 151695, false),
+        RetryRecord.new(54, 155295, false),
+        RetryRecord.new(55, 158895, false),
+        RetryRecord.new(56, 162495, false),
+        RetryRecord.new(57, 166095, false),
+        RetryRecord.new(58, 169695, false),
+        RetryRecord.new(59, 173295, false),
+        RetryRecord.new(60, 176895, false),
+        RetryRecord.new(61, 180495, false),
+        RetryRecord.new(62, 184095, false),
+        RetryRecord.new(63, 187695, false),
+        RetryRecord.new(64, 191295, false),
+        RetryRecord.new(65, 194895, false),
+        RetryRecord.new(66, 198495, false),
+        RetryRecord.new(67, 202095, false),
+        RetryRecord.new(68, 205695, false),
+        RetryRecord.new(69, 207360, true),
+        RetryRecord.new(70, 207361, true),
+        RetryRecord.new(71, 207363, true),
+        RetryRecord.new(72, 207367, true),
+        RetryRecord.new(73, 207375, true),
+        RetryRecord.new(74, 207391, true),
+        RetryRecord.new(75, 207423, true),
+        RetryRecord.new(76, 207487, true),
+        RetryRecord.new(77, 207615, true),
+        RetryRecord.new(78, 207871, true),
+        RetryRecord.new(79, 208383, true),
+        RetryRecord.new(80, 209407, true),
+        RetryRecord.new(81, 211455, true),
+        RetryRecord.new(82, 215055, true),
+        RetryRecord.new(83, 218655, true),
+        RetryRecord.new(84, 222255, true),
+        RetryRecord.new(85, 225855, true),
+        RetryRecord.new(86, 229455, true),
+        RetryRecord.new(87, 233055, true),
+        RetryRecord.new(88, 236655, true),
+        RetryRecord.new(89, 240255, true),
+        RetryRecord.new(90, 243855, true),
+        RetryRecord.new(91, 247455, true),
+        RetryRecord.new(92, 251055, true),
+        RetryRecord.new(93, 254655, true),
+        RetryRecord.new(94, 258255, true),
+        RetryRecord.new(95, 259200, true),
+      ],
+    })
+    data("Max_steps and max_interval", {
+      timeout: 72*3600, max_steps: 30, max_interval: 3600, use_sec: false, sec_thres: 0.8, wait: 1, backoff_base: 2,
+      expected: [
+        RetryRecord.new(1, 1, false),
+        RetryRecord.new(2, 3, false),
+        RetryRecord.new(3, 7, false),
+        RetryRecord.new(4, 15, false),
+        RetryRecord.new(5, 31, false),
+        RetryRecord.new(6, 63, false),
+        RetryRecord.new(7, 127, false),
+        RetryRecord.new(8, 255, false),
+        RetryRecord.new(9, 511, false),
+        RetryRecord.new(10, 1023, false),
+        RetryRecord.new(11, 2047, false),
+        RetryRecord.new(12, 4095, false),
+        RetryRecord.new(13, 7695, false),
+        RetryRecord.new(14, 11295, false),
+        RetryRecord.new(15, 14895, false),
+        RetryRecord.new(16, 18495, false),
+        RetryRecord.new(17, 22095, false),
+        RetryRecord.new(18, 25695, false),
+        RetryRecord.new(19, 29295, false),
+        RetryRecord.new(20, 32895, false),
+        RetryRecord.new(21, 36495, false),
+        RetryRecord.new(22, 40095, false),
+        RetryRecord.new(23, 43695, false),
+        RetryRecord.new(24, 47295, false),
+        RetryRecord.new(25, 50895, false),
+        RetryRecord.new(26, 54495, false),
+        RetryRecord.new(27, 58095, false),
+        RetryRecord.new(28, 61695, false),
+        RetryRecord.new(29, 65295, false),
+        RetryRecord.new(30, 68895, false),
+      ],
+    })
+    data("Max_steps and max_interval with secondary", {
+      timeout: 72*3600, max_steps: 30, max_interval: 3600, use_sec: true, sec_thres: 0.8, wait: 1, backoff_base: 2,
+      expected: [
+        RetryRecord.new(1, 1, false),
+        RetryRecord.new(2, 3, false),
+        RetryRecord.new(3, 7, false),
+        RetryRecord.new(4, 15, false),
+        RetryRecord.new(5, 31, false),
+        RetryRecord.new(6, 63, false),
+        RetryRecord.new(7, 127, false),
+        RetryRecord.new(8, 255, false),
+        RetryRecord.new(9, 511, false),
+        RetryRecord.new(10, 1023, false),
+        RetryRecord.new(11, 2047, false),
+        RetryRecord.new(12, 4095, false),
+        RetryRecord.new(13, 7695, false),
+        RetryRecord.new(14, 11295, false),
+        RetryRecord.new(15, 14895, false),
+        RetryRecord.new(16, 18495, false),
+        RetryRecord.new(17, 22095, false),
+        RetryRecord.new(18, 25695, false),
+        RetryRecord.new(19, 29295, false),
+        RetryRecord.new(20, 32895, false),
+        RetryRecord.new(21, 36495, false),
+        RetryRecord.new(22, 40095, false),
+        RetryRecord.new(23, 43695, false),
+        RetryRecord.new(24, 47295, false),
+        RetryRecord.new(25, 50895, false),
+        RetryRecord.new(26, 54495, false),
+        RetryRecord.new(27, 55116, true),
+        RetryRecord.new(28, 55117, true),
+        RetryRecord.new(29, 55119, true),
+        RetryRecord.new(30, 55123, true),
+      ],
+    })
+    data("Max_steps and max_interval with timeout", {
+      timeout: 10000, max_steps: 30, max_interval: 1000, use_sec: false, sec_thres: 0.8, wait: 1, backoff_base: 2,
+      expected: [
+        RetryRecord.new(1, 1, false),
+        RetryRecord.new(2, 3, false),
+        RetryRecord.new(3, 7, false),
+        RetryRecord.new(4, 15, false),
+        RetryRecord.new(5, 31, false),
+        RetryRecord.new(6, 63, false),
+        RetryRecord.new(7, 127, false),
+        RetryRecord.new(8, 255, false),
+        RetryRecord.new(9, 511, false),
+        RetryRecord.new(10, 1023, false),
+        RetryRecord.new(11, 2023, false),
+        RetryRecord.new(12, 3023, false),
+        RetryRecord.new(13, 4023, false),
+        RetryRecord.new(14, 5023, false),
+        RetryRecord.new(15, 6023, false),
+        RetryRecord.new(16, 7023, false),
+        RetryRecord.new(17, 8023, false),
+        RetryRecord.new(18, 9023, false),
+        RetryRecord.new(19, 10000, false),
+      ],
+    })
+    data("Max_steps and max_interval with timeout and secondary", {
+      timeout: 10000, max_steps: 30, max_interval: 1000, use_sec: true, sec_thres: 0.8, wait: 1, backoff_base: 2,
+      expected: [
+        RetryRecord.new(1, 1, false),
+        RetryRecord.new(2, 3, false),
+        RetryRecord.new(3, 7, false),
+        RetryRecord.new(4, 15, false),
+        RetryRecord.new(5, 31, false),
+        RetryRecord.new(6, 63, false),
+        RetryRecord.new(7, 127, false),
+        RetryRecord.new(8, 255, false),
+        RetryRecord.new(9, 511, false),
+        RetryRecord.new(10, 1023, false),
+        RetryRecord.new(11, 2023, false),
+        RetryRecord.new(12, 3023, false),
+        RetryRecord.new(13, 4023, false),
+        RetryRecord.new(14, 5023, false),
+        RetryRecord.new(15, 6023, false),
+        RetryRecord.new(16, 7023, false),
+        RetryRecord.new(17, 8000, true),
+        RetryRecord.new(18, 8001, true),
+        RetryRecord.new(19, 8003, true),
+        RetryRecord.new(20, 8007, true),
+        RetryRecord.new(21, 8015, true),
+        RetryRecord.new(22, 8031, true),
+        RetryRecord.new(23, 8063, true),
+        RetryRecord.new(24, 8127, true),
+        RetryRecord.new(25, 8255, true),
+        RetryRecord.new(26, 8511, true),
+        RetryRecord.new(27, 9023, true),
+        RetryRecord.new(28, 10000, true),
+      ],
+    })
+    test "exponential backoff with scenario" do |data|
+      print_for_debug = false # change this value to true if you need to always see msg.
+      trying_count = 1000 # just to avoid an infinite loop
+
+      retry_records = []
+      msg = ""
+
+      s = @d.retry_state_create(
+        :t15, :exponential_backoff, data[:wait], data[:timeout],
+        max_steps: data[:max_steps], max_interval: data[:max_interval],
+        secondary: data[:use_sec], secondary_threshold: data[:sec_thres],
+        backoff_base: data[:backoff_base], randomize: false
+      )
+      override_current_time(s, s.start)
+
+      retry_count = 0
+      trying_count.times do
+        next_elapsed = (s.next_time - s.start).to_i
+
+        msg << "step: #{s.steps}, next: #{next_elapsed}s (#{next_elapsed / 3600}h)\n"
+
+        # Wait until next time to trigger the next retry
+        override_current_time(s, s.next_time)
+
+        # Retry will be triggered at this point.
+        retry_count += 1
+        rec = RetryRecord.new(retry_count, next_elapsed, s.secondary?)
+        retry_records.append(rec)
+        msg << "[#{next_elapsed}s elapsed point] #{retry_count}th-Retry(#{s.secondary? ? "SEC" : "PRI"}) is triggered.\n"
+
+        # Update the retry state
+        s.step
+        if s.limit?
+          msg << "--- Reach limit. ---\n"
+          break
+        end
+      end
+
+      assert_equal(data[:expected], retry_records, msg)
+
+      print(msg) if print_for_debug
+    end
+  end
 end