Strict priority polling w/ backmerge #288
@mariokostelac would you mind reviewing it?
@atyndall could you rebase on master first, please? We merged my change into master.
1eab6b6 to 05b467b
@mariokostelac Should be done now.
@@ -19,11 +19,49 @@ def ==(other)
  end

  alias_method :eql?, :==

  def to_s
    options.empty? ? name : super
Could you explain this?
It was to tidy up log messages such as these:
Before change:
2016-12-15T23:38:08Z 88280 TID-oxeiaih3s DEBUG: Looking for new messages in '#<struct Shoryuken::Polling::QueueConfiguration name="development_atyndall_affinity_0", options={}>'
2016-12-15T23:38:08Z 88280 TID-oxeiaih3s DEBUG: Fetcher for '#<struct Shoryuken::Polling::QueueConfiguration name="development_atyndall_affinity_0", options={}>' completed in 268.11899999999997 ms
After change:
2016-12-15T23:39:56Z 88353 TID-out2291p8 DEBUG: Looking for new messages in 'development_atyndall_affinity_0'
2016-12-15T23:39:56Z 88353 TID-out2291p8 DEBUG: Fetcher for 'development_atyndall_affinity_0' completed in 217.197 ms
Seeing as QueueConfigurations are considered equal if they have no options and their names are equal, it made sense to me to make their string representation just the name of the queue to tidy up logging. This could also be handled by changing inspect; I'm open to either way.
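For anyone following along, here is a minimal sketch of the behaviour being described. The struct below is only a stand-in for Shoryuken::Polling::QueueConfiguration: the two fields are taken from the log output above, and everything else is an assumption rather than the actual class.

```ruby
# Stand-in struct (an assumption): only the name and options fields are
# taken from the log lines above.
QueueConfiguration = Struct.new(:name, :options) do
  # With no options, print just the queue name; otherwise fall back to
  # Struct#to_s, which prints the full "#<struct ...>" form.
  def to_s
    options.empty? ? name : super
  end
end

plain   = QueueConfiguration.new('development_queue', {})
tweaked = QueueConfiguration.new('development_queue', { group: 'a' })

puts "Looking for new messages in '#{plain}'"
puts "Looking for new messages in '#{tweaked}'"
```

String interpolation calls to_s, which is why the log lines shrink without any change to the logging calls themselves.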
Can we make the case where options exist a little bit nicer? :)
Made some changes
.each_with_object({}) { |queue, h| h[queue] = [true, nil] }

# Most recently used queue
@current_queue = nil
From what I understand, that is actually last_used_queue. Can we name it that? It will make more sense for the loop.
The rework has eliminated the variable altogether.
.map(&:first)

# Pause status of the queues
@queue_status = @queue_order
I think we can compress queue_status into just one field, something like "activation time". When Time.now < activation_time, the queue is not active yet. When Time.now >= activation_time, the queue is active.
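A rough sketch of the "activation time" idea, assuming a hypothetical PauseTracker helper (the class and method names are illustrative and not part of the PR):

```ruby
# Hypothetical helper: one timestamp per queue replaces a separate
# paused/unpaused flag plus an unpause step.
class PauseTracker
  def initialize(queues)
    # Time.at(0) is in the past, so every queue starts out active.
    @activation_time = queues.each_with_object({}) { |q, h| h[q] = Time.at(0) }
  end

  # Marks the queue as inactive until `seconds` after `now`.
  def pause(queue, seconds, now = Time.now)
    @activation_time[queue] = now + seconds
  end

  # A queue is paused while the clock is still before its activation time.
  def paused?(queue, now = Time.now)
    now < @activation_time[queue]
  end
end
```

Passing `now` explicitly is only there to keep the sketch easy to test; queues become active again simply because time passes, so nothing has to unpause them.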
# Priority ordering of the queues
@queue_order = @queue_priorities
  .to_a
I think we need to simplify this code or add a comment describing the data structure it produces :).
👍
# Loop through the queue order from the current queue until we find a
# queue that is next in line and is not paused
while true
Can we use a for loop? This external bookkeeping of variable i looks ugly.
It's not drop-in but we have similar code in our custom stuff that works like this:
queues = @queue_order.dup
while true
  queue = queues.first
  # process queue
  queues.rotate!
end
So rather than maintaining i or @current_queue you dup the ordered queues at some point and then Array#rotate! them as needed to move to the next queue.
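A small runnable illustration of the dup-and-rotate pattern, with made-up queue names and a bounded loop in place of `while true`:

```ruby
# Hypothetical queue names; frozen to show the original order is never mutated.
queue_order = %w[high default low].freeze

# dup returns an unfrozen copy that can be rotated freely.
queues = queue_order.dup
visited = []

# Visit each queue once: take the head, then move it to the back.
queues.length.times do
  visited << queues.first
  queues.rotate!
end
```

After one full cycle `queues` is back in its starting order, so there is no index to reset or bounds-check.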
@gshutler Excellent! I didn't know about Array#rotate!, I'll rework the code around it.
i += 1
return queue if active
return nil if i >= @queue_order.length # Prevents infinite looping
Also, using a for loop gets rid of this line.
👍
  logger.debug "Paused '#{queue}'"
end

def unpause_queues
Once we compress the state, we do not have to unpause queues because we know their order; we just have to check whether activation_time > Time.now :).
Ah yes, I'll implement this approach.
  expect(subject.next_queue).to eq(queue2)
end

it 'cycles when declared asc' do
Why does this look the same as the previous one when the priorities are different?
The priorities aren't different; the priorities are the numbers. They are independent of declaration order.
Therefore
[shoryuken, 2]
[uppercut, 1]
and
[uppercut, 1]
[shoryuken, 2]
have the same meaning, because they have the same priority numbers.
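To make that concrete, a minimal sketch; `priority_order` is a hypothetical helper, and treating the higher number as the higher priority is an assumption made only for this example:

```ruby
# Hypothetical helper: orders queue names by their priority number.
# Assumption: a higher number means the queue is polled first.
def priority_order(declared)
  declared.sort_by { |_name, priority| -priority }.map(&:first)
end

declared_desc = [['shoryuken', 2], ['uppercut', 1]]
declared_asc  = [['uppercut', 1], ['shoryuken', 2]]

# Both declarations yield the same polling order.
same_order = priority_order(declared_desc) == priority_order(declared_asc)
```

Because the order is derived only from the numbers, the two specs are expected to look identical apart from the declaration order.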
Oh, I did not see that the queues are reversed!
@atyndall could you please describe the algorithm you want to implement? That's the only way we can be sure we're on the same page :).
# Priority ordering of the queues
@queue_order = @queue_priorities
  .to_a
@queue_order = queues.group_by(&:itself).sort_by { |q, qs| -qs.count }.map(&:first)
May have to do the longhand of Object#itself if you need support for Ruby < 2.2.0.
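For reference, a sketch of the longhand form with a made-up queue list in which each name is repeated according to its weight (that input shape is an assumption for illustration):

```ruby
# Hypothetical input: each queue name appears once per unit of weight.
queues = %w[high high high default default low]

# Longhand for group_by(&:itself), which works before Ruby 2.2.0:
# group identical names, sort groups by size (largest first), keep the names.
queue_order = queues
  .group_by { |q| q }
  .sort_by { |_q, qs| -qs.count }
  .map(&:first)
```

Note that sort_by is not guaranteed to be stable, so queues with equal counts may come out in either order.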
Nice, I'll use that instead.
I have made significant changes to the implementation taking into account the feedback, thanks!
@mariokostelac Sure, at a high level each queue is assigned a unique priority, describing how critical its work is. When shoryuken goes to select a new job, it chooses the highest priority unpaused queue to check first. If that queue contains no work, it will proceed to check the next highest priority unpaused queue, etc. When new work is received or when a queue unpauses, the checking order is reset to check the highest priority queue once more.
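A toy model of that selection rule, ignoring pausing entirely. The `poll_once` helper and the in-memory backlog hash are assumptions for illustration; real queues would be fetched from SQS.

```ruby
# Hypothetical helper: takes one message from the highest-priority queue
# that has work and returns that queue's name, or nil if all are empty.
# `order` lists queue names from highest to lowest priority.
def poll_once(order, backlog)
  order.each do |queue|
    next if backlog[queue].zero? # no work here, try the next priority

    backlog[queue] -= 1
    return queue
  end
  nil
end

order   = %w[high default low]
backlog = { 'high' => 1, 'default' => 2, 'low' => 1 }

polled = []
polled << poll_once(order, backlog) # takes from high
polled << poll_once(order, backlog) # high is empty, takes from default
backlog['high'] += 1                # new high-priority work arrives
polled << poll_once(order, backlog) # scan restarts at the top, takes from high
```

Every call scans from the top, so newly arrived high-priority work is picked up before the remaining lower-priority messages.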
@atyndall so that's a priority queue, right? As long as there are jobs in a queue with the highest priority, poll that queue. Once it has no jobs, pause it for If yes, we can just use https://ruby-doc.org/stdlib-2.3.1/libdoc/set/rdoc/SortedSet.html.
@mariokostelac Yeah that's correct.
I think we have to figure out how to release that. This change is actually moving from a "one polling strategy" model to a "choose your polling strategy" model.
I suggest we do something similar to:
- we do not share options between polling strategies (no sharing the delay option, it's confusing)
- we cut a new version where we deprecate the delay parameter (it's confusing anyway). The new version also has a way to choose the polling strategy, and parameters are scoped per polling strategy (while weighted round robin still supports delay for some time).
WDYT @phstc?
@queue_order = @initial_order.dup

# Pause status of the queues, default to past time (unpaused)
@paused_until = queues
👍
  pause(queue)
else
  # Reset the queue order to the initial ordering
  @queue_order = @initial_order.dup
Since we're using this reset_queue_order several times, can we isolate that into a method? If we do so, we do not even need a comment. Let the method name talk! :)
Will add such a method.
# `rotate!` through the queue list until we find an unpaused queue
begin
  next_queue = @queue_order.first
What is stored in here? It seems that .map(&:first) in the initializer stores just the q in here, no?
It's getting refactored out now.
allow(subject).to receive(:delay).and_return(10)

now = Time.now
Good practice 👍
now = Time.now

# Return nil if all queues are paused to prevent infinite loop
return nil if @paused_until.values.all? { |t| t > now }
Hmmm, I know I am picky here, but this code seems a little bit complicated to me :). Since our number of queues has to be low (100 at most), we do not have to think about speed too much and we can implement a straightforward solution: iterate over the queues from index last_index (like you did before) and just check whether each queue is paused. The first one that is not paused has to be returned. If it ever gets slow (I really doubt that :)), we can optimize this class, but I'd rather have something that is very simple and that everybody can understand fast!
I think it's far simpler than maintaining that initial_order, having a check from line 174, making sure that we call dup every time we want to get a copy, etc.
The whole next_active_queue would be very simple, something like:
def next_active_queue
  size = @queues.length
  # Try each queue at most once so we cannot loop forever
  size.times do
    queue = @queues[@next_queue_index]
    @next_queue_index = (@next_queue_index + 1) % size
    return queue unless queue_paused?(queue)
  end
  nil
end
We would be able to have a very simple, reusable queue_paused? method.
Also, reset_next_queue is as simple as @next_queue_index = 0. We have to call it in the initializer, too.
WDYT?
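A self-contained version of that idea that can be run in isolation. The QueueCursor class name and the injected `paused` block (standing in for queue_paused?) are assumptions for this sketch, not the code that ended up in the PR.

```ruby
# Hypothetical wrapper around the index-based scan suggested above.
class QueueCursor
  # `paused` is a block standing in for queue_paused?; without one,
  # every queue is treated as active.
  def initialize(queues, &paused)
    @queues = queues
    @paused = paused || ->(_queue) { false }
    reset_next_queue
  end

  # Start the next scan from the first (highest priority) queue again.
  def reset_next_queue
    @next_queue_index = 0
  end

  # Returns the next unpaused queue, or nil when every queue is paused.
  def next_active_queue
    size = @queues.length
    # Each queue is tried at most once, so this cannot loop forever.
    size.times do
      queue = @queues[@next_queue_index]
      @next_queue_index = (@next_queue_index + 1) % size
      return queue unless @paused.call(queue)
    end
    nil
  end
end

paused_names = ['b']
cursor = QueueCursor.new(%w[a b c]) { |q| paused_names.include?(q) }
first  = cursor.next_active_queue
second = cursor.next_active_queue
```

With an empty queue list `size.times` runs zero iterations, so the modulo never divides by zero and the method returns nil.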
Sounds good, I'll implement that approach.
end

def active_queues
  @paused_until
Also, once we have queue_paused?, we can use something as simple as
@queues.reject { |q| queue_paused?(q) }
I think it's far easier to understand.
I'll add the method.
Made additional changes to simplify methods further, and move them into smaller submethods.
@mariokostelac I think most people don't care which polling strategy they use; they only want to make it work. The shoryuken.yml by default (in the samples/README) should be something like this:
concurrency: 25 # The number of allocated threads to process messages. Default 25
queues:
- [high_priority, 6]
- [default, 2]
- [low_priority, 1]
I would remove the
With that, I wouldn't do any major release; I would just leave "choose your polling strategy" as something special for now, documented in the wiki. Keeping shoryuken as simple and straightforward as possible, I don't mind about keeping the
WDYT? Is this change introducing any breaking changes?
@phstc AFAIK there are no breaking changes; merging this in should not have any impact on anyone who doesn't use it. For your information, this is what our queueing config looks like atm;
@atyndall nice!
@phstc Yep
@atyndall @phstc I am fine with having a default polling strategy being set and all associated parameters, but I am not really happy with sharing these parameters between polling strategies. They could have slightly or completely different meanings. That's why I'd like to namespace these parameters in new version :). Also,
@mariokostelac I agree, I just want to make the basic
@phstc This is the strict priority polling implementation from #263 with master backmerged, it didn't make it when you closed #263 in favour of #284.