Skip to content

Commit

Permalink
Remove dispatch pause, it's now controller in the polling strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Cantero committed Feb 17, 2017
1 parent 9395b98 commit 574f927
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 13 deletions.
18 changes: 8 additions & 10 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ module Shoryuken
class Manager
include Util

BATCH_LIMIT = 10
BATCH_LIMIT = 10
HEARTBEAT_INTERVAL = 0.1

def initialize(fetcher, polling_strategy)
@count = Shoryuken.options.fetch(:concurrency, 25)
Expand All @@ -17,7 +18,9 @@ def initialize(fetcher, polling_strategy)
@fetcher = fetcher
@polling_strategy = polling_strategy

@heartbeat = Concurrent::TimerTask.new(run_now: true, execution_interval: 0.1, timeout_interval: 60) { dispatch }
@heartbeat = Concurrent::TimerTask.new(run_now: true,
execution_interval: HEARTBEAT_INTERVAL,
timeout_interval: 60) { dispatch }

@pool = Concurrent::FixedThreadPool.new(@count, max_queue: @count)
end
Expand Down Expand Up @@ -59,15 +62,10 @@ def dispatch
return if @done.true?
return unless @dispatching.make_true

logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" }

if ready == 0
return logger.debug { 'Pausing fetcher, because all processors are busy' }
end
return if ready == 0
return unless (queue = @polling_strategy.next_queue)

unless (queue = @polling_strategy.next_queue)
return logger.debug { 'Pausing fetcher, because all queues are paused' }
end
logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" }

batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)
ensure
Expand Down
4 changes: 1 addition & 3 deletions lib/shoryuken/polling.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ def delay
end

class WeightedRoundRobin < BaseStrategy

def initialize(queues)
@initial_queues = queues
@queues = queues.dup.uniq
Expand Down Expand Up @@ -129,7 +128,6 @@ def queue_weight(queues, queue)
end

class StrictPriority < BaseStrategy

def initialize(queues)
# Priority ordering of the queues, highest priority first
@queues = queues
Expand Down Expand Up @@ -178,7 +176,7 @@ def next_active_queue
return queue unless queue_paused?(queue)
end

return nil
nil
end

def queues_unpaused_since?
Expand Down

0 comments on commit 574f927

Please sign in to comment.