Skip to content

Commit

Permalink
Memoized queue#is_fifo?, resolved issue with queue#queue_attributes o…
Browse files Browse the repository at this point in the history
…n non-FIFO queues.
  • Loading branch information
richseviora committed Nov 28, 2016
1 parent c127660 commit 3061b41
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 9 deletions.
25 changes: 18 additions & 7 deletions lib/shoryuken/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def receive_messages(options)
# Returns whether this queue is a FIFO queue or not.
# @return [TrueClass, FalseClass]
def is_fifo?
queue_attributes.attributes[FIFO_ATTRIBUTE] == 'true'
@__is_fifo ||= queue_attributes.attributes[FIFO_ATTRIBUTE] == 'true'
end

# Returns whether this queue has content based deduplication enabled or not.
Expand All @@ -57,9 +57,12 @@ def has_content_deduplication?
MESSAGE_GROUP_ID = 'ShoryukenMessage'
VISIBILITY_TIMEOUT_ATTR = 'VisibilityTimeout'


# @return [Aws::SQS::Types::GetQueueAttributesResult]
def queue_attributes
client.get_queue_attributes(queue_url: url, attribute_names: [FIFO_ATTRIBUTE, CONTENT_DEDUP_ATTRIBUTE, VISIBILITY_TIMEOUT_ATTR])
# Note: Retrieving all queue attributes as requesting `FifoQueue` on non-FIFO queue raises error.
# See issue: https://github.com/aws/aws-sdk-ruby/issues/1350
client.get_queue_attributes(queue_url: url, attribute_names: ['All'])
end

# Returns sanitized messages, raising ArgumentError if any of the message is invalid.
Expand Down Expand Up @@ -111,16 +114,24 @@ def validate_message!(options)
elsif !body.is_a?(String)
fail ArgumentError, "The message body must be a String and you passed a #{body.class}"
end
if is_fifo? && options[:delay_seconds].is_a?(Fixnum)
validate_fifo_message! options
options
end

# Validates a FIFO message with the queue configuration.
# @param [Hash] options - Message hash.
# @raise [ArgumentError] raises ArgumentError if the message configuration is incompatible with the queue configuration.
def validate_fifo_message!(options)
return unless is_fifo?
if options[:delay_seconds].is_a?(Fixnum)
fail ArgumentError, 'FIFO queues do not accept DelaySeconds arguments.'
end
if is_fifo? && options[:message_group_id].nil?
if options[:message_group_id].nil?
fail ArgumentError, 'This queue is FIFO and no message_group_id was provided.'
end
if is_fifo? && !has_content_deduplication? && options[:message_deduplication_id].nil?
fail ArgumentError, 'This queue is FIFO without ContentBasedDeduplication enabled, and no MessageDeduplicationId was supplied'
if !has_content_deduplication? && options[:message_deduplication_id].nil?
fail ArgumentError, 'This queue is FIFO without ContentBasedDeduplication enabled, and no MessageDeduplicationId was supplied.'
end
options
end
end
end
4 changes: 2 additions & 2 deletions spec/shoryuken/queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def call(options)
before {
# Required as Aws::SQS::Client.get_queue_url returns 'String' when responses are stubbed.
allow(subject).to receive(:url).and_return(queue_url)
allow(sqs).to receive(:get_queue_attributes).with({ queue_url: queue_url, attribute_names: ['FifoQueue', 'ContentBasedDeduplication', 'VisibilityTimeout'] }).and_return(attribute_response)
allow(sqs).to receive(:get_queue_attributes).with({ queue_url: queue_url, attribute_names: ['All'] }).and_return(attribute_response)

}
context 'when queue is FIFO' do
Expand All @@ -195,7 +195,7 @@ def call(options)

describe '#has_content_deduplication?' do
before {
allow(sqs).to receive(:get_queue_attributes).with({ queue_url: queue_url, attribute_names: ['FifoQueue', 'ContentBasedDeduplication', 'VisibilityTimeout'] }).and_return(attribute_response)
allow(sqs).to receive(:get_queue_attributes).with({ queue_url: queue_url, attribute_names: ['All'] }).and_return(attribute_response)

}
context 'when queue has content deduplicaiton' do
Expand Down

0 comments on commit 3061b41

Please sign in to comment.