diff --git a/lib/shoryuken/queue.rb b/lib/shoryuken/queue.rb index 957537c5..0f1a42ab 100644 --- a/lib/shoryuken/queue.rb +++ b/lib/shoryuken/queue.rb @@ -1,15 +1,17 @@ module Shoryuken class Queue + FIFO_ATTRIBUTE = 'FifoQueue' + MESSAGE_GROUP_ID = 'ShoryukenMessage' + VISIBILITY_TIMEOUT_ATTR = 'VisibilityTimeout' + attr_accessor :name, :client, :url def initialize(client, name) - self.name = name + self.name = name self.client = client - begin - self.url = client.get_queue_url(queue_name: name).queue_url - rescue Aws::SQS::Errors::NonExistentQueue => e - raise e, "The specified queue '#{name}' does not exist" - end + self.url = client.get_queue_url(queue_name: name).queue_url + rescue Aws::SQS::Errors::NonExistentQueue => e + raise e, "The specified queue '#{name}' does not exist." end def visibility_timeout @@ -38,34 +40,18 @@ def receive_messages(options) map { |m| Message.new(client, self, m) } end - # Returns whether this queue is a FIFO queue or not. - # @return [TrueClass, FalseClass] - def is_fifo? - @__is_fifo ||= queue_attributes.attributes[FIFO_ATTRIBUTE] == 'true' - end - - # Returns whether this queue has content based deduplication enabled or not. - # @return [TrueClass, FalseClass] - def has_content_deduplication? - queue_attributes.attributes[CONTENT_DEDUP_ATTRIBUTE] == 'true' + def fifo? + @_fifo ||= queue_attributes.attributes[FIFO_ATTRIBUTE] == 'true' end private - FIFO_ATTRIBUTE = 'FifoQueue' - CONTENT_DEDUP_ATTRIBUTE = 'ContentBasedDeduplication' - MESSAGE_GROUP_ID = 'ShoryukenMessage' - VISIBILITY_TIMEOUT_ATTR = 'VisibilityTimeout' - - - # @return [Aws::SQS::Types::GetQueueAttributesResult] def queue_attributes # Note: Retrieving all queue attributes as requesting `FifoQueue` on non-FIFO queue raises error. # See issue: https://github.com/aws/aws-sdk-ruby/issues/1350 client.get_queue_attributes(queue_url: url, attribute_names: ['All']) end - # Returns sanitized messages, raising ArgumentError if any of the message is invalid. def sanitize_messages!(options) options = case when options.is_a?(Array) @@ -76,62 +62,54 @@ def sanitize_messages!(options) options[:entries].each(&method(:add_fifo_attributes!)) options end + validate_messages!(options) + options end - # Modifies the supplied hash and adds the required FIFO message attributes based on the queue configuration. - def add_fifo_attributes!(message_hash) - return unless is_fifo? + def add_fifo_attributes!(options) + return unless fifo? - message_hash[:message_group_id] = MESSAGE_GROUP_ID - message_hash[:message_deduplication_id] = SecureRandom.uuid unless has_content_deduplication? + options[:message_group_id] ||= MESSAGE_GROUP_ID + options[:message_deduplication_id] ||= Digest::SHA256.digest(options[:message_body]) - message_hash + options end def sanitize_message!(options) - options = case - when options.is_a?(String) - # send_message('message') - { message_body: options } - when options.is_a?(Hash) - options - end - add_fifo_attributes! options + options = { message_body: options } if options.is_a?(String) + + add_fifo_attributes!(options) validate_message!(options) + options end def validate_messages!(options) - options[:entries].map { |m| validate_message!(m) } + options[:entries].map(&method(:validate_message!)) end def validate_message!(options) body = options[:message_body] + if body.is_a?(Hash) options[:message_body] = JSON.dump(body) elsif !body.is_a?(String) - fail ArgumentError, "The message body must be a String and you passed a #{body.class}" + fail ArgumentError, "The message body must be a String and you passed a #{body.class}." end - validate_fifo_message! options + + validate_fifo_message!(options) + options end - # Validates a FIFO message with the queue configuration. - # @param [Hash] options - Message hash. - # @raise [ArgumentError] raises ArgumentError if the message configuration is incompatible with the queue configuration. def validate_fifo_message!(options) - return unless is_fifo? - if options[:delay_seconds].is_a?(Fixnum) + return unless fifo? + + if options[:delay_seconds] fail ArgumentError, 'FIFO queues do not accept DelaySeconds arguments.' end - if options[:message_group_id].nil? - fail ArgumentError, 'This queue is FIFO and no message_group_id was provided.' - end - if !has_content_deduplication? && options[:message_deduplication_id].nil? - fail ArgumentError, 'This queue is FIFO without ContentBasedDeduplication enabled, and no MessageDeduplicationId was supplied.' - end end end end diff --git a/spec/shoryuken/queue_spec.rb b/spec/shoryuken/queue_spec.rb index 58b60330..4682f8e8 100644 --- a/spec/shoryuken/queue_spec.rb +++ b/spec/shoryuken/queue_spec.rb @@ -5,7 +5,6 @@ let(:sqs) { Aws::SQS::Client.new(stub_responses: true, credentials: credentials) } let(:queue_name) { 'shoryuken' } let(:queue_url) { 'https://eu-west-1.amazonaws.com:6059/123456789012/shoryuken' } - let(:attribute_response) { double 'Aws::SQS::Types::GetQueueAttributesResponse' } subject { described_class.new(sqs, queue_name) } before { @@ -16,7 +15,7 @@ describe '#send_message' do before { - allow(subject).to receive(:is_fifo?).and_return(false) + allow(subject).to receive(:fifo?).and_return(false) } it 'accepts SQS request parameters' do # https://docs.aws.amazon.com/sdkforruby/api/Aws/SQS/Client.html#send_message-instance_method @@ -35,13 +34,13 @@ it 'raises ArgumentError for nil' do expect { subject.send_message(message_body: nil) - }.to raise_error(ArgumentError, 'The message body must be a String and you passed a NilClass') + }.to raise_error(ArgumentError, 'The message body must be a String and you passed a NilClass.') end it 'raises ArgumentError for Fixnum' do expect { subject.send_message(message_body: 1) - }.to raise_error(ArgumentError, 'The message body must be a String and you passed a Fixnum') + }.to raise_error(ArgumentError, 'The message body must be a String and you passed a Fixnum.') end context 'when a client middleware' do @@ -81,7 +80,7 @@ def call(options) describe '#send_messages' do before { - allow(subject).to receive(:is_fifo?).and_return(false) + allow(subject).to receive(:fifo?).and_return(false) } it 'accepts SQS request parameters' do # https://docs.aws.amazon.com/sdkforruby/api/Aws/SQS/Client.html#send_message_batch-instance_method @@ -91,62 +90,62 @@ def call(options) end it 'accepts an array of messages' do - expect(sqs).to receive(:send_message_batch).with(hash_including(entries: [{id: '0', message_body: 'msg1', delay_seconds: 1, message_attributes: {attr: 'attr1'}}, {id: '1', message_body: 'msg2', delay_seconds: 1, message_attributes: {attr: 'attr2'}}])) - - subject.send_messages([ - { - message_body: 'msg1', - delay_seconds: 1, - message_attributes: {attr: 'attr1'} - }, { - message_body: 'msg2', - delay_seconds: 1, - message_attributes: {attr: 'attr2'} - } - ]) + options = { entries: [{ id: '0', + message_body: 'msg1', + delay_seconds: 1, + message_attributes: { attr: 'attr1' } }, + { id: '1', + message_body: 'msg2', + delay_seconds: 1, + message_attributes: { attr: 'attr2' } }] } + + expect(sqs).to receive(:send_message_batch).with(hash_including(options)) + + subject.send_messages([{ message_body: 'msg1', + delay_seconds: 1, + message_attributes: { attr: 'attr1' } + }, { + message_body: 'msg2', + delay_seconds: 1, + message_attributes: { attr: 'attr2' } + }]) end - context 'when FIFO is configured without content deduplication' do - before { - # Arrange - allow(subject).to receive(:is_fifo?).and_return(true) - # Pre-Assert - expect(sqs).to receive(:send_message_batch) do |arg| - expect(arg).to include(:entries) - first_entry = arg[:entries].first - expect(first_entry).to include({ id: '0', message_body: 'msg1', message_group_id: 'ShoryukenMessage' }) - expect(first_entry[:message_deduplication_id]).to be_a String - end - } - it 'sends with message_group_id and message_deduplication_id when an array is sent' do - subject.send_messages([{ message_body: 'msg1', message_attributes: { attr: 'attr1' } }]) - end - it 'sends with message_group_id and message_deduplication_id when a hash is sent' do - subject.send_messages(entries: [{ id: '0', message_body: 'msg1' }]) + context 'when FIFO' do + before do + allow(subject).to receive(:fifo?).and_return(true) end - end - context 'when FIFO is configured with content deduplication' do - before { - # Arrange - allow(subject).to receive(:is_fifo?).and_return(true) - allow(subject).to receive(:has_content_deduplication?).and_return(true) - # Pre-Assert - expect(sqs).to receive(:send_message_batch) do |arg| - expect(arg).to include(:entries) - first_entry = arg[:entries].first - expect(first_entry).to match({ id: '0', message_body: 'msg1', message_group_id: 'ShoryukenMessage', message_attributes: { attr: 'attr1' } }) + context 'message_group_id and message_deduplication_id are absent' do + it 'sets default values' do + expect(sqs).to receive(:send_message_batch) do |arg| + first_entry = arg[:entries].first + + expect(first_entry[:message_group_id]).to eq described_class::MESSAGE_GROUP_ID + expect(first_entry[:message_deduplication_id]).to be + end + + subject.send_messages([{ message_body: 'msg1', message_attributes: { attr: 'attr1' } }]) end - } - it 'sends with message_group_id when argument is an array' do - subject.send_messages([{ message_body: 'msg1', message_attributes: { attr: 'attr1' } }]) end - it 'sends with message_group_id when a hash is sent' do - subject.send_messages(entries: [{ id: '0', message_body: 'msg1', message_attributes: { attr: 'attr1' } }]) + + context 'message_group_id and message_deduplication_id are present' do + it 'preserves existing values' do + expect(sqs).to receive(:send_message_batch) do |arg| + first_entry = arg[:entries].first + + expect(first_entry[:message_group_id]).to eq 'my group' + expect(first_entry[:message_deduplication_id]).to eq 'my id' + end + + subject.send_messages([{ message_body: 'msg1', + message_attributes: { attr: 'attr1' }, + message_group_id: 'my group', + message_deduplication_id: 'my id' }]) + end end end - it 'accepts an array of string' do expect(sqs).to receive(:send_message_batch).with(hash_including(entries: [{ id: '0', message_body: 'msg1' }, { id: '1', message_body: 'msg2' }])) @@ -157,61 +156,39 @@ def call(options) it 'raises ArgumentError for nil' do expect { subject.send_messages(entries: [message_body: nil]) - }.to raise_error(ArgumentError, 'The message body must be a String and you passed a NilClass') + }.to raise_error(ArgumentError, 'The message body must be a String and you passed a NilClass.') end it 'raises ArgumentError for Fixnum' do expect { subject.send_messages(entries: [message_body: 1]) - }.to raise_error(ArgumentError, 'The message body must be a String and you passed a Fixnum') + }.to raise_error(ArgumentError, 'The message body must be a String and you passed a Fixnum.') end end end - describe '#is_fifo?' do - before { - # Required as Aws::SQS::Client.get_queue_url returns 'String' when responses are stubbed. + describe '#fifo?' do + before do + attribute_response = double 'Aws::SQS::Types::GetQueueAttributesResponse' + + allow(attribute_response).to receive(:attributes).and_return('FifoQueue' => fifo.to_s, 'ContentBasedDeduplication' => 'true') allow(subject).to receive(:url).and_return(queue_url) - allow(sqs).to receive(:get_queue_attributes).with({ queue_url: queue_url, attribute_names: ['All'] }).and_return(attribute_response) + allow(sqs).to receive(:get_queue_attributes).with(queue_url: queue_url, attribute_names: ['All']).and_return(attribute_response) + end - } context 'when queue is FIFO' do - before { - allow(attribute_response).to receive(:attributes).and_return({ 'FifoQueue' => 'true', 'ContentBasedDeduplication' => 'true' }) - } - it 'Returns True' do - expect(subject.is_fifo?).to eq true - end - end - context 'when queue is not FIFO' do - before { - allow(attribute_response).to receive(:attributes).and_return({ 'FifoQueue' => 'false', 'ContentBasedDeduplication' => 'false' }) - } - it 'Returns False' do - expect(subject.is_fifo?).to eq false + let(:fifo) { true } + + it 'returns true' do + expect(subject.fifo?).to be end end - end - describe '#has_content_deduplication?' do - before { - allow(sqs).to receive(:get_queue_attributes).with({ queue_url: queue_url, attribute_names: ['All'] }).and_return(attribute_response) + context 'when queue is not FIFO' do + let(:fifo) { false } - } - context 'when queue has content deduplicaiton' do - before { - allow(attribute_response).to receive(:attributes).and_return({ 'FifoQueue' => 'true', 'ContentBasedDeduplication' => 'true' }) - } - it 'Returns True' do - expect(subject.has_content_deduplication?).to eq true - end - end - context 'when queue does not have content deduplication' do - before { - allow(attribute_response).to receive(:attributes).and_return({ 'FifoQueue' => 'true', 'ContentBasedDeduplication' => 'false' }) - } - it 'Returns False' do - expect(subject.has_content_deduplication?).to eq false + it 'returns false' do + expect(subject.fifo?).to_not be end end end