Skip to content

Commit

Permalink
Merge pull request #329 from phstc/support-retry_intervals-as-a-lambd…
Browse files Browse the repository at this point in the history
…a-303

Support retry_intervals as a lambda

Fix #303
  • Loading branch information
phstc authored Mar 10, 2017
2 parents 599291b + bd8915d commit ce5a304
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 50 deletions.
46 changes: 25 additions & 21 deletions lib/shoryuken/middleware/server/exponential_backoff_retry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ class ExponentialBackoffRetry

def call(worker, queue, sqs_msg, body)
if sqs_msg.is_a?(Array)
logger.warn { "Exponential backoff isn't supported for batch workers" }
logger.warn { "Exponential backoff isn't supported for batch workers" }
return yield
end

started_at = Time.now
yield
rescue
retry_intervals = Array(worker.class.get_shoryuken_options['retry_intervals'])
retry_intervals = worker.class.get_shoryuken_options['retry_intervals']

if retry_intervals.empty? || !handle_failure(sqs_msg, started_at, retry_intervals)
if retry_intervals.nil? || !handle_failure(sqs_msg, started_at, retry_intervals)
# Re-raise the exception if the job is not going to be exponential backoff retried.
# This allows custom middleware (like exception notifiers) to be aware of the unhandled failure.
raise
Expand All @@ -24,28 +24,32 @@ def call(worker, queue, sqs_msg, body)

private

def handle_failure(sqs_msg, started_at, retry_intervals)
return unless attempts = sqs_msg.attributes['ApproximateReceiveCount']

attempts = attempts.to_i - 1

interval = if attempts < retry_intervals.size
retry_intervals[attempts]
else
retry_intervals.last
end

# Visibility timeouts are limited to a total 12 hours, starting from the receipt of the message.
# We calculate the maximum timeout by subtracting the amount of time since the receipt of the message.
#
# From the docs: "Amazon SQS restarts the timeout period using the new value."
# http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html#AboutVT-extending-message-visibility-timeout
max_timeout = 43200 - (Time.now - started_at).ceil - 1
def get_interval(retry_intervals, attempts)
return retry_intervals.call(attempts) if retry_intervals.respond_to?(:call)

if attempts <= (retry_intervals = Array(retry_intervals)).size
retry_intervals[attempts - 1]
else
retry_intervals.last
end
end

def next_visibility_timeout(interval, started_at)
max_timeout = 43_200 - (Time.now - started_at).ceil - 1
interval = max_timeout if interval > max_timeout
interval.to_i
end

def handle_failure(sqs_msg, started_at, retry_intervals)
receive_count = sqs_msg.attributes['ApproximateReceiveCount'].to_i

sqs_msg.change_visibility(visibility_timeout: interval.to_i)
return false unless (interval = get_interval(retry_intervals, receive_count))

sqs_msg.change_visibility(visibility_timeout: next_visibility_timeout(interval.to_i, started_at))

logger.info { "Message #{sqs_msg.message_id} failed, will be retried in #{interval} seconds." }

true
end
end
end
Expand Down
75 changes: 46 additions & 29 deletions spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require 'spec_helper'

# rubocop:disable Metrics/BlockLength, Metrics/BlockDelimiters
RSpec.describe Shoryuken::Middleware::Server::ExponentialBackoffRetry do
let(:queue) { 'default' }
let(:sqs_queue) { double Shoryuken::Queue }
Expand All @@ -16,8 +17,8 @@
end
end

context 'when a job succeeds' do
it 'does not retry the job' do
context 'when no exception' do
it 'does not retry' do
TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800]

expect(sqs_msg).not_to receive(:change_visibility)
Expand All @@ -26,60 +27,76 @@
end
end

context 'when a job throws an exception' do
context 'when an error' do
context "and retry_intervals isn't set" do
it 'does not retry' do
expect(sqs_msg).not_to receive(:change_visibility)

it 'does not retry the job by default' do
expect(sqs_msg).not_to receive(:change_visibility)
expect {
subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'Error' }
}.to raise_error(RuntimeError, 'Error')
end
end

context 'and retry_intervals is a lambda' do
it 'retries' do
TestWorker.get_shoryuken_options['retry_intervals'] = ->(_attempts) { 500 }

allow(sqs_msg).to receive(:queue) { sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 500)

expect {
subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'Error' }
}.to raise_error(RuntimeError, 'Error')
expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error
end
end

it 'does not retry the job if :retry_intervals is empty' do
TestWorker.get_shoryuken_options['retry_intervals'] = []
context 'and retry_intervals is empty' do
it 'does not retry' do
TestWorker.get_shoryuken_options['retry_intervals'] = []

expect(sqs_msg).not_to receive(:change_visibility)
expect(sqs_msg).not_to receive(:change_visibility)

expect {
subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'Error' }
}.to raise_error(RuntimeError, 'Error')
expect {
subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'Error' }
}.to raise_error(RuntimeError, 'Error')
end
end

it 'retries the job if :retry_intervals is non-empty' do
it 'uses first interval ' do
TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800]

allow(sqs_msg).to receive(:queue){ sqs_queue }
allow(sqs_msg).to receive(:queue) { sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 300)

expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error
end

it 'retries the job with exponential backoff' do
it 'uses matching interval' do
TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800]

allow(sqs_msg).to receive(:attributes){ {'ApproximateReceiveCount' => 2 } }
allow(sqs_msg).to receive(:queue){ sqs_queue }
allow(sqs_msg).to receive(:attributes) { { 'ApproximateReceiveCount' => 2 } }
allow(sqs_msg).to receive(:queue) { sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 1800)

expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error
end

it 'uses the last retry interval when :receive_count exceeds the size of :retry_intervals' do
TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800]
context 'when attempts exceeds retry_intervals' do
it 'uses last interval' do
TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800]

allow(sqs_msg).to receive(:attributes){ {'ApproximateReceiveCount' => 3 } }
allow(sqs_msg).to receive(:queue){ sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 1800)
allow(sqs_msg).to receive(:attributes) { { 'ApproximateReceiveCount' => 3 } }
allow(sqs_msg).to receive(:queue) { sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 1800)

expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error
expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error
end
end

it 'limits the visibility timeout to 12 hours from receipt of message' do
TestWorker.get_shoryuken_options['retry_intervals'] = [86400]
it 'limits the visibility timeout to 12 hours' do
TestWorker.get_shoryuken_options['retry_intervals'] = [86_400]

allow(sqs_msg).to receive(:queue){ sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 43198)
allow(sqs_msg).to receive(:queue) { sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 43_198)

expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error
end
Expand Down

0 comments on commit ce5a304

Please sign in to comment.