Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unpause FIFO queues on worker completion #644

Merged
merged 4 commits into from
Jan 24, 2021
Merged

Unpause FIFO queues on worker completion #644

merged 4 commits into from
Jan 24, 2021

Conversation

davidrichey
Copy link
Contributor

@davidrichey davidrichey commented Jan 19, 2021

  • Updated Manager#processor_done to check for FIFO queue & unpause
    • Added specs
  • Added unpause method on strategies
    • Added specs around #unpause

closes #643

* Add in UnpauseQueue middleware
  * Added specs
* Added unpause method on strategies
  * TODO: Add specs around #unpause
* Updates Processor, Manager & Options to allow new variables
  * TODO: Add specs
Copy link
Collaborator

@cjlarose cjlarose left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall approach seems solid! I took another pass after reviewing some of the surrounding context.

@@ -38,6 +38,11 @@ def active_queues
.reverse
end

def unpause_queue(queue)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For symmetry with the method messages_found, let's consider renaming this method message_processed.

The reason is that the interaction between the Manager class and the polling strategy is that the manager is just informing the polling strategy of certain events, allowing the polling strategy to respond as it sees fit. In this case, it's not the responsibility of the Manager to tell the polling strategy to pause a queue, it's just the responsibility for the Manager to let the polling strategy know that it's done processing the message: the polling strategy is welcome to do whatever it wants with that knowledge.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also add message_processed to Shoryuken::Polling::BaseStrategy. Its implementation can be empty (for backward compatibility--users might have subclassed the base class themselves).

client_queue = Shoryuken::Client.queues(queue)
return unless client_queue.fifo?

@polling_strategy.unpause_queue(queue)
Copy link
Collaborator

@cjlarose cjlarose Jan 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since users can specify their own polling strategy class, we want to make it optional to implement the new method. Let's invoke this method only if @polling_strategy responds to it. Otherwise, this change would be backwards-incompatible.

@davidrichey davidrichey marked this pull request as ready for review January 23, 2021 12:51
@cjlarose
Copy link
Collaborator

Thanks for being so responsive to feedback! I'll do some testing on some real queues, but overall, this is looking great!

@cjlarose cjlarose self-assigned this Jan 24, 2021
@cjlarose
Copy link
Collaborator

cjlarose commented Jan 24, 2021

I tested this locally on some FIFO queues. On master, I observed that if concurrency is more than 1 and you have a delay configured, FIFO message throughput is very poor because the "fetcher" pauses every time it gets an empty receive.

On this branch, I observed that the fetcher resumed whenever a job is finished, improving throughput, especially when the number of distinct message_group_ids is small.

Excellent work, @davidrichey!

@cjlarose cjlarose merged commit 2a596a8 into ruby-shoryuken:master Jan 24, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

FIFO Queues with delays
2 participants