
Extracted queue polling strategy #106

Closed · wants to merge 3 commits

Conversation


gshutler (Contributor) commented Jun 5, 2015

In order to be able to customize the order and manner in which Shoryuken polls SQS queues, the previous hard-coded strategy was extracted into a separate class.

This meant that some of the method names were altered to be more generic (for example rebalance_queue_weight! became messages_received); otherwise this was a direct extraction.


I don't think this is 100% ready for merging; I'm not entirely happy with the names and so forth, but I think it's solid enough to start having a discussion around.

My motivation was to be able to have more strict prioritization semantics than provided by the default implementation. I also wanted the option to do an extended poll on the highest priority queue if all queues are empty rather than either continually polling them all or sleeping to not poll anything.

This is my custom strategy that achieves that:

```ruby
class StrictPriorityPolling
  include Shoryuken::Util

  def initialize(queues)
    @active_queues = unparse_queues(queues)
    # Highest-priority queue first.
    @prioritized_queues = @active_queues.sort_by { |_queue, priority| -priority }.map(&:first)
    @highest_priority = @prioritized_queues.first
    @queues_since_messages = 0
  end

  attr_reader :active_queues

  def next_queue
    options = {}

    queue = @prioritized_queues[@queues_since_messages % @prioritized_queues.length]

    # Once every queue has come up empty, do an extended poll on the
    # highest-priority queue rather than spinning or sleeping.
    if @highest_priority == queue && @queues_since_messages > 0
      options[:wait_time_seconds] = 2
    end

    @queues_since_messages += 1

    Shoryuken::Polling::QueueConfiguration.new(queue, options)
  end

  def messages_present(_queue)
    @queues_since_messages = 0
  end

  def pause(_queue)
    # No-op
  end

  def restart(_queue)
    # No-op
  end
end

Shoryuken.configure_server do |config|
  config.options[:polling_strategy] = StrictPriorityPolling
end
```

From this, a strategy could be developed that may satisfy #92.

Inline review comment on queue_empty:

```ruby
  def pause_queue!(queue)
    return if !@queues.include?(queue) || Shoryuken.options[:delay].to_f <= 0
    ...

  def queue_empty(queue)
```

Assignment Branch Condition size for queue_empty is too high. [16.03/15]

gshutler (Contributor, Author) replied:

I can't see how to fix this. The method itself seems pretty straightforward to me.

gshutler (Contributor, Author) replied:

Actually, it seems to be happy with the extraction of the delay method.
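For context, this is the sort of extraction being discussed. The sketch below is hypothetical (the class name, constructor, and pause bookkeeping are illustrative, not the actual Shoryuken code); the point is that pulling the delay lookup into a private helper removes branches and reads from queue_empty, bringing its ABC size back under the linter's threshold.

```ruby
# Hypothetical sketch: extracting a `delay` helper to lower the
# ABC (Assignment/Branch/Condition) size of queue_empty.
class QueuePauser
  attr_reader :queues

  def initialize(queues, options = {})
    @queues = queues
    @options = options
  end

  # Pause an empty queue, but only when a positive delay is configured.
  def queue_empty(queue)
    return unless @queues.include?(queue) && delay > 0

    @queues.delete(queue)
    # ...schedule the queue to be re-added after `delay` seconds...
  end

  private

  # Extracting this lookup is the kind of change that satisfies the check:
  # queue_empty no longer counts the option read and coercion itself.
  def delay
    @options.fetch(:delay, 0).to_f
  end
end
```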

gshutler (Contributor, Author) commented Jun 5, 2015

I'll do my best to make houndci happy.

gshutler (Contributor, Author) commented:

I think the latest RSpec has changed equality behaviour and so the tests are now failing. I'll get that fixed later today.

Commit: In order to be able to customize the order and manner in which Shoryuken polls SQS queues, the previous hard-coded strategy was extracted into a separate class. This meant that some of the method names were altered to be more generic (for example rebalance_queue_weight! became messages_received); otherwise this was a direct extraction.

Commit: Mock "receive" calls appear to use a stronger interpretation of equality. Also altered assertions around raised errors to suppress new warnings.
phstc (Collaborator) commented Jul 2, 2015

Hi @gshutler, how is it going so far? Are you using it in your setup?

gshutler (Contributor, Author) commented Jul 2, 2015

It's been working well. Been running for at least a couple of weeks in production now without issue.

Inline review comment on receive_messages:

```ruby
  private

  def receive_messages(queue, limit)
```

Reason for making this private? This is a breaking API change.

gshutler (Contributor, Author) replied:

Because it can be without breaking the implementation; it reduces the surface area of the class and makes the protocol of a "fetcher" more obvious if a different implementation were wanted at some point.

mariokostelac (Contributor) commented:

I was a little bit sceptical, but the idea is not a bad one. The PR is too intrusive, though. I'd really like to cut the scope of this one and make sure we are just isolating that component. Cutting the surface area of the class interface and changing method interfaces should be part of other PRs.

gshutler (Contributor, Author) commented:

I don't think there's a way to introduce a strategy without it being intrusive. As mentioned in my last comment, I tried to minimize the changes to methods that weren't directly related to making the polling strategy work.

I think the only thing that could be undone whilst retaining the functionality is making receive_messages public again.

mariokostelac (Contributor) commented Jul 25, 2016

@gshutler sorry, you're right. I've been looking at the PR more closely and it looks good. What I'd like to do is go maybe a little bit further.
It seems that pausing and restarting queues is part of the strategy. There is actually no need to have all that complexity in the manager. If all queues are paused, we can just return no messages and that's it. Strategies should actually be fetchers, so we just call something like fetch and we either get some work or we get none.
Do you have time to work on it some more, or do you want us to work on top of your commits?
It's a tremendous start towards the "write your own fetcher" idea of making this totally extensible.
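To make the proposal concrete, here is a rough sketch of the fetcher protocol being described. The class and method names are illustrative, not the eventual Shoryuken API: the manager only ever asks for work, and empty-queue and pause bookkeeping become internal details of the fetcher.

```ruby
# Illustrative only: a fetcher the manager calls for work. It returns a
# batch of messages, or an empty batch when every queue is empty, so the
# manager needs no pause/restart logic of its own.
class StrictPriorityFetcher
  def initialize(queues, client)
    @queues = queues # ordered highest priority first
    @client = client # anything responding to receive_messages(queue, limit)
  end

  # Returns the first non-empty batch, draining higher-priority queues
  # before lower ones; [] signals "no work right now".
  def fetch(limit)
    @queues.each do |queue|
      messages = @client.receive_messages(queue, limit)
      return messages unless messages.empty?
    end
    []
  end
end
```

With an interface this small, a custom fetcher only has to implement fetch, which is what makes the "write your own fetcher" extensibility cheap.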

gshutler (Contributor, Author) commented:

@mariokostelac That sounds like it makes sense. I did try that route at the time, but it changed so much internally that I didn't think it would make a very polite PR!

I don't have the capacity to work on this at the moment. I ended up creating a custom message processor based on a forking model, because of memory leaks in our application that I was never able to get to the bottom of.

Feel free to extend this work or create a new branch with some of the ideas from it. I watch this repository, so I'd be happy to cast my eye over the changes if you think that would be helpful.

mariokostelac (Contributor) commented:

Closed in favor of #236

phstc (Collaborator) commented Dec 3, 2016

Hey @gshutler

I know that it might be too late, but have you seen the FIFO queues? They can guarantee message processing order.

https://github.com/phstc/shoryuken/wiki/FIFO-Queues

gshutler (Contributor, Author) commented Dec 5, 2016

@phstc Yeah, they look pretty interesting, though I generally think that if you rely on precise message ordering you're doing messaging wrong: at some point a message will fail, and should you then stop processing all other messages?

We were more looking for "if both queues have messages, we should process the ones from the high queue first". This is because (to simplify) in general our "low" messages are perfectly fine to be processed any time in the next 60 minutes, but our "high" ones should be processed as soon as possible.
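The semantics described here can be modelled in a few lines. This is a standalone illustration (not Shoryuken code; the class name and backlog hash are invented for the example): always pick the highest-priority queue that currently has messages, so "low" is only reached once "high" is empty.

```ruby
# Standalone model of strict priority polling: given per-queue message
# counts, choose the highest-priority queue with work, or nil if all empty.
class StrictPicker
  def initialize(queues_in_priority_order)
    @queues = queues_in_priority_order
  end

  def next_queue(backlog)
    @queues.find { |q| backlog.fetch(q, 0) > 0 }
  end
end
```

This contrasts with weighted polling, which would still visit the low queue some fraction of the time even while the high queue has a backlog.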

phstc (Collaborator) commented Dec 5, 2016

@gshutler I agree with you, they fit very specific cases, but where they do, they're very useful.

> We were more looking for "if both queues have messages, we should process the ones from the high queue first"

I see that; @atyndall also needs that. I'm starting to work on the concurrent-ruby migration, and after that I plan to review #236 and #263; these PRs should support that. Meanwhile, the only way to do that is to run more than one shoryuken process.
