diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c447c3a..6514a0ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,32 @@ +## [v2.1.3] - 2017-01-27 +- Show a warn message when batch isn't supported + - [#302](https://github.com/phstc/shoryuken/pull/302) + +- Require Celluloid ~> 17 + - [#305](https://github.com/phstc/shoryuken/pull/305) + +- Fix excessive logging when 0 messages found + - [#307](https://github.com/phstc/shoryuken/pull/307) + +## [v2.1.2] - 2016-12-22 +- Fix loading `logfile` from shoryuken.yml + - [#296](https://github.com/phstc/shoryuken/pull/296) + +- Add support for Strict priority polling (pending documentation) + - [#288](https://github.com/phstc/shoryuken/pull/288) + +- Add `test_workers` for end-to-end testing supporting + - [#286](https://github.com/phstc/shoryuken/pull/286) + +- Update README documenting `configure_client` and `configure_server` + - [#283](https://github.com/phstc/shoryuken/pull/283) + +- Fix memory leak caused by async tracking busy threads + - [#289](https://github.com/phstc/shoryuken/pull/289) + +- Refactor fetcher, polling strategy and manager + - [#284](https://github.com/phstc/shoryuken/pull/284) + ## [v2.1.1] - 2016-12-05 - Fix aws deprecation warning message - [#279](https://github.com/phstc/shoryuken/pull/279) diff --git a/Gemfile.lock b/Gemfile.lock deleted file mode 100644 index 26bb3978..00000000 --- a/Gemfile.lock +++ /dev/null @@ -1,76 +0,0 @@ -PATH - remote: . - specs: - shoryuken (2.1.1) - aws-sdk-core (~> 2) - concurrent-ruby - -GEM - remote: https://rubygems.org/ - specs: - aws-sdk-core (2.6.32) - aws-sigv4 (~> 1.0) - jmespath (~> 1.0) - aws-sigv4 (1.0.0) - byebug (9.0.5) - codeclimate-test-reporter (1.0.3) - simplecov - coderay (1.1.1) - concurrent-ruby (1.0.2) - diff-lcs (1.2.5) - docile (1.1.5) - dotenv (2.1.1) - jmespath (1.3.1) - json (2.0.2) - method_source (0.8.2) - mini_portile2 (2.1.0) - multi_xml (0.5.5) - nokogiri (1.6.8) - mini_portile2 (~> 2.1.0) - pkg-config (~> 1.1.7) - pkg-config (1.1.7) - pry (0.10.4) - coderay (~> 1.1.0) - method_source (~> 0.8.1) - slop (~> 3.4) - pry-byebug (3.4.0) - byebug (~> 9.0) - pry (~> 0.10) - rake (11.2.2) - rspec (3.5.0) - rspec-core (~> 3.5.0) - rspec-expectations (~> 3.5.0) - rspec-mocks (~> 3.5.0) - rspec-core (3.5.2) - rspec-support (~> 3.5.0) - rspec-expectations (3.5.0) - diff-lcs (>= 1.2.0, < 2.0) - rspec-support (~> 3.5.0) - rspec-mocks (3.5.0) - diff-lcs (>= 1.2.0, < 2.0) - rspec-support (~> 3.5.0) - rspec-support (3.5.0) - simplecov (0.12.0) - docile (~> 1.1.0) - json (>= 1.8, < 3) - simplecov-html (~> 0.10.0) - simplecov-html (0.10.0) - slop (3.6.0) - -PLATFORMS - ruby - -DEPENDENCIES - bundler (~> 1.6) - codeclimate-test-reporter - dotenv - multi_xml - nokogiri - pry-byebug - rake - rspec - shoryuken! - simplecov - -BUNDLED WITH - 1.13.6 diff --git a/lib/shoryuken/environment_loader.rb b/lib/shoryuken/environment_loader.rb index ab5f6a63..4b705937 100644 --- a/lib/shoryuken/environment_loader.rb +++ b/lib/shoryuken/environment_loader.rb @@ -60,8 +60,8 @@ def initialize_aws end def initialize_logger - Shoryuken::Logging.initialize_logger(options[:logfile]) if options[:logfile] - Shoryuken.logger.level = Logger::DEBUG if options[:verbose] + Shoryuken::Logging.initialize_logger(Shoryuken.options[:logfile]) if Shoryuken.options[:logfile] + Shoryuken.logger.level = Logger::DEBUG if Shoryuken.options[:verbose] end def load_rails diff --git a/lib/shoryuken/fetcher.rb b/lib/shoryuken/fetcher.rb index f04cdca4..e4568154 100644 --- a/lib/shoryuken/fetcher.rb +++ b/lib/shoryuken/fetcher.rb @@ -13,7 +13,7 @@ def fetch(queue, available_processors) limit = available_processors > FETCH_LIMIT ? FETCH_LIMIT : available_processors sqs_msgs = Array(receive_messages(queue, limit)) - logger.info { "Found #{sqs_msgs.size} messages for '#{queue.name}'" } if sqs_msgs.size > 0 + logger.info { "Found #{sqs_msgs.size} messages for '#{queue.name}'" } unless sqs_msgs.empty? logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" } sqs_msgs rescue => ex diff --git a/lib/shoryuken/middleware/server/auto_extend_visibility.rb b/lib/shoryuken/middleware/server/auto_extend_visibility.rb index e2fe97ea..5597ece6 100644 --- a/lib/shoryuken/middleware/server/auto_extend_visibility.rb +++ b/lib/shoryuken/middleware/server/auto_extend_visibility.rb @@ -41,9 +41,7 @@ def auto_extend(worker, queue, sqs_msg, body) def auto_visibility_timer(worker, queue, sqs_msg, body) return unless worker.class.auto_visibility_timeout? - timer = MessageVisibilityExtender.new.auto_extend(worker, queue, sqs_msg, body) - timer.execute - timer + MessageVisibilityExtender.new.auto_extend(worker, queue, sqs_msg, body).tap(&:execute) end end end diff --git a/lib/shoryuken/middleware/server/exponential_backoff_retry.rb b/lib/shoryuken/middleware/server/exponential_backoff_retry.rb index 6254f108..dc828be8 100755 --- a/lib/shoryuken/middleware/server/exponential_backoff_retry.rb +++ b/lib/shoryuken/middleware/server/exponential_backoff_retry.rb @@ -5,6 +5,11 @@ class ExponentialBackoffRetry include Util def call(worker, queue, sqs_msg, body) + if sqs_msg.is_a?(Array) + logger.warn { "Exponential backoff isn't supported for batch workers" } + return yield + end + started_at = Time.now yield rescue diff --git a/lib/shoryuken/polling.rb b/lib/shoryuken/polling.rb index b6fc7cde..885c90b1 100644 --- a/lib/shoryuken/polling.rb +++ b/lib/shoryuken/polling.rb @@ -19,11 +19,53 @@ def ==(other) end alias_method :eql?, :== + + def to_s + if options.empty? + name + else + "#" + end + end end - class WeightedRoundRobin + class BaseStrategy include Util + def next_queue + fail NotImplementedError + end + + def messages_found(queue, messages_found) + fail NotImplementedError + end + + def active_queues + fail NotImplementedError + end + + def ==(other) + case other + when Array + @queues == other + else + if other.respond_to?(:active_queues) + active_queues == other.active_queues + else + false + end + end + end + + private + + def delay + Shoryuken.options[:delay].to_f + end + end + + class WeightedRoundRobin < BaseStrategy + def initialize(queues) @initial_queues = queues @queues = queues.dup.uniq @@ -57,25 +99,8 @@ def active_queues unparse_queues(@queues) end - def ==(other) - case other - when Array - @queues == other - else - if other.respond_to?(:active_queues) - active_queues == other.active_queues - else - false - end - end - end - private - def delay - Shoryuken.options[:delay].to_f - end - def pause(queue) return unless @queues.delete(queue) @paused_queues << [Time.now + delay, queue] @@ -102,5 +127,80 @@ def queue_weight(queues, queue) queues.count { |q| q == queue } end end + + class StrictPriority < BaseStrategy + + def initialize(queues) + # Priority ordering of the queues, highest priority first + @queues = queues + .group_by { |q| q } + .sort_by { |_, qs| -qs.count } + .map(&:first) + + # Pause status of the queues, default to past time (unpaused) + @paused_until = queues + .each_with_object(Hash.new) { |queue, h| h[queue] = Time.at(0) } + + # Start queues at 0 + reset_next_queue + end + + def next_queue + next_queue = next_active_queue + next_queue.nil? ? nil : QueueConfiguration.new(next_queue, {}) + end + + def messages_found(queue, messages_found) + if messages_found == 0 + pause(queue) + else + reset_next_queue + end + end + + def active_queues + @queues + .reverse + .map.with_index(1) + .reject { |q, _| queue_paused?(q) } + .reverse + end + + private + + def next_active_queue + reset_next_queue if queues_unpaused_since? + + size = @queues.length + size.times do + queue = @queues[@next_queue_index] + @next_queue_index = (@next_queue_index + 1) % size + return queue unless queue_paused?(queue) + end + + return nil + end + + def queues_unpaused_since? + last = @last_unpause_check + now = @last_unpause_check = Time.now + + last && @paused_until.values.any? { |t| t > last && t <= now } + end + + def reset_next_queue + @next_queue_index = 0 + end + + def queue_paused?(queue) + @paused_until[queue] > Time.now + end + + def pause(queue) + return unless delay > 0 + @paused_until[queue] = Time.now + delay + logger.debug "Paused '#{queue}'" + end + end end end diff --git a/lib/shoryuken/version.rb b/lib/shoryuken/version.rb index 9adbe10f..e9bd2fd1 100644 --- a/lib/shoryuken/version.rb +++ b/lib/shoryuken/version.rb @@ -1,3 +1,3 @@ module Shoryuken - VERSION = '2.1.1' + VERSION = '2.1.3' end diff --git a/shoryuken.gemspec b/shoryuken.gemspec index 6c9e611b..39fd20db 100644 --- a/shoryuken.gemspec +++ b/shoryuken.gemspec @@ -21,7 +21,6 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'rake' spec.add_development_dependency 'rspec' spec.add_development_dependency 'pry-byebug' - spec.add_development_dependency 'nokogiri' spec.add_development_dependency 'dotenv' spec.add_dependency 'aws-sdk-core', '~> 2' diff --git a/spec/shoryuken/middleware/server/auto_extend_visibility_spec.rb b/spec/shoryuken/middleware/server/auto_extend_visibility_spec.rb index f664444c..fc521bc6 100644 --- a/spec/shoryuken/middleware/server/auto_extend_visibility_spec.rb +++ b/spec/shoryuken/middleware/server/auto_extend_visibility_spec.rb @@ -1,6 +1,6 @@ require 'spec_helper' -describe Shoryuken::Middleware::Server::AutoExtendVisibility do +RSpec.describe Shoryuken::Middleware::Server::AutoExtendVisibility do let(:queue) { 'default' } let(:visibility_timeout) { 3 } let(:extend_upfront) { 1 } @@ -32,6 +32,12 @@ def run_and_raise(worker, queue, sqs_msg, error_class) stub_const('Shoryuken::Middleware::Server::AutoExtendVisibility::EXTEND_UPFRONT_SECONDS', extend_upfront) end + context 'when batch worker' do + it 'yields' do + expect { |b| subject.call(nil, nil, [], nil, &b) }.to yield_control + end + end + it 'extends message visibility if jobs takes a long time' do TestWorker.get_shoryuken_options['auto_visibility_timeout'] = true diff --git a/spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb b/spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb index c7abc3d0..3dd9caf6 100755 --- a/spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb +++ b/spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb @@ -1,6 +1,6 @@ require 'spec_helper' -describe Shoryuken::Middleware::Server::ExponentialBackoffRetry do +RSpec.describe Shoryuken::Middleware::Server::ExponentialBackoffRetry do let(:queue) { 'default' } let(:sqs_queue) { double Shoryuken::Queue } let(:sqs_msg) { double Shoryuken::Message, queue_url: queue, body: 'test', receipt_handle: SecureRandom.uuid, @@ -10,6 +10,12 @@ allow(Shoryuken::Client).to receive(:queues).with(queue).and_return(sqs_queue) end + context 'when batch worker' do + it 'yields' do + expect { |b| subject.call(nil, nil, [], nil, &b) }.to yield_control + end + end + context 'when a job succeeds' do it 'does not retry the job' do TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800] diff --git a/spec/shoryuken/polling_spec.rb b/spec/shoryuken/polling_spec.rb index 5a50b3bb..6293037a 100644 --- a/spec/shoryuken/polling_spec.rb +++ b/spec/shoryuken/polling_spec.rb @@ -98,3 +98,142 @@ end end end + +describe Shoryuken::Polling::StrictPriority do + let(:queue1) { 'shoryuken' } + let(:queue2) { 'uppercut' } + let(:queue3) { 'other' } + let(:queues) { Array.new } + subject { Shoryuken::Polling::StrictPriority.new(queues) } + + describe '#next_queue' do + it 'cycles when declared desc' do + # [shoryuken, 2] + # [uppercut, 1] + queues << queue1 + queues << queue1 + queues << queue2 + + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) + end + + it 'cycles when declared asc' do + # [uppercut, 1] + # [shoryuken, 2] + queues << queue2 + queues << queue1 + queues << queue1 + + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) + end + + it 'returns nil if there are no active queues' do + expect(subject.next_queue).to eq(nil) + end + + it 'unpauses queues whose pause is expired' do + # [shoryuken, 3] + # [uppercut, 2] + # [other, 1] + queues << queue1 + queues << queue1 + queues << queue1 + queues << queue2 + queues << queue2 + queues << queue3 + + allow(subject).to receive(:delay).and_return(10) + + now = Time.now + allow(Time).to receive(:now).and_return(now) + + # pause the second queue, see it loop between 1 and 3 + subject.messages_found(queue2, 0) + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue3) + expect(subject.next_queue).to eq(queue1) + + now += 5 + allow(Time).to receive(:now).and_return(now) + + # pause the first queue, see it repeat 3 + subject.messages_found(queue1, 0) + expect(subject.next_queue).to eq(queue3) + expect(subject.next_queue).to eq(queue3) + + # pause the third queue, see it have nothing + subject.messages_found(queue3, 0) + expect(subject.next_queue).to eq(nil) + + # unpause queue 2 + now += 6 + allow(Time).to receive(:now).and_return(now) + expect(subject.next_queue).to eq(queue2) + + # unpause queues 1 and 3 + now += 6 + allow(Time).to receive(:now).and_return(now) + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) + expect(subject.next_queue).to eq(queue3) + end + end + + describe '#messages_found' do + it 'pauses a queue if there are no messages found' do + # [shoryuken, 2] + # [uppercut, 1] + queues << queue1 + queues << queue1 + queues << queue2 + + expect(subject.active_queues).to eq([[queue1, 2], [queue2, 1]]) + expect(subject).to receive(:pause).with(queue1).and_call_original + subject.messages_found(queue1, 0) + expect(subject.active_queues).to eq([[queue2, 1]]) + end + + it 'continues to queue the highest priority queue if messages are found' do + # [shoryuken, 3] + # [uppercut, 2] + # [other, 1] + queues << queue1 + queues << queue1 + queues << queue1 + queues << queue2 + queues << queue2 + queues << queue3 + + expect(subject.next_queue).to eq(queue1) + subject.messages_found(queue1, 1) + expect(subject.next_queue).to eq(queue1) + subject.messages_found(queue1, 1) + expect(subject.next_queue).to eq(queue1) + end + + it 'resets the priorities if messages are found part way' do + # [shoryuken, 3] + # [uppercut, 2] + # [other, 1] + queues << queue1 + queues << queue1 + queues << queue1 + queues << queue2 + queues << queue2 + queues << queue3 + + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) + subject.messages_found(queue2, 1) + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) + expect(subject.next_queue).to eq(queue3) + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index d939b567..c58c4f44 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -4,7 +4,6 @@ require 'pry-byebug' require 'shoryuken' require 'json' -require 'multi_xml' require 'dotenv' Dotenv.load @@ -19,15 +18,6 @@ Shoryuken.logger.level = Logger::UNKNOWN -# I'm not sure whether this is an issue specific to running Shoryuken against github.com/comcast/cmb -# as opposed to AWS itself, but sometimes the receive_messages call returns XML that looks like this: -# -# \n\t\n\t ... -# -# The default MultiXML parser is ReXML, which seems to mishandle \n\t chars. Nokogiri seems to be -# the only one that correctly ignore this whitespace. -MultiXml.parser = :nokogiri - class TestWorker include Shoryuken::Worker