Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding FIFO Support #273

Closed
wants to merge 4 commits into from
Closed

Adding FIFO Support #273

wants to merge 4 commits into from

Conversation

richseviora
Copy link
Contributor

@richseviora richseviora commented Nov 24, 2016

Fixes #272

Adds FIFO queue type support to Shoryuken.

Changes:

  • Shoryuken::Queue will now check if the queue is a FIFO queue or not.
  • If it is, messages will have a MessageGroupId attached.
  • If the queue is FIFO and does not have content based deduplication enabled, a UUID is assigned as the MessageDeduplicationId.
  • If the queue is FIFO and the consumer submits a message with DelaySeconds, an error will be raised as the SDK would've raised an error any way.

@richseviora richseviora mentioned this pull request Nov 24, 2016
# Modifies the supplied hash and adds the required FIFO message attributes based on the queue configuration.
def add_fifo_attributes!(message_hash)
message_hash[:message_group_id] = MESSAGE_GROUP_ID if is_fifo?
message_hash[:message_deduplication_id] = SecureRandom.uuid if is_fifo? && !has_content_deduplication?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@richseviora WDYT:

def add_fifo_attributes!(message_hash)
   return unless is_fifo?
   
   message_hash[:message_group_id]         = MESSAGE_GROUP_ID
   message_hash[:message_deduplication_id] = SecureRandom.uuid unless has_content_deduplication?
end

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely cleaner. Will amend accordingly.

queue_url: url,
attribute_names: ['VisibilityTimeout']
queue_url: url,
attribute_names: ['VisibilityTimeout']
).attributes['VisibilityTimeout'].to_i
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@richseviora could you keep the indentation, we use 2 spaces.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do. I don't see a code style statement in the readme so I'll add that in.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no specific code style, it's more like just to keep the style it currently uses.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So then we'll make the current style the code style. I think it's beneficial to include so new contributors can write it correctly going forward, and it saves you time :)

result = {id: index.to_s}.merge(m.is_a?(Hash) ? m : {message_body: m})
add_fifo_attributes!(result)
result
end}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another way:

{ entries: options.map.with_index do |m, index|
  { id: index.to_s }.merge(m.is_a?(Hash) ? m : { message_body: m }).tap(&method(:add_fifo_attributes))
end }

We use spaces in between { }.

options[:entries].each do |m|
add_fifo_attributes!(m)
end
options
end
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

options[:entries].each(&:add_fifo_attributes!)

when options.is_a?(String)
# send_message('message')
{message_body: options}
when options.is_a?(Hash)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better not changing the styling.


# @return [Aws::SQS::Types::GetQueueAttributesResult]
def fifo_attributes
@fifo_attr ||= client.get_queue_attributes(queue_url: url, attribute_names: [FIFO_ATTRIBUTE, CONTENT_DEDUP_ATTRIBUTE])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if we could do:

def attributes
  @_attributes || client.get_queue_attributes(queue_url: url, attribute_names: [FIFO_ATTRIBUTE, CONTENT_DEDUP_ATTRIBUTE, 'VisibilityTimeout'])
end

So we could share the attributes with visibility_timeout and fifo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it!

As I'm thinking through this, I'm starting to wonder if we'd need to worry about changes to queues while the workers are active. I know FifoQueue is readonly, but I believe VisibilityTimeout and ContentBasedDeduplication can be changed.

private

FIFO_ATTRIBUTE = 'FifoQueue'
Copy link
Collaborator

@phstc phstc Nov 26, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@richseviora I'm getting this:

     Aws::SQS::Errors::InvalidAttributeName:
       Unknown Attribute FifoQueue.

while trying to run SPEC_ALL=true rspec which performs an integration test end-to-end. I think that's because the idea of the generic queue_attributes, it's also requesting FifoQueue for non-FIFO queues.

I think we could use:

# See https://docs.aws.amazon.com/sdkforruby/api/Aws/SQS/Client.html#get_queue_attributes-instance_method
client.get_queue_attributes(queue_url: url, attribute_names: ['All'])

So it would return FifoQueue when available. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow. I'm surprised that SQS would have a queue attribute that would generate an error if the value was false. I've opened an issue with their SDK. I'll amend the code as suggested.

# Returns whether this queue is a FIFO queue or not.
# @return [TrueClass, FalseClass]
def is_fifo?
queue_attributes.attributes[FIFO_ATTRIBUTE] == 'true'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@richseviora I got your point that we shouldn't memoize the generic queue_attributes as the visibility timeout can be changed after the queue creation, that was a good call. But I think we could memoize is_fifo?, so we don't need to perform a request for every message we send.

def fifo?
  @_is_fifo ||= queue_attributes.attributes[FIFO_ATTRIBUTE] == 'true'
end

WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed! Will amend accordingly.

end
if is_fifo? && !has_content_deduplication? && options[:message_deduplication_id].nil?
fail ArgumentError, 'This queue is FIFO without ContentBasedDeduplication enabled, and no MessageDeduplicationId was supplied'
end
options
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@richseviora WDYT:

    def validate_message!(options)
      body = options[:message_body]
      if body.is_a?(Hash)
        options[:message_body] = JSON.dump(body)
      elsif !body.is_a?(String)
        fail ArgumentError, "The message body must be a String and you passed a #{body.class}"
      end

      validate_fifo_message!(options)

      options
    end

    def validate_fifo_message!(options)
      return unless fifo?

      if options[:delay_seconds].is_a?(Fixnum)
        fail ArgumentError, 'FIFO queues do not accept DelaySeconds arguments.'
      end

      if options[:message_group_id].nil?
        fail ArgumentError, 'This queue is FIFO and no message_group_id was provided.'
      end

      if options[:message_deduplication_id].nil? && !has_content_deduplication?
        fail ArgumentError, 'This queue is FIFO without ContentBasedDeduplication enabled, and no MessageDeduplicationId was supplied'
      end
    end

This if options[:message_deduplication_id].nil? && !has_content_deduplication? condition change is because has_content_deduplication? can perform a request, so it's better to check options[:message_deduplication_id] first, because we may not need to check the other which's more expensive.

phstc pushed a commit that referenced this pull request Dec 3, 2016
phstc pushed a commit that referenced this pull request Dec 3, 2016
phstc pushed a commit that referenced this pull request Dec 3, 2016
@phstc
Copy link
Collaborator

phstc commented Dec 3, 2016

Hi @richseviora

I created a follow up PR #276 (branching from your branch, so all your commits are in there), with some ideas, would be able to review it?

@phstc phstc closed this Dec 3, 2016
phstc pushed a commit that referenced this pull request Dec 3, 2016
phstc added a commit that referenced this pull request Dec 3, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants