Skip to content

Commit

Permalink
Continuation of #273; Fixes #272; Apply some Shoryuken code style; Minor
Browse files Browse the repository at this point in the history
refactor
  • Loading branch information
Pablo Cantero committed Dec 3, 2016
1 parent 3061b41 commit 1712350
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 143 deletions.
82 changes: 30 additions & 52 deletions lib/shoryuken/queue.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
module Shoryuken
class Queue
FIFO_ATTRIBUTE = 'FifoQueue'
MESSAGE_GROUP_ID = 'ShoryukenMessage'
VISIBILITY_TIMEOUT_ATTR = 'VisibilityTimeout'

attr_accessor :name, :client, :url

def initialize(client, name)
self.name = name
self.name = name
self.client = client
begin
self.url = client.get_queue_url(queue_name: name).queue_url
rescue Aws::SQS::Errors::NonExistentQueue => e
raise e, "The specified queue '#{name}' does not exist"
end
self.url = client.get_queue_url(queue_name: name).queue_url
rescue Aws::SQS::Errors::NonExistentQueue => e
raise e, "The specified queue '#{name}' does not exist."
end

def visibility_timeout
Expand Down Expand Up @@ -38,34 +40,18 @@ def receive_messages(options)
map { |m| Message.new(client, self, m) }
end

# Returns whether this queue is a FIFO queue or not.
# @return [TrueClass, FalseClass]
def is_fifo?
@__is_fifo ||= queue_attributes.attributes[FIFO_ATTRIBUTE] == 'true'
end

# Returns whether this queue has content based deduplication enabled or not.
# @return [TrueClass, FalseClass]
def has_content_deduplication?
queue_attributes.attributes[CONTENT_DEDUP_ATTRIBUTE] == 'true'
def fifo?
@_fifo ||= queue_attributes.attributes[FIFO_ATTRIBUTE] == 'true'
end

private

FIFO_ATTRIBUTE = 'FifoQueue'
CONTENT_DEDUP_ATTRIBUTE = 'ContentBasedDeduplication'
MESSAGE_GROUP_ID = 'ShoryukenMessage'
VISIBILITY_TIMEOUT_ATTR = 'VisibilityTimeout'


# @return [Aws::SQS::Types::GetQueueAttributesResult]
def queue_attributes
# Note: Retrieving all queue attributes as requesting `FifoQueue` on non-FIFO queue raises error.
# See issue: https://github.com/aws/aws-sdk-ruby/issues/1350
client.get_queue_attributes(queue_url: url, attribute_names: ['All'])
end

# Returns sanitized messages, raising ArgumentError if any of the message is invalid.
def sanitize_messages!(options)
options = case
when options.is_a?(Array)
Expand All @@ -76,62 +62,54 @@ def sanitize_messages!(options)
options[:entries].each(&method(:add_fifo_attributes!))
options
end

validate_messages!(options)

options
end

# Modifies the supplied hash and adds the required FIFO message attributes based on the queue configuration.
def add_fifo_attributes!(message_hash)
return unless is_fifo?
def add_fifo_attributes!(options)
return unless fifo?

message_hash[:message_group_id] = MESSAGE_GROUP_ID
message_hash[:message_deduplication_id] = SecureRandom.uuid unless has_content_deduplication?
options[:message_group_id] ||= MESSAGE_GROUP_ID
options[:message_deduplication_id] ||= Digest::SHA256.digest(options[:message_body])

message_hash
options
end

def sanitize_message!(options)
options = case
when options.is_a?(String)
# send_message('message')
{ message_body: options }
when options.is_a?(Hash)
options
end
add_fifo_attributes! options
options = { message_body: options } if options.is_a?(String)

add_fifo_attributes!(options)
validate_message!(options)

options
end

def validate_messages!(options)
options[:entries].map { |m| validate_message!(m) }
options[:entries].map(&method(:validate_message!))
end

def validate_message!(options)
body = options[:message_body]

if body.is_a?(Hash)
options[:message_body] = JSON.dump(body)
elsif !body.is_a?(String)
fail ArgumentError, "The message body must be a String and you passed a #{body.class}"
fail ArgumentError, "The message body must be a String and you passed a #{body.class}."
end
validate_fifo_message! options

validate_fifo_message!(options)

options
end

# Validates a FIFO message with the queue configuration.
# @param [Hash] options - Message hash.
# @raise [ArgumentError] raises ArgumentError if the message configuration is incompatible with the queue configuration.
def validate_fifo_message!(options)
return unless is_fifo?
if options[:delay_seconds].is_a?(Fixnum)
return unless fifo?

if options[:delay_seconds]
fail ArgumentError, 'FIFO queues do not accept DelaySeconds arguments.'
end
if options[:message_group_id].nil?
fail ArgumentError, 'This queue is FIFO and no message_group_id was provided.'
end
if !has_content_deduplication? && options[:message_deduplication_id].nil?
fail ArgumentError, 'This queue is FIFO without ContentBasedDeduplication enabled, and no MessageDeduplicationId was supplied.'
end
end
end
end
159 changes: 68 additions & 91 deletions spec/shoryuken/queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
let(:sqs) { Aws::SQS::Client.new(stub_responses: true, credentials: credentials) }
let(:queue_name) { 'shoryuken' }
let(:queue_url) { 'https://eu-west-1.amazonaws.com:6059/123456789012/shoryuken' }
let(:attribute_response) { double 'Aws::SQS::Types::GetQueueAttributesResponse' }

subject { described_class.new(sqs, queue_name) }
before {
Expand All @@ -16,7 +15,7 @@

describe '#send_message' do
before {
allow(subject).to receive(:is_fifo?).and_return(false)
allow(subject).to receive(:fifo?).and_return(false)
}
it 'accepts SQS request parameters' do
# https://docs.aws.amazon.com/sdkforruby/api/Aws/SQS/Client.html#send_message-instance_method
Expand All @@ -35,13 +34,13 @@
it 'raises ArgumentError for nil' do
expect {
subject.send_message(message_body: nil)
}.to raise_error(ArgumentError, 'The message body must be a String and you passed a NilClass')
}.to raise_error(ArgumentError, 'The message body must be a String and you passed a NilClass.')
end

it 'raises ArgumentError for Fixnum' do
expect {
subject.send_message(message_body: 1)
}.to raise_error(ArgumentError, 'The message body must be a String and you passed a Fixnum')
}.to raise_error(ArgumentError, 'The message body must be a String and you passed a Fixnum.')
end

context 'when a client middleware' do
Expand Down Expand Up @@ -81,7 +80,7 @@ def call(options)

describe '#send_messages' do
before {
allow(subject).to receive(:is_fifo?).and_return(false)
allow(subject).to receive(:fifo?).and_return(false)
}
it 'accepts SQS request parameters' do
# https://docs.aws.amazon.com/sdkforruby/api/Aws/SQS/Client.html#send_message_batch-instance_method
Expand All @@ -91,62 +90,62 @@ def call(options)
end

it 'accepts an array of messages' do
expect(sqs).to receive(:send_message_batch).with(hash_including(entries: [{id: '0', message_body: 'msg1', delay_seconds: 1, message_attributes: {attr: 'attr1'}}, {id: '1', message_body: 'msg2', delay_seconds: 1, message_attributes: {attr: 'attr2'}}]))

subject.send_messages([
{
message_body: 'msg1',
delay_seconds: 1,
message_attributes: {attr: 'attr1'}
}, {
message_body: 'msg2',
delay_seconds: 1,
message_attributes: {attr: 'attr2'}
}
])
options = { entries: [{ id: '0',
message_body: 'msg1',
delay_seconds: 1,
message_attributes: { attr: 'attr1' } },
{ id: '1',
message_body: 'msg2',
delay_seconds: 1,
message_attributes: { attr: 'attr2' } }] }

expect(sqs).to receive(:send_message_batch).with(hash_including(options))

subject.send_messages([{ message_body: 'msg1',
delay_seconds: 1,
message_attributes: { attr: 'attr1' }
}, {
message_body: 'msg2',
delay_seconds: 1,
message_attributes: { attr: 'attr2' }
}])
end

context 'when FIFO is configured without content deduplication' do
before {
# Arrange
allow(subject).to receive(:is_fifo?).and_return(true)
# Pre-Assert
expect(sqs).to receive(:send_message_batch) do |arg|
expect(arg).to include(:entries)
first_entry = arg[:entries].first
expect(first_entry).to include({ id: '0', message_body: 'msg1', message_group_id: 'ShoryukenMessage' })
expect(first_entry[:message_deduplication_id]).to be_a String
end
}
it 'sends with message_group_id and message_deduplication_id when an array is sent' do
subject.send_messages([{ message_body: 'msg1', message_attributes: { attr: 'attr1' } }])
end
it 'sends with message_group_id and message_deduplication_id when a hash is sent' do
subject.send_messages(entries: [{ id: '0', message_body: 'msg1' }])
context 'when FIFO' do
before do
allow(subject).to receive(:fifo?).and_return(true)
end
end

context 'when FIFO is configured with content deduplication' do
before {
# Arrange
allow(subject).to receive(:is_fifo?).and_return(true)
allow(subject).to receive(:has_content_deduplication?).and_return(true)
# Pre-Assert
expect(sqs).to receive(:send_message_batch) do |arg|
expect(arg).to include(:entries)
first_entry = arg[:entries].first
expect(first_entry).to match({ id: '0', message_body: 'msg1', message_group_id: 'ShoryukenMessage', message_attributes: { attr: 'attr1' } })
context 'message_group_id and message_deduplication_id are absent' do
it 'sets default values' do
expect(sqs).to receive(:send_message_batch) do |arg|
first_entry = arg[:entries].first

expect(first_entry[:message_group_id]).to eq described_class::MESSAGE_GROUP_ID
expect(first_entry[:message_deduplication_id]).to be
end

subject.send_messages([{ message_body: 'msg1', message_attributes: { attr: 'attr1' } }])
end
}
it 'sends with message_group_id when argument is an array' do
subject.send_messages([{ message_body: 'msg1', message_attributes: { attr: 'attr1' } }])
end
it 'sends with message_group_id when a hash is sent' do
subject.send_messages(entries: [{ id: '0', message_body: 'msg1', message_attributes: { attr: 'attr1' } }])

context 'message_group_id and message_deduplication_id are present' do
it 'preserves existing values' do
expect(sqs).to receive(:send_message_batch) do |arg|
first_entry = arg[:entries].first

expect(first_entry[:message_group_id]).to eq 'my group'
expect(first_entry[:message_deduplication_id]).to eq 'my id'
end

subject.send_messages([{ message_body: 'msg1',
message_attributes: { attr: 'attr1' },
message_group_id: 'my group',
message_deduplication_id: 'my id' }])
end
end
end


it 'accepts an array of string' do
expect(sqs).to receive(:send_message_batch).with(hash_including(entries: [{ id: '0', message_body: 'msg1' }, { id: '1', message_body: 'msg2' }]))

Expand All @@ -157,61 +156,39 @@ def call(options)
it 'raises ArgumentError for nil' do
expect {
subject.send_messages(entries: [message_body: nil])
}.to raise_error(ArgumentError, 'The message body must be a String and you passed a NilClass')
}.to raise_error(ArgumentError, 'The message body must be a String and you passed a NilClass.')
end

it 'raises ArgumentError for Fixnum' do
expect {
subject.send_messages(entries: [message_body: 1])
}.to raise_error(ArgumentError, 'The message body must be a String and you passed a Fixnum')
}.to raise_error(ArgumentError, 'The message body must be a String and you passed a Fixnum.')
end
end
end

describe '#is_fifo?' do
before {
# Required as Aws::SQS::Client.get_queue_url returns 'String' when responses are stubbed.
describe '#fifo?' do
before do
attribute_response = double 'Aws::SQS::Types::GetQueueAttributesResponse'

allow(attribute_response).to receive(:attributes).and_return('FifoQueue' => fifo.to_s, 'ContentBasedDeduplication' => 'true')
allow(subject).to receive(:url).and_return(queue_url)
allow(sqs).to receive(:get_queue_attributes).with({ queue_url: queue_url, attribute_names: ['All'] }).and_return(attribute_response)
allow(sqs).to receive(:get_queue_attributes).with(queue_url: queue_url, attribute_names: ['All']).and_return(attribute_response)
end

}
context 'when queue is FIFO' do
before {
allow(attribute_response).to receive(:attributes).and_return({ 'FifoQueue' => 'true', 'ContentBasedDeduplication' => 'true' })
}
it 'Returns True' do
expect(subject.is_fifo?).to eq true
end
end
context 'when queue is not FIFO' do
before {
allow(attribute_response).to receive(:attributes).and_return({ 'FifoQueue' => 'false', 'ContentBasedDeduplication' => 'false' })
}
it 'Returns False' do
expect(subject.is_fifo?).to eq false
let(:fifo) { true }

it 'returns true' do
expect(subject.fifo?).to be
end
end
end

describe '#has_content_deduplication?' do
before {
allow(sqs).to receive(:get_queue_attributes).with({ queue_url: queue_url, attribute_names: ['All'] }).and_return(attribute_response)
context 'when queue is not FIFO' do
let(:fifo) { false }

}
context 'when queue has content deduplicaiton' do
before {
allow(attribute_response).to receive(:attributes).and_return({ 'FifoQueue' => 'true', 'ContentBasedDeduplication' => 'true' })
}
it 'Returns True' do
expect(subject.has_content_deduplication?).to eq true
end
end
context 'when queue does not have content deduplication' do
before {
allow(attribute_response).to receive(:attributes).and_return({ 'FifoQueue' => 'true', 'ContentBasedDeduplication' => 'false' })
}
it 'Returns False' do
expect(subject.has_content_deduplication?).to eq false
it 'returns false' do
expect(subject.fifo?).to_not be
end
end
end
Expand Down

0 comments on commit 1712350

Please sign in to comment.