Skip to content

Commit

Permalink
Move queue pausing responsibility to PollingStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
mariokostelac committed Dec 13, 2016
1 parent 314038d commit ef73e17
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 268 deletions.
95 changes: 50 additions & 45 deletions lib/shoryuken/fetcher.rb
Original file line number Diff line number Diff line change
@@ -1,74 +1,79 @@
module Shoryuken
class Fetcher
include Celluloid
include Util
module Shoryuken
class Fetcher
include Celluloid
include Util

FETCH_LIMIT = 10
FETCH_LIMIT = 10

def initialize(manager)
@manager = manager
end
def initialize(manager, polling_strategy)
@manager = manager
@polling_strategy = polling_strategy
@delay = Shoryuken.options[:delay].to_f
end

def fetch(queue, available_processors)
watchdog('Fetcher#fetch died') do
started_at = Time.now
def fetch(queue, available_processors)
watchdog('Fetcher#fetch died') do
started_at = Time.now

logger.debug { "Looking for new messages in '#{queue}'" }
logger.debug { "Looking for new messages in '#{queue}'" }

begin
batch = Shoryuken.worker_registry.batch_receive_messages?(queue.name)
limit = batch ? FETCH_LIMIT : available_processors
begin
batch = Shoryuken.worker_registry.batch_receive_messages?(queue.name)
limit = batch ? FETCH_LIMIT : available_processors

if (sqs_msgs = Array(receive_messages(queue, limit))).any?
logger.debug { "Found #{sqs_msgs.size} messages for '#{queue}'" }
sqs_msgs = Array(receive_messages(queue, limit))
logger.info { "Found #{sqs_msgs.size} messages for '#{queue}'" }

if batch
@manager.async.assign(queue.name, patch_sqs_msgs!(sqs_msgs))
else
sqs_msgs.each { |sqs_msg| @manager.async.assign(queue.name, sqs_msg) }
end

@manager.async.messages_present(queue)
else
logger.debug { "No message found for '#{queue}'" }
@polling_strategy.messages_found(queue, sqs_msgs.size)

@manager.async.queue_empty(queue)
logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" }
rescue => ex
logger.error { "Error fetching message: #{ex}" }
logger.error { ex.backtrace.first }
end

logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" }
rescue => ex
logger.error { "Error fetching message: #{ex}" }
logger.error { ex.backtrace.first }
@manager.async.dispatch
end
end

@manager.async.dispatch
def next_queue(*args)
@polling_strategy.next_queue(*args)
end
end

private
def active_queues(*args)
@polling_strategy.active_queues(*args)
end

def receive_messages(queue, limit)
# AWS limits the batch size by 10
limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit
private

options = (Shoryuken.options[:aws][:receive_message] || {}).dup
options[:max_number_of_messages] = limit
options[:message_attribute_names] = %w(All)
options[:attribute_names] = %w(All)
def receive_messages(queue, limit)
# AWS limits the batch size by 10
limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit

options.merge!(queue.options)
options = (Shoryuken.options[:aws][:receive_message] || {}).dup
options[:max_number_of_messages] = limit
options[:message_attribute_names] = %w(All)
options[:attribute_names] = %w(All)

Shoryuken::Client.queues(queue.name).receive_messages options
end
options.merge!(queue.options)

def patch_sqs_msgs!(sqs_msgs)
sqs_msgs.instance_eval do
def message_id
"batch-with-#{size}-messages"
end
Shoryuken::Client.queues(queue.name).receive_messages options
end

sqs_msgs
def patch_sqs_msgs!(sqs_msgs)
sqs_msgs.instance_eval do
def message_id
"batch-with-#{size}-messages"
end
end

sqs_msgs
end
end
end
end
3 changes: 2 additions & 1 deletion lib/shoryuken/launcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ class Launcher
def initialize
@condvar = Celluloid::Condition.new
@manager = Shoryuken::Manager.new_link(@condvar)
@fetcher = Shoryuken::Fetcher.new_link(manager)
polling_strategy = Shoryuken.options[:polling_strategy].new(Shoryuken.queues)
@fetcher = Shoryuken::Fetcher.new_link(manager, polling_strategy)

@done = false

Expand Down
71 changes: 12 additions & 59 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
require 'shoryuken/processor'
require 'shoryuken/polling'
require 'shoryuken/fetcher'

module Shoryuken
Expand All @@ -15,7 +14,6 @@ def initialize(condvar)
@count = Shoryuken.options[:concurrency] || 25
raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") unless @count > 0
@queues = Shoryuken.queues.dup.uniq
@polling_strategy = Shoryuken.options[:polling_strategy].new(Shoryuken.queues)
@finished = condvar

@done = false
Expand Down Expand Up @@ -107,42 +105,31 @@ def assign(queue, sqs_msg)
end
end

def messages_present(queue)
watchdog('Manager#messages_present died') do
@polling_strategy.messages_present(queue)
end
end

def queue_empty(queue)
return if delay <= 0

logger.debug { "Pausing '#{queue}' for #{delay} seconds, because it's empty" }

@polling_strategy.pause(queue)

after(delay) { async.restart_queue!(queue) }
end


def dispatch
return if stopped?

logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{@polling_strategy.active_queues}" }
logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{@fetcher.active_queues}" }

if @ready.empty?
logger.debug { 'Pausing fetcher, because all processors are busy' }

dispatch_later
return
end

if (queue = next_queue)
@fetcher.async.fetch(queue, @ready.size)
else
queue = @fetcher.next_queue
if queue == nil
logger.debug { 'Pausing fetcher, because all queues are paused' }
after(1) { dispatch }
return
end

@fetcher_paused = true
unless defined?(::ActiveJob) || Shoryuken.worker_registry.workers(queue.name).any?
logger.debug { "Pausing fetcher, because of no registered workers for queue #{queue}" }
after(1) { dispatch }
return
end

@fetcher.async.fetch(queue, @ready.size)
end

def real_thread(proxy_id, thr)
Expand All @@ -168,40 +155,6 @@ def build_processor
processor
end

def restart_queue!(queue)
return if stopped?

@polling_strategy.restart(queue)

if @fetcher_paused
logger.debug { 'Restarting fetcher' }

@fetcher_paused = false

dispatch
end
end

def next_queue
# get/remove the first queue in the list
queue = @polling_strategy.next_queue

return nil unless queue

if queue && (!defined?(::ActiveJob) && Shoryuken.worker_registry.workers(queue.name).empty?)
# when no worker registered pause the queue to avoid endless recursion
logger.debug { "Pausing '#{queue}' for #{delay} seconds, because no workers registered" }

@polling_strategy.pause(queue)

after(delay) { async.restart_queue!(queue) }

queue = next_queue
end

queue
end

def soft_shutdown(delay)
logger.info { "Waiting for #{@busy.size} busy workers" }

Expand Down
54 changes: 36 additions & 18 deletions lib/shoryuken/polling.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,34 @@ class WeightedRoundRobin
def initialize(queues)
@initial_queues = queues
@queues = queues.dup.uniq
end

def active_queues
unparse_queues(@queues)
@paused_queues = []
end

def next_queue
unpause_queues
queue = @queues.shift
return nil if queue == nil

@queues << queue
QueueConfiguration.new(queue, {})
end

def messages_present(queue)
return unless (original = original_queue_weight(queue)) > (current = current_queue_weight(queue))

logger.info "Increasing '#{queue}' weight to #{current + 1}, max: #{original}"
@queues << queue
end
def messages_found(queue, messages_found)
if messages_found == 0
pause(queue)
return
end

def pause(queue)
return unless @queues.delete(queue)
logger.debug "Paused '#{queue}'"
maximum_weight = maximum_queue_weight(queue)
current_weight = current_queue_weight(queue)
if maximum_weight > current_weight
logger.info { "Increasing '#{queue}' weight to #{current_weight + 1}, max: #{maximum_weight}" }
@queues << queue
end
end

def restart(queue)
return if @queues.include?(queue)
logger.debug "Restarting '#{queue}'"
@queues << queue
def active_queues
unparse_queues(@queues)
end

def ==(other)
Expand All @@ -72,11 +72,29 @@ def ==(other)

private

def delay
Shoryuken.options[:delay].to_f
end

def pause(queue)
return unless @queues.delete(queue)
@paused_queues << [Time.now + delay, queue]
logger.debug "Paused '#{queue}'"
end

def unpause_queues
return if @paused_queues.empty?
return if Time.now < @paused_queues.first[0]
pause = @paused_queues.shift
@queues << pause[1]
logger.debug "Unpaused '#{pause[1]}'"
end

def current_queue_weight(queue)
queue_weight(@queues, queue)
end

def original_queue_weight(queue)
def maximum_queue_weight(queue)
queue_weight(@initial_queues, queue)
end

Expand Down
Loading

0 comments on commit ef73e17

Please sign in to comment.