Skip to content

Commit

Permalink
Turn dispatch/dispatch_later into a heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Cantero committed Dec 19, 2016
1 parent 9b9d4b3 commit 50dcdf0
Showing 1 changed file with 6 additions and 17 deletions.
23 changes: 6 additions & 17 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ def initialize(fetcher, polling_strategy)

@done = Concurrent::AtomicBoolean.new(false)

@dispatch_later = Concurrent::AtomicBoolean.new(false)

@fetcher = fetcher
@polling_strategy = polling_strategy

Expand Down Expand Up @@ -51,45 +49,36 @@ def processor_done(queue)
logger.debug { "Process done for '#{queue}'" }

@ready.increment

dispatch_later unless @done.true?
end

private

def dispatch
return if @done.true?

logger.debug { "Ready: #{@ready.value}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" }

if @ready.value == 0
logger.debug { 'Pausing fetcher, because all processors are busy' }
dispatch_later
return
return dispatch_later
end

unless queue = @polling_strategy.next_queue
logger.debug { 'Pausing fetcher, because all queues are paused' }
dispatch_later
return
return dispatch_later
end

batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)

dispatch_later
return dispatch_later
end

private

def busy
@count - @ready.value
end

def dispatch_later
return unless @dispatch_later.make_true

after(1) do
@dispatch_later.make_false
dispatch
end
after(1) { dispatch }
end

def assign(queue, sqs_msg)
Expand Down

0 comments on commit 50dcdf0

Please sign in to comment.