diff --git a/lib/shoryuken/fetcher.rb b/lib/shoryuken/fetcher.rb index 6d732545..5e7b2004 100644 --- a/lib/shoryuken/fetcher.rb +++ b/lib/shoryuken/fetcher.rb @@ -1,26 +1,28 @@ -module Shoryuken - class Fetcher - include Celluloid - include Util + module Shoryuken + class Fetcher + include Celluloid + include Util - FETCH_LIMIT = 10 + FETCH_LIMIT = 10 - def initialize(manager) - @manager = manager - end + def initialize(manager, polling_strategy) + @manager = manager + @polling_strategy = polling_strategy + @delay = Shoryuken.options[:delay].to_f + end - def fetch(queue, available_processors) - watchdog('Fetcher#fetch died') do - started_at = Time.now + def fetch(queue, available_processors) + watchdog('Fetcher#fetch died') do + started_at = Time.now - logger.debug { "Looking for new messages in '#{queue}'" } + logger.debug { "Looking for new messages in '#{queue}'" } - begin - batch = Shoryuken.worker_registry.batch_receive_messages?(queue.name) - limit = batch ? FETCH_LIMIT : available_processors + begin + batch = Shoryuken.worker_registry.batch_receive_messages?(queue.name) + limit = batch ? FETCH_LIMIT : available_processors - if (sqs_msgs = Array(receive_messages(queue, limit))).any? - logger.debug { "Found #{sqs_msgs.size} messages for '#{queue}'" } + sqs_msgs = Array(receive_messages(queue, limit)) + logger.info { "Found #{sqs_msgs.size} messages for '#{queue}'" } if batch @manager.async.assign(queue.name, patch_sqs_msgs!(sqs_msgs)) @@ -28,47 +30,50 @@ def fetch(queue, available_processors) sqs_msgs.each { |sqs_msg| @manager.async.assign(queue.name, sqs_msg) } end - @manager.async.messages_present(queue) - else - logger.debug { "No message found for '#{queue}'" } + @polling_strategy.messages_found(queue, sqs_msgs.size) - @manager.async.queue_empty(queue) + logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" } + rescue => ex + logger.error { "Error fetching message: #{ex}" } + logger.error { ex.backtrace.first } end - logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" } - rescue => ex - logger.error { "Error fetching message: #{ex}" } - logger.error { ex.backtrace.first } + @manager.async.dispatch end + end - @manager.async.dispatch + def next_queue(*args) + @polling_strategy.next_queue(*args) end - end - private + def active_queues(*args) + @polling_strategy.active_queues(*args) + end - def receive_messages(queue, limit) - # AWS limits the batch size by 10 - limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit + private - options = (Shoryuken.options[:aws][:receive_message] || {}).dup - options[:max_number_of_messages] = limit - options[:message_attribute_names] = %w(All) - options[:attribute_names] = %w(All) + def receive_messages(queue, limit) + # AWS limits the batch size by 10 + limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit - options.merge!(queue.options) + options = (Shoryuken.options[:aws][:receive_message] || {}).dup + options[:max_number_of_messages] = limit + options[:message_attribute_names] = %w(All) + options[:attribute_names] = %w(All) - Shoryuken::Client.queues(queue.name).receive_messages options - end + options.merge!(queue.options) - def patch_sqs_msgs!(sqs_msgs) - sqs_msgs.instance_eval do - def message_id - "batch-with-#{size}-messages" - end + Shoryuken::Client.queues(queue.name).receive_messages options end - sqs_msgs + def patch_sqs_msgs!(sqs_msgs) + sqs_msgs.instance_eval do + def message_id + "batch-with-#{size}-messages" + end + end + + sqs_msgs + end end end -end diff --git a/lib/shoryuken/launcher.rb b/lib/shoryuken/launcher.rb index 4d125a1d..6b692b35 100644 --- a/lib/shoryuken/launcher.rb +++ b/lib/shoryuken/launcher.rb @@ -10,7 +10,8 @@ class Launcher def initialize @condvar = Celluloid::Condition.new @manager = Shoryuken::Manager.new_link(@condvar) - @fetcher = Shoryuken::Fetcher.new_link(manager) + polling_strategy = Shoryuken.options[:polling_strategy].new(Shoryuken.queues) + @fetcher = Shoryuken::Fetcher.new_link(manager, polling_strategy) @done = false diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index e8cc6a8d..de2ba55f 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -1,5 +1,4 @@ require 'shoryuken/processor' -require 'shoryuken/polling' require 'shoryuken/fetcher' module Shoryuken @@ -15,7 +14,6 @@ def initialize(condvar) @count = Shoryuken.options[:concurrency] || 25 raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") unless @count > 0 @queues = Shoryuken.queues.dup.uniq - @polling_strategy = Shoryuken.options[:polling_strategy].new(Shoryuken.queues) @finished = condvar @done = false @@ -107,42 +105,31 @@ def assign(queue, sqs_msg) end end - def messages_present(queue) - watchdog('Manager#messages_present died') do - @polling_strategy.messages_present(queue) - end - end - - def queue_empty(queue) - return if delay <= 0 - - logger.debug { "Pausing '#{queue}' for #{delay} seconds, because it's empty" } - - @polling_strategy.pause(queue) - - after(delay) { async.restart_queue!(queue) } - end - - def dispatch return if stopped? - logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{@polling_strategy.active_queues}" } + logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{@fetcher.active_queues}" } if @ready.empty? logger.debug { 'Pausing fetcher, because all processors are busy' } - dispatch_later return end - if (queue = next_queue) - @fetcher.async.fetch(queue, @ready.size) - else + queue = @fetcher.next_queue + if queue == nil logger.debug { 'Pausing fetcher, because all queues are paused' } + after(1) { dispatch } + return + end - @fetcher_paused = true + unless defined?(::ActiveJob) || Shoryuken.worker_registry.workers(queue.name).any? + logger.debug { "Pausing fetcher, because of no registered workers for queue #{queue}" } + after(1) { dispatch } + return end + + @fetcher.async.fetch(queue, @ready.size) end def real_thread(proxy_id, thr) @@ -168,40 +155,6 @@ def build_processor processor end - def restart_queue!(queue) - return if stopped? - - @polling_strategy.restart(queue) - - if @fetcher_paused - logger.debug { 'Restarting fetcher' } - - @fetcher_paused = false - - dispatch - end - end - - def next_queue - # get/remove the first queue in the list - queue = @polling_strategy.next_queue - - return nil unless queue - - if queue && (!defined?(::ActiveJob) && Shoryuken.worker_registry.workers(queue.name).empty?) - # when no worker registered pause the queue to avoid endless recursion - logger.debug { "Pausing '#{queue}' for #{delay} seconds, because no workers registered" } - - @polling_strategy.pause(queue) - - after(delay) { async.restart_queue!(queue) } - - queue = next_queue - end - - queue - end - def soft_shutdown(delay) logger.info { "Waiting for #{@busy.size} busy workers" } diff --git a/lib/shoryuken/polling.rb b/lib/shoryuken/polling.rb index 508156aa..a656ae57 100644 --- a/lib/shoryuken/polling.rb +++ b/lib/shoryuken/polling.rb @@ -27,34 +27,34 @@ class WeightedRoundRobin def initialize(queues) @initial_queues = queues @queues = queues.dup.uniq - end - - def active_queues - unparse_queues(@queues) + @paused_queues = [] end def next_queue + unpause_queues queue = @queues.shift + return nil if queue == nil + @queues << queue QueueConfiguration.new(queue, {}) end - def messages_present(queue) - return unless (original = original_queue_weight(queue)) > (current = current_queue_weight(queue)) - - logger.info "Increasing '#{queue}' weight to #{current + 1}, max: #{original}" - @queues << queue - end + def messages_found(queue, messages_found) + if messages_found == 0 + pause(queue) + return + end - def pause(queue) - return unless @queues.delete(queue) - logger.debug "Paused '#{queue}'" + maximum_weight = maximum_queue_weight(queue) + current_weight = current_queue_weight(queue) + if maximum_weight > current_weight + logger.info { "Increasing '#{queue}' weight to #{current_weight + 1}, max: #{maximum_weight}" } + @queues << queue + end end - def restart(queue) - return if @queues.include?(queue) - logger.debug "Restarting '#{queue}'" - @queues << queue + def active_queues + unparse_queues(@queues) end def ==(other) @@ -72,11 +72,29 @@ def ==(other) private + def delay + Shoryuken.options[:delay].to_f + end + + def pause(queue) + return unless @queues.delete(queue) + @paused_queues << [Time.now + delay, queue] + logger.debug "Paused '#{queue}'" + end + + def unpause_queues + return if @paused_queues.empty? + return if Time.now < @paused_queues.first[0] + pause = @paused_queues.shift + @queues << pause[1] + logger.debug "Unpaused '#{pause[1]}'" + end + def current_queue_weight(queue) queue_weight(@queues, queue) end - def original_queue_weight(queue) + def maximum_queue_weight(queue) queue_weight(@initial_queues, queue) end diff --git a/spec/shoryuken/fetcher_spec.rb b/spec/shoryuken/fetcher_spec.rb index a5b43bdb..6b9be0c6 100644 --- a/spec/shoryuken/fetcher_spec.rb +++ b/spec/shoryuken/fetcher_spec.rb @@ -2,41 +2,47 @@ require 'shoryuken/manager' require 'shoryuken/fetcher' + describe Shoryuken::Fetcher do let(:manager) { double Shoryuken::Manager } let(:queue) { double Shoryuken::Queue } let(:queue_name) { 'default' } + let(:queues) { [queue_name] } let(:queue_config) { Shoryuken::Polling::QueueConfiguration.new(queue_name, {}) } + let(:polling_strategy) { Shoryuken::Polling::WeightedRoundRobin.new(queues) } let(:sqs_msg) do double Shoryuken::Message, queue_url: queue_name, body: 'test', - message_id: 'fc754df7-9cc2-4c41-96ca-5996a44b771e' + message_id: 'fc754df79cc24c4196ca5996a44b771e' end - subject { described_class.new(manager) } + subject { described_class.new(manager, polling_strategy) } before do allow(manager).to receive(:async).and_return(manager) allow(Shoryuken::Client).to receive(:queues).with(queue_name).and_return(queue) end - describe '#fetch' do it 'calls pause when no message' do - allow(queue).to receive(:receive_messages).with(max_number_of_messages: 1, attribute_names: ['All'], message_attribute_names: ['All']).and_return([]) + allow(queue).to receive(:receive_messages) + .with(max_number_of_messages: 1, attribute_names: ['All'], message_attribute_names: ['All']) + .and_return([]) - expect(manager).to receive(:queue_empty).with(queue_config) + expect(polling_strategy).to receive(:messages_found).with(queue_config, 0) expect(manager).to receive(:dispatch) subject.fetch(queue_config, 1) end it 'assigns messages' do - allow(queue).to receive(:receive_messages).with(max_number_of_messages: 5, attribute_names: ['All'], message_attribute_names: ['All']).and_return(sqs_msg) + allow(queue).to receive(:receive_messages) + .with(max_number_of_messages: 5, attribute_names: ['All'], message_attribute_names: ['All']) + .and_return(sqs_msg) - expect(manager).to receive(:messages_present).with(queue_config) + expect(polling_strategy).to receive(:messages_found).with(queue_config, 1) expect(manager).to receive(:assign).with(queue_name, sqs_msg) expect(manager).to receive(:dispatch) @@ -46,9 +52,11 @@ it 'assigns messages in batch' do TestWorker.get_shoryuken_options['batch'] = true - allow(queue).to receive(:receive_messages).with(max_number_of_messages: described_class::FETCH_LIMIT, attribute_names: ['All'], message_attribute_names: ['All']).and_return(sqs_msg) + allow(queue).to receive(:receive_messages) + .with(max_number_of_messages: described_class::FETCH_LIMIT, attribute_names: ['All'], message_attribute_names: ['All']) + .and_return(sqs_msg) - expect(manager).to receive(:messages_present).with(queue_config) + expect(polling_strategy).to receive(:messages_found).with(queue_config, 1) expect(manager).to receive(:assign).with(queue_name, [sqs_msg]) expect(manager).to receive(:dispatch) @@ -59,9 +67,11 @@ let(:queue_name) { 'notfound' } it 'ignores batch' do - allow(queue).to receive(:receive_messages).with(max_number_of_messages: 5, attribute_names: ['All'], message_attribute_names: ['All']).and_return(sqs_msg) + allow(queue).to receive(:receive_messages) + .with(max_number_of_messages: 5, attribute_names: ['All'], message_attribute_names: ['All']) + .and_return(sqs_msg) - expect(manager).to receive(:messages_present).with(queue_config) + expect(polling_strategy).to receive(:messages_found).with(queue_config, 1) expect(manager).to receive(:assign).with(queue_name, sqs_msg) expect(manager).to receive(:dispatch) diff --git a/spec/shoryuken/manager_spec.rb b/spec/shoryuken/manager_spec.rb index 42d136b0..32b1c37a 100644 --- a/spec/shoryuken/manager_spec.rb +++ b/spec/shoryuken/manager_spec.rb @@ -2,10 +2,22 @@ require 'shoryuken/manager' RSpec.describe Shoryuken::Manager do - subject do + let(:queue1) { 'shoryuken' } + let(:queue2) { 'uppercut'} + let(:queues) { [] } + let(:polling_strategy) { Shoryuken::Polling::WeightedRoundRobin.new(queues) } + let(:fetcher) { Shoryuken::Fetcher.new(manager, polling_strategy) } + let(:condvar) do condvar = double(:condvar) allow(condvar).to receive(:signal).and_return(nil) - Shoryuken::Manager.new(condvar) + condvar + end + let(:manager) { Shoryuken::Manager.new(condvar) } + + subject { manager } + + before(:each) do + manager.fetcher = fetcher end describe 'Invalid concurrency setting' do @@ -14,109 +26,5 @@ expect { Shoryuken::Manager.new(nil) } .to raise_error(ArgumentError, 'Concurrency value -1 is invalid, it needs to be a positive number') end - - end - - describe 'Auto Scaling' do - it 'decreases weight' do - queue1 = 'shoryuken' - queue2 = 'uppercut' - - Shoryuken.queues.clear - # [shoryuken, 2] - # [uppercut, 1] - Shoryuken.queues << queue1 - Shoryuken.queues << queue1 - Shoryuken.queues << queue2 - - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue1, queue2] - - subject.queue_empty(queue1) - - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2] - end - - it 'increases weight' do - queue1 = 'shoryuken' - queue2 = 'uppercut' - - Shoryuken.queues.clear - # [shoryuken, 3] - # [uppercut, 1] - Shoryuken.queues << queue1 - Shoryuken.queues << queue1 - Shoryuken.queues << queue1 - Shoryuken.queues << queue2 - - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue1, queue2] - subject.queue_empty(queue1) - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2] - - subject.messages_present(queue1) - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2, queue1] - - subject.messages_present(queue1) - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2, queue1, queue1] - - subject.messages_present(queue1) - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2, queue1, queue1, queue1] - end - - it 'adds queue back' do - queue1 = 'shoryuken' - queue2 = 'uppercut' - - Shoryuken.queues.clear - # [shoryuken, 2] - # [uppercut, 1] - Shoryuken.queues << queue1 - Shoryuken.queues << queue1 - Shoryuken.queues << queue2 - - Shoryuken.options[:delay] = 0.1 - - fetcher = double('Fetcher').as_null_object - subject.fetcher = fetcher - - subject.queue_empty(queue1) - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2] - - sleep 0.5 - - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2, queue1] - end - end - - describe '#next_queue' do - it 'returns queues' do - queue1 = 'shoryuken' - queue2 = 'uppercut' - - Shoryuken.queues.clear - - Shoryuken.register_worker queue1, TestWorker - Shoryuken.register_worker queue2, TestWorker - - Shoryuken.queues << queue1 - Shoryuken.queues << queue2 - - expect(subject.send :next_queue).to eq queue1 - expect(subject.send :next_queue).to eq queue2 - end - - it 'skips when no worker' do - queue1 = 'shoryuken' - queue2 = 'uppercut' - - Shoryuken.queues.clear - - Shoryuken.register_worker queue2, TestWorker - - Shoryuken.queues << queue1 - Shoryuken.queues << queue2 - - expect(subject.send :next_queue).to eq queue2 - expect(subject.send :next_queue).to eq queue2 - end end end diff --git a/spec/shoryuken/polling_spec.rb b/spec/shoryuken/polling_spec.rb index 47f33bb6..5a50b3bb 100644 --- a/spec/shoryuken/polling_spec.rb +++ b/spec/shoryuken/polling_spec.rb @@ -1,63 +1,100 @@ require 'spec_helper' require 'shoryuken/polling' -describe Shoryuken::Polling do +describe Shoryuken::Polling::WeightedRoundRobin do let(:queue1) { 'shoryuken' } let(:queue2) { 'uppercut' } let(:queues) { Array.new } + subject { Shoryuken::Polling::WeightedRoundRobin.new(queues) } - describe Shoryuken::Polling::WeightedRoundRobin do - subject { Shoryuken::Polling::WeightedRoundRobin.new(queues) } - - it 'decreases weight' do + describe '#next_queue' do + it 'cycles' do # [shoryuken, 2] # [uppercut, 1] queues << queue1 queues << queue1 queues << queue2 - expect(subject).to eq [queue1, queue2] - - subject.pause(queue1) + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) + expect(subject.next_queue).to eq(queue1) + end - expect(subject).to eq [queue2] + it 'returns nil if there are no active queues' do + expect(subject.next_queue).to eq(nil) end - it 'increases weight' do - # [shoryuken, 3] + it 'unpauses queues whose pause is expired' do + # [shoryuken, 2] # [uppercut, 1] queues << queue1 queues << queue1 - queues << queue1 queues << queue2 - expect(subject).to eq [queue1, queue2] - subject.pause(queue1) - expect(subject).to eq [queue2] + allow(subject).to receive(:delay).and_return(10) + + now = Time.now + allow(Time).to receive(:now).and_return(now) - subject.messages_present(queue1) - expect(subject).to eq [queue2, queue1] + # pause the first queue + subject.messages_found(queue1, 0) + expect(subject.next_queue).to eq(queue2) - subject.messages_present(queue1) - expect(subject).to eq [queue2, queue1, queue1] + now += 5 + allow(Time).to receive(:now).and_return(now) - subject.messages_present(queue1) - expect(subject).to eq [queue2, queue1, queue1, queue1] + # pause the second queue + subject.messages_found(queue2, 0) + expect(subject.next_queue).to eq(nil) + + # queue1 should be unpaused now + now += 6 + allow(Time).to receive(:now).and_return(now) + expect(subject.next_queue).to eq(queue1) + + # queue1 should be unpaused and added to the end of queues now + now += 6 + allow(Time).to receive(:now).and_return(now) + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) end + end - it 'cycles' do - # [shoryuken, 1] + describe '#messages_found' do + it 'pauses a queue if there are no messages found' do + # [shoryuken, 2] # [uppercut, 1] queues << queue1 + queues << queue1 queues << queue2 - popped = [] + expect(subject).to receive(:pause).with(queue1).and_call_original + subject.messages_found(queue1, 0) + expect(subject.instance_variable_get(:@queues)).to eq([queue2]) + end - (queues.size * 3).times do - popped << subject.next_queue - end + it 'increased the weight if message is found' do + # [shoryuken, 2] + # [uppercut, 1] + queues << queue1 + queues << queue1 + queues << queue2 + + expect(subject.instance_variable_get(:@queues)).to eq([queue1, queue2]) + subject.messages_found(queue1, 1) + expect(subject.instance_variable_get(:@queues)).to eq([queue1, queue2, queue1]) + end + + it 'respects the maximum queue weight' do + # [shoryuken, 2] + # [uppercut, 1] + queues << queue1 + queues << queue1 + queues << queue2 - expect(popped).to eq(queues * 3) + subject.messages_found(queue1, 1) + subject.messages_found(queue1, 1) + expect(subject.instance_variable_get(:@queues)).to eq([queue1, queue2, queue1]) end end end