Skip to content

Commit

Permalink
Merge pull request #284 from mariokostelac/mario/extract_fetcher
Browse files Browse the repository at this point in the history
Refactor fetcher, polling strategy and manager
  • Loading branch information
phstc authored Dec 13, 2016
2 parents 83a9985 + 7fc8393 commit e83a68b
Show file tree
Hide file tree
Showing 10 changed files with 363 additions and 285 deletions.
4 changes: 3 additions & 1 deletion lib/shoryuken.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
require 'shoryuken/middleware/server/timing'
require 'shoryuken/sns_arn'
require 'shoryuken/topic'
require 'shoryuken/polling'

module Shoryuken
DEFAULTS = {
Expand All @@ -33,7 +34,8 @@ module Shoryuken
startup: [],
quiet: [],
shutdown: [],
}
},
polling_strategy: Polling::WeightedRoundRobin,
}

@@queues = []
Expand Down
61 changes: 16 additions & 45 deletions lib/shoryuken/fetcher.rb
Original file line number Diff line number Diff line change
@@ -1,73 +1,44 @@
module Shoryuken
class Fetcher
include Celluloid
include Util

FETCH_LIMIT = 10

def initialize(manager)
@manager = manager
end

def receive_messages(queue, limit)
# AWS limits the batch size by 10
limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit

options = (Shoryuken::AwsConfig.options[:receive_message] || {}).dup
options[:max_number_of_messages] = limit
options[:message_attribute_names] = %w(All)
options[:attribute_names] = %w(All)

Shoryuken::Client.queues(queue).receive_messages options
end

def fetch(queue, available_processors)
watchdog('Fetcher#fetch died') do
started_at = Time.now

logger.debug { "Looking for new messages in '#{queue}'" }

begin
batch = Shoryuken.worker_registry.batch_receive_messages?(queue)
limit = batch ? FETCH_LIMIT : available_processors

if (sqs_msgs = Array(receive_messages(queue, limit))).any?
logger.debug { "Found #{sqs_msgs.size} messages for '#{queue}'" }

if batch
@manager.async.assign(queue, patch_sqs_msgs!(sqs_msgs))
else
sqs_msgs.each { |sqs_msg| @manager.async.assign(queue, sqs_msg) }
end

@manager.async.rebalance_queue_weight!(queue)
else
logger.debug { "No message found for '#{queue}'" }

@manager.async.pause_queue!(queue)
end
limit = available_processors > FETCH_LIMIT ? FETCH_LIMIT : available_processors

sqs_msgs = Array(receive_messages(queue, limit))
logger.info { "Found #{sqs_msgs.size} messages for '#{queue.name}'" }
logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" }
sqs_msgs
rescue => ex
logger.error { "Error fetching message: #{ex}" }
logger.error { ex.backtrace.first }
[]
end

@manager.async.dispatch
end

end

private

def patch_sqs_msgs!(sqs_msgs)
sqs_msgs.instance_eval do
def message_id
"batch-with-#{size}-messages"
end
end
def receive_messages(queue, limit)
# AWS limits the batch size by 10
limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit

options = (Shoryuken.options[:aws][:receive_message] || {}).dup
options[:max_number_of_messages] = limit
options[:message_attribute_names] = %w(All)
options[:attribute_names] = %w(All)

options.merge!(queue.options)

sqs_msgs
Shoryuken::Client.queues(queue.name).receive_messages(options)
end
end
end
5 changes: 2 additions & 3 deletions lib/shoryuken/launcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@ class Launcher
def initialize
@condvar = Celluloid::Condition.new
@manager = Shoryuken::Manager.new_link(@condvar)
@fetcher = Shoryuken::Fetcher.new_link(manager)

@done = false

manager.fetcher = @fetcher
manager.fetcher = Shoryuken::Fetcher.new
manager.polling_strategy = Shoryuken.options[:polling_strategy].new(Shoryuken.queues)
end

def stop(options = {})
watchdog('Launcher#stop') do
@done = true
@fetcher.terminate if @fetcher.alive?

manager.async.stop(shutdown: !!options[:shutdown], timeout: Shoryuken.options[:timeout])
@condvar.wait
Expand Down
133 changes: 49 additions & 84 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ class Manager
include Util

attr_accessor :fetcher
attr_accessor :polling_strategy

exclusive :dispatch

trap_exit :processor_died

BATCH_LIMIT = 10

def initialize(condvar)
@count = Shoryuken.options[:concurrency] || 25
raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") unless @count > 0
Expand Down Expand Up @@ -40,8 +45,6 @@ def stop(options = {})

fire_event(:shutdown, true)

@fetcher.terminate if @fetcher.alive?

logger.info { "Shutting down #{@ready.size} quiet workers" }

@ready.each do |processor|
Expand Down Expand Up @@ -71,6 +74,7 @@ def processor_done(queue, processor)
return after(0) { @finished.signal } if @busy.empty?
else
@ready << processor
async.dispatch
end
end
end
Expand All @@ -86,6 +90,7 @@ def processor_died(processor, reason)
return after(0) { @finished.signal } if @busy.empty?
else
@ready << build_processor
async.dispatch
end
end
end
Expand All @@ -94,57 +99,27 @@ def stopped?
@done
end

def assign(queue, sqs_msg)
watchdog('Manager#assign died') do
logger.debug { "Assigning #{sqs_msg.message_id}" }

processor = @ready.pop
@busy << processor

processor.async.process(queue, sqs_msg)
end
end

def rebalance_queue_weight!(queue)
watchdog('Manager#rebalance_queue_weight! died') do
if (original = original_queue_weight(queue)) > (current = current_queue_weight(queue))
logger.info { "Increasing '#{queue}' weight to #{current + 1}, max: #{original}" }

@queues << queue
end
end
end

def pause_queue!(queue)
return if !@queues.include?(queue) || Shoryuken.options[:delay].to_f <= 0

logger.debug { "Pausing '#{queue}' for #{Shoryuken.options[:delay].to_f} seconds, because it's empty" }

@queues.delete(queue)

after(Shoryuken.options[:delay].to_f) { async.restart_queue!(queue) }
end


def dispatch
return if stopped?

logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{unparse_queues(@queues)}" }
logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{polling_strategy.active_queues}" }

if @ready.empty?
logger.debug { 'Pausing fetcher, because all processors are busy' }

dispatch_later
return
end

if (queue = next_queue)
@fetcher.async.fetch(queue, @ready.size)
else
queue = polling_strategy.next_queue
if queue.nil?
logger.debug { 'Pausing fetcher, because all queues are paused' }

@fetcher_paused = true
dispatch_later
return
end

batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)

async.dispatch
end

def real_thread(proxy_id, thr)
Expand All @@ -160,61 +135,41 @@ def dispatch_later
end
end

def build_processor
processor = Processor.new_link(current_actor)
processor.proxy_id = processor.object_id
processor
end

def restart_queue!(queue)
return if stopped?

unless @queues.include? queue
logger.debug { "Restarting '#{queue}'" }

@queues << queue

if @fetcher_paused
logger.debug { 'Restarting fetcher' }
def assign(queue, sqs_msg)
watchdog('Manager#assign died') do
logger.debug { "Assigning #{sqs_msg.message_id}" }

@fetcher_paused = false
processor = @ready.pop
@busy << processor

dispatch
end
processor.async.process(queue, sqs_msg)
end
end

def current_queue_weight(queue)
queue_weight(@queues, queue)
def dispatch_batch(queue)
batch = fetcher.fetch(queue, BATCH_LIMIT)
polling_strategy.messages_found(queue.name, batch.size)
assign(queue.name, patch_batch!(batch))
end

def original_queue_weight(queue)
queue_weight(Shoryuken.queues, queue)
def dispatch_single_messages(queue)
messages = fetcher.fetch(queue, @ready.size)
polling_strategy.messages_found(queue.name, messages.size)
messages.each { |message| assign(queue.name, message) }
end

def queue_weight(queues, queue)
queues.count { |q| q == queue }
def batched_queue?(queue)
Shoryuken.worker_registry.batch_receive_messages?(queue.name)
end

def next_queue
return nil if @queues.empty?

# get/remove the first queue in the list
queue = @queues.shift

unless defined?(::ActiveJob) || !Shoryuken.worker_registry.workers(queue).empty?
# when no worker registered pause the queue to avoid endless recursion
logger.debug { "Pausing '#{queue}' for #{Shoryuken.options[:delay].to_f} seconds, because no workers registered" }

after(Shoryuken.options[:delay].to_f) { async.restart_queue!(queue) }

return next_queue
end

# add queue back to the end of the list
@queues << queue
def delay
Shoryuken.options[:delay].to_f
end

queue
def build_processor
processor = Processor.new_link(current_actor)
processor.proxy_id = processor.object_id
processor
end

def soft_shutdown(delay)
Expand Down Expand Up @@ -247,5 +202,15 @@ def hard_shutdown_in(delay)
end
end
end

def patch_batch!(sqs_msgs)
sqs_msgs.instance_eval do
def message_id
"batch-with-#{size}-messages"
end
end

sqs_msgs
end
end
end
Loading

0 comments on commit e83a68b

Please sign in to comment.