Skip to content

Commit

Permalink
Merge branch 'master' into thread-pool
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Cantero committed Feb 13, 2017
2 parents fed9d81 + 087b175 commit c36d58f
Show file tree
Hide file tree
Showing 13 changed files with 310 additions and 114 deletions.
29 changes: 29 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,32 @@
## [v2.1.3] - 2017-01-27
- Show a warn message when batch isn't supported
- [#302](https://github.com/phstc/shoryuken/pull/302)

- Require Celluloid ~> 17
- [#305](https://github.com/phstc/shoryuken/pull/305)

- Fix excessive logging when 0 messages found
- [#307](https://github.com/phstc/shoryuken/pull/307)

## [v2.1.2] - 2016-12-22
- Fix loading `logfile` from shoryuken.yml
- [#296](https://github.com/phstc/shoryuken/pull/296)

- Add support for Strict priority polling (pending documentation)
- [#288](https://github.com/phstc/shoryuken/pull/288)

- Add `test_workers` for end-to-end testing supporting
- [#286](https://github.com/phstc/shoryuken/pull/286)

- Update README documenting `configure_client` and `configure_server`
- [#283](https://github.com/phstc/shoryuken/pull/283)

- Fix memory leak caused by async tracking busy threads
- [#289](https://github.com/phstc/shoryuken/pull/289)

- Refactor fetcher, polling strategy and manager
- [#284](https://github.com/phstc/shoryuken/pull/284)

## [v2.1.1] - 2016-12-05
- Fix aws deprecation warning message
- [#279](https://github.com/phstc/shoryuken/pull/279)
Expand Down
76 changes: 0 additions & 76 deletions Gemfile.lock

This file was deleted.

4 changes: 2 additions & 2 deletions lib/shoryuken/environment_loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ def initialize_aws
end

def initialize_logger
Shoryuken::Logging.initialize_logger(options[:logfile]) if options[:logfile]
Shoryuken.logger.level = Logger::DEBUG if options[:verbose]
Shoryuken::Logging.initialize_logger(Shoryuken.options[:logfile]) if Shoryuken.options[:logfile]
Shoryuken.logger.level = Logger::DEBUG if Shoryuken.options[:verbose]
end

def load_rails
Expand Down
2 changes: 1 addition & 1 deletion lib/shoryuken/fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def fetch(queue, available_processors)
limit = available_processors > FETCH_LIMIT ? FETCH_LIMIT : available_processors

sqs_msgs = Array(receive_messages(queue, limit))
logger.info { "Found #{sqs_msgs.size} messages for '#{queue.name}'" } if sqs_msgs.size > 0
logger.info { "Found #{sqs_msgs.size} messages for '#{queue.name}'" } unless sqs_msgs.empty?
logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" }
sqs_msgs
rescue => ex
Expand Down
4 changes: 1 addition & 3 deletions lib/shoryuken/middleware/server/auto_extend_visibility.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ def auto_extend(worker, queue, sqs_msg, body)
def auto_visibility_timer(worker, queue, sqs_msg, body)
return unless worker.class.auto_visibility_timeout?

timer = MessageVisibilityExtender.new.auto_extend(worker, queue, sqs_msg, body)
timer.execute
timer
MessageVisibilityExtender.new.auto_extend(worker, queue, sqs_msg, body).tap(&:execute)
end
end
end
Expand Down
5 changes: 5 additions & 0 deletions lib/shoryuken/middleware/server/exponential_backoff_retry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ class ExponentialBackoffRetry
include Util

def call(worker, queue, sqs_msg, body)
if sqs_msg.is_a?(Array)
logger.warn { "Exponential backoff isn't supported for batch workers" }
return yield
end

started_at = Time.now
yield
rescue
Expand Down
136 changes: 118 additions & 18 deletions lib/shoryuken/polling.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,53 @@ def ==(other)
end

alias_method :eql?, :==

def to_s
if options.empty?
name
else
"#<QueueConfiguration #{name} options=#{options.inspect}>"
end
end
end

class WeightedRoundRobin
class BaseStrategy
include Util

def next_queue
fail NotImplementedError
end

def messages_found(queue, messages_found)
fail NotImplementedError
end

def active_queues
fail NotImplementedError
end

def ==(other)
case other
when Array
@queues == other
else
if other.respond_to?(:active_queues)
active_queues == other.active_queues
else
false
end
end
end

private

def delay
Shoryuken.options[:delay].to_f
end
end

class WeightedRoundRobin < BaseStrategy

def initialize(queues)
@initial_queues = queues
@queues = queues.dup.uniq
Expand Down Expand Up @@ -57,25 +99,8 @@ def active_queues
unparse_queues(@queues)
end

def ==(other)
case other
when Array
@queues == other
else
if other.respond_to?(:active_queues)
active_queues == other.active_queues
else
false
end
end
end

private

def delay
Shoryuken.options[:delay].to_f
end

def pause(queue)
return unless @queues.delete(queue)
@paused_queues << [Time.now + delay, queue]
Expand All @@ -102,5 +127,80 @@ def queue_weight(queues, queue)
queues.count { |q| q == queue }
end
end

class StrictPriority < BaseStrategy

def initialize(queues)
# Priority ordering of the queues, highest priority first
@queues = queues
.group_by { |q| q }
.sort_by { |_, qs| -qs.count }
.map(&:first)

# Pause status of the queues, default to past time (unpaused)
@paused_until = queues
.each_with_object(Hash.new) { |queue, h| h[queue] = Time.at(0) }

# Start queues at 0
reset_next_queue
end

def next_queue
next_queue = next_active_queue
next_queue.nil? ? nil : QueueConfiguration.new(next_queue, {})
end

def messages_found(queue, messages_found)
if messages_found == 0
pause(queue)
else
reset_next_queue
end
end

def active_queues
@queues
.reverse
.map.with_index(1)
.reject { |q, _| queue_paused?(q) }
.reverse
end

private

def next_active_queue
reset_next_queue if queues_unpaused_since?

size = @queues.length
size.times do
queue = @queues[@next_queue_index]
@next_queue_index = (@next_queue_index + 1) % size
return queue unless queue_paused?(queue)
end

return nil
end

def queues_unpaused_since?
last = @last_unpause_check
now = @last_unpause_check = Time.now

last && @paused_until.values.any? { |t| t > last && t <= now }
end

def reset_next_queue
@next_queue_index = 0
end

def queue_paused?(queue)
@paused_until[queue] > Time.now
end

def pause(queue)
return unless delay > 0
@paused_until[queue] = Time.now + delay
logger.debug "Paused '#{queue}'"
end
end
end
end
2 changes: 1 addition & 1 deletion lib/shoryuken/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Shoryuken
VERSION = '2.1.1'
VERSION = '2.1.3'
end
1 change: 0 additions & 1 deletion shoryuken.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'rake'
spec.add_development_dependency 'rspec'
spec.add_development_dependency 'pry-byebug'
spec.add_development_dependency 'nokogiri'
spec.add_development_dependency 'dotenv'

spec.add_dependency 'aws-sdk-core', '~> 2'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require 'spec_helper'

describe Shoryuken::Middleware::Server::AutoExtendVisibility do
RSpec.describe Shoryuken::Middleware::Server::AutoExtendVisibility do
let(:queue) { 'default' }
let(:visibility_timeout) { 3 }
let(:extend_upfront) { 1 }
Expand Down Expand Up @@ -32,6 +32,12 @@ def run_and_raise(worker, queue, sqs_msg, error_class)
stub_const('Shoryuken::Middleware::Server::AutoExtendVisibility::EXTEND_UPFRONT_SECONDS', extend_upfront)
end

context 'when batch worker' do
it 'yields' do
expect { |b| subject.call(nil, nil, [], nil, &b) }.to yield_control
end
end

it 'extends message visibility if jobs takes a long time' do
TestWorker.get_shoryuken_options['auto_visibility_timeout'] = true

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require 'spec_helper'

describe Shoryuken::Middleware::Server::ExponentialBackoffRetry do
RSpec.describe Shoryuken::Middleware::Server::ExponentialBackoffRetry do
let(:queue) { 'default' }
let(:sqs_queue) { double Shoryuken::Queue }
let(:sqs_msg) { double Shoryuken::Message, queue_url: queue, body: 'test', receipt_handle: SecureRandom.uuid,
Expand All @@ -10,6 +10,12 @@
allow(Shoryuken::Client).to receive(:queues).with(queue).and_return(sqs_queue)
end

context 'when batch worker' do
it 'yields' do
expect { |b| subject.call(nil, nil, [], nil, &b) }.to yield_control
end
end

context 'when a job succeeds' do
it 'does not retry the job' do
TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800]
Expand Down
Loading

0 comments on commit c36d58f

Please sign in to comment.