Skip to content

Commit

Permalink
Merge pull request #644 from davidrichey/unpause_fifo_queues
Browse files Browse the repository at this point in the history
Unpause FIFO queues on worker completion
  • Loading branch information
cjlarose authored Jan 24, 2021
2 parents 1a261d6 + f2990d9 commit 2a596a8
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 4 deletions.
14 changes: 10 additions & 4 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,13 @@ def ready
@max_processors - busy
end

def processor_done
def processor_done(queue)
@busy_processors.decrement
client_queue = Shoryuken::Client.queues(queue)
return unless client_queue.fifo?
return unless @polling_strategy.respond_to?(:message_processed)

@polling_strategy.message_processed(queue)
end

def assign(queue_name, sqs_msg)
Expand All @@ -68,9 +73,10 @@ def assign(queue_name, sqs_msg)

@busy_processors.increment

Concurrent::Promise.execute(
executor: @executor
) { Processor.process(queue_name, sqs_msg) }.then { processor_done }.rescue { processor_done }
Concurrent::Promise
.execute(executor: @executor) { Processor.process(queue_name, sqs_msg) }
.then { processor_done(queue_name) }
.rescue { processor_done(queue_name) }
end

def dispatch_batch(queue)
Expand Down
2 changes: 2 additions & 0 deletions lib/shoryuken/polling/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def messages_found(_queue, _messages_found)
fail NotImplementedError
end

def message_processed(_queue); end

def active_queues
fail NotImplementedError
end
Expand Down
6 changes: 6 additions & 0 deletions lib/shoryuken/polling/strict_priority.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def active_queues
.reverse
end

def message_processed(queue)
logger.debug "Unpausing #{queue}"
@paused_until[queue] = Time.now
end

private

def next_active_queue
Expand Down Expand Up @@ -70,6 +75,7 @@ def queue_paused?(queue)

def pause(queue)
return unless delay > 0

@paused_until[queue] = Time.now + delay
logger.debug "Paused #{queue}"
end
Expand Down
11 changes: 11 additions & 0 deletions lib/shoryuken/polling/weighted_round_robin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,28 @@ def active_queues
unparse_queues(@queues)
end

def message_processed(queue)
return if @paused_queues.empty?

logger.debug "Unpausing #{queue}"
@paused_queues.reject! { |_time, name| name == queue }
@queues << queue
@queues.uniq!
end

private

def pause(queue)
return unless @queues.delete(queue)

@paused_queues << [Time.now + delay, queue]
logger.debug "Paused #{queue}"
end

def unpause_queues
return if @paused_queues.empty?
return if Time.now < @paused_queues.first[0]

pause = @paused_queues.shift
@queues << pause[1]
logger.debug "Unpaused #{pause[1]}"
Expand Down
24 changes: 24 additions & 0 deletions spec/shoryuken/manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,28 @@
subject.send(:dispatch_single_messages, q)
end
end

describe '#processor_done' do
let(:sqs_queue) { double Shoryuken::Queue }

before do
allow(Shoryuken::Client).to receive(:queues).with(queue).and_return(sqs_queue)
end

context 'when queue.fifo? is true' do
it 'calls message_processed on strategy' do
expect(sqs_queue).to receive(:fifo?).and_return(true)
expect(polling_strategy).to receive(:message_processed).with(queue)
subject.send(:processor_done, queue)
end
end

context 'when queue.fifo? is false' do
it 'does not call message_processed on strategy' do
expect(sqs_queue).to receive(:fifo?).and_return(false)
expect(polling_strategy).to_not receive(:message_processed)
subject.send(:processor_done, queue)
end
end
end
end
10 changes: 10 additions & 0 deletions spec/shoryuken/polling/strict_priority_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,14 @@
expect(subject.next_queue).to eq(queue3)
end
end

describe '#message_processed' do
it 'removes paused queue, adds to active queues' do
strategy = Shoryuken::Polling::StrictPriority.new([queue1, queue2])
strategy.send(:pause, queue1)
expect(strategy.active_queues).to eq([[queue2, 1]])
strategy.message_processed(queue1)
expect(strategy.active_queues).to eq([[queue1, 2], [queue2, 1]])
end
end
end
10 changes: 10 additions & 0 deletions spec/shoryuken/polling/weighted_round_robin_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,14 @@
expect(subject.delay).to eq(1.0)
end
end

describe '#message_processed' do
it 'removes paused queue, adds to active queues' do
strategy = Shoryuken::Polling::WeightedRoundRobin.new([queue1, queue2])
strategy.send(:pause, queue1)
expect(strategy.active_queues).to eq([[queue2, 1]])
strategy.message_processed(queue1)
expect(strategy.active_queues).to eq([[queue2, 1], [queue1, 1]])
end
end
end

0 comments on commit 2a596a8

Please sign in to comment.