diff --git a/README.md b/README.md index 86fcbc0d..d20c2a3c 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,7 @@ end [Check the Middleware documentation](https://github.com/phstc/shoryuken/wiki/Middleware). -### Configuration (worker side) +### Shoryuken Configuration Sample configuration file `shoryuken.yml`. @@ -118,82 +118,15 @@ queues: - [low_priority, 1] ``` -And setup ```aws``` options to use ```configure_client``` in `config/initializers/shoryuken.rb`: +#### AWS Configuration -```ruby -Shoryuken.configure_client do |config| - config.aws = { - secret_access_key: ..., # or ENV["AWS_SECRET_ACCESS_KEY"] - access_key_id: ..., # or ENV["AWS_ACCESS_KEY_ID"] - region: "us-east-1", # or ENV["AWS_REGION"] - receive_message: { # See http://docs.aws.amazon.com/sdkforruby/api/Aws/SQS/Client.html#receive_message-instance_method - # wait_time_seconds: N, # The number of seconds to wait for new messages when polling. Defaults to the #wait_time_seconds defined on the queue - attribute_names: [ - "ApproximateReceiveCount", - "SentTimestamp" - ] - } - } -end -``` - -If you use Shoryuken with plain ruby worker class (not Rails), please call `configure_client` at the beginning of the worker file: - -```ruby -Shoryuken.configure_client do |config| - config.aws = { - secret_access_key: ENV["AWS_SECRET_ACCESS_KEY"], - access_key_id: ENV["AWS_ACCESS_KEY_ID"], - region: ENV["AWS_REGION"] - } -end - -class MyWorker -end -``` - -The `aws` section is used to configure both the Aws objects used by Shoryuken internally, and also to set up some Shoryuken-specific config. The Shoryuken-specific keys are listed below, and you can expect any other key defined in that block to be passed on untouched to `Aws::SQS::Client#initialize`: - -- `account_id` is used when generating SNS ARNs -- `sns_endpoint` can be used to explicitly override the SNS endpoint -- `sqs_endpoint` can be used to explicitly override the SQS endpoint -- `receive_message` can be used to define the options passed to the http://docs.aws.amazon.com/sdkforruby/api/Aws/SQS/Client.html#receive_message-instance_method +There are a few ways to configure the AWS client: -The `sns_endpoint` and `sqs_endpoint` Shoryuken-specific options will also fallback to the environment variables `AWS_SNS_ENDPOINT` and `AWS_SQS_ENDPOINT` respectively, if they are set. - -### Configuration (producer side) - -'Producer' processes need permissions to put messages into SQS. There are a few ways: - -* Use the `configure_server` in Rails initializer * Ensure the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` env vars are set. * Create a `~/.aws/credentials` file. * Set `Aws.config[:credentials]` from Ruby code (e.g. in a Rails initializer) * Use the Instance Profiles feature. The IAM role of the targeted machine must have an adequate SQS Policy. -For example, use the `configure_server` in `config/initializers/shoryuken.rb`: - -```ruby -Shoryuken.configure_client do |config| - config.aws = { - secret_access_key: ENV["AWS_SECRET_ACCESS_KEY"], - access_key_id: ENV["AWS_ACCESS_KEY_ID"], - region: ENV["AWS_REGION"] - } -end - -Shoryuken.configure_server do |config| - config.aws = { - secret_access_key: ENV["AWS_SECRET_ACCESS_KEY"], - access_key_id: ENV["AWS_ACCESS_KEY_ID"], - region: ENV["AWS_REGION"] - } -end -``` - - -Note that storing your credentials into Amazon instances represents a security risk. Instance Profiles tends to be the best choice. - You can read about these in more detail [here](http://docs.aws.amazon.com/sdkforruby/api/Aws/SQS/Client.html). ### Rails Integration diff --git a/lib/shoryuken.rb b/lib/shoryuken.rb index 1ec70caf..97cf926d 100644 --- a/lib/shoryuken.rb +++ b/lib/shoryuken.rb @@ -8,7 +8,6 @@ require 'shoryuken/core_ext' require 'shoryuken/util' require 'shoryuken/logging' -require 'shoryuken/aws_config' require 'shoryuken/environment_loader' require 'shoryuken/queue' require 'shoryuken/message' @@ -21,8 +20,6 @@ Shoryuken::Middleware::Server.autoload :AutoExtendVisibility, 'shoryuken/middleware/server/auto_extend_visibility' require 'shoryuken/middleware/server/exponential_backoff_retry' require 'shoryuken/middleware/server/timing' -require 'shoryuken/sns_arn' -require 'shoryuken/topic' require 'shoryuken/polling' require 'shoryuken/manager' require 'shoryuken/launcher' @@ -39,78 +36,102 @@ module Shoryuken lifecycle_events: { startup: [], quiet: [], - shutdown: [], + shutdown: [] }, - polling_strategy: Polling::WeightedRoundRobin, - } + polling_strategy: Polling::WeightedRoundRobin + }.freeze @@queues = [] @@worker_registry = DefaultWorkerRegistry.new @@active_job_queue_name_prefixing = false + @@sqs_client = nil + @@sqs_client_receive_message_opts = {} + @@start_callback = nil + @@stop_callback = nil class << self - def options - @options ||= DEFAULTS.dup - end - def queues @@queues end - def logger - Shoryuken::Logging.logger - end - - def register_worker(*args) - worker_registry.register_worker(*args) + def worker_registry + @@worker_registry end def worker_registry=(worker_registry) @@worker_registry = worker_registry end - def worker_registry - @@worker_registry + def start_callback + @@start_callback + end + + def start_callback=(start_callback) + @@start_callback = start_callback + end + + def stop_callback + @@stop_callback + end + + def stop_callback=(stop_callback) + @@stop_callback = stop_callback end def active_job_queue_name_prefixing @@active_job_queue_name_prefixing end - def active_job_queue_name_prefixing=(prefixing) - @@active_job_queue_name_prefixing = prefixing + def active_job_queue_name_prefixing=(active_job_queue_name_prefixing) + @@active_job_queue_name_prefixing = active_job_queue_name_prefixing + end + + def sqs_client + @@sqs_client + end + + def sqs_client=(sqs_client) + @@sqs_client + end + + def sqs_client_receive_message_opts + @@sqs_client_receive_message_opts + end + + def sqs_client_receive_message_opts=(sqs_client_receive_message_opts) + @@sqs_client_receive_message_opts + end + + def options + @@options ||= DEFAULTS.dup + end + + def logger + Shoryuken::Logging.logger + end + + def register_worker(*args) + @@worker_registry.register_worker(*args) end - ## - # Configuration for Shoryuken server, use like: - # - # Shoryuken.configure_server do |config| - # config.aws = { :sqs_endpoint => '...', :access_key_id: '...', :secret_access_key: '...', region: '...' } - # end def configure_server yield self if server? end def server_middleware - @server_chain ||= default_server_middleware - yield @server_chain if block_given? - @server_chain + @@server_chain ||= default_server_middleware + yield @@server_chain if block_given? + @@server_chain end - ## - # Configuration for Shoryuken client, use like: - # - # Shoryuken.configure_client do |config| - # config.aws = { :sqs_endpoint => '...', :access_key_id: '...', :secret_access_key: '...', region: '...' } - # end def configure_client yield self unless server? end def client_middleware - @client_chain ||= default_client_middleware - yield @client_chain if block_given? - @client_chain + @@client_chain ||= default_client_middleware + yield @@client_chain if block_given? + @@client_chain end def default_worker_options @@ -120,27 +141,24 @@ def default_worker_options 'auto_delete' => false, 'auto_visibility_timeout' => false, 'retry_intervals' => nil, - 'batch' => false } + 'batch' => false + } end - def default_worker_options=(options) - @@default_worker_options = options - end - - def on_aws_initialization(&block) - @aws_initialization_callback = block + def default_worker_options=(default_worker_options) + @@default_worker_options = default_worker_options end def on_start(&block) - @start_callback = block + @@start_callback = block end def on_stop(&block) - @stop_callback = block + @@stop_callback = block end - def aws=(hash) - Shoryuken::AwsConfig.setup(hash) + def sqs_client + @@sqs_client ||= Aws::SQS::Client.new end # Register a block to run at a point in the Shoryuken lifecycle. @@ -157,10 +175,6 @@ def on(event, &block) options[:lifecycle_events][event] << block end - attr_reader :aws_initialization_callback, - :start_callback, - :stop_callback - private def default_server_middleware diff --git a/lib/shoryuken/aws_config.rb b/lib/shoryuken/aws_config.rb deleted file mode 100644 index d58f3496..00000000 --- a/lib/shoryuken/aws_config.rb +++ /dev/null @@ -1,64 +0,0 @@ -# frozen_string_literal: true -module Shoryuken - class AwsConfig - class << self - attr_writer :options - - def options - @options ||= {} - end - - def setup(hash) - # aws-sdk tries to load the credentials from the ENV variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY - # when not explicit supplied - return if hash.empty? - - self.options = hash - - shoryuken_keys = %w( - account_id - sns_endpoint - sqs_endpoint - receive_message - ).map(&:to_sym) - - @aws_options = hash.reject do |k, _| - shoryuken_keys.include?(k) - end - - # assume credentials based authentication - credentials = Aws::Credentials.new( - @aws_options.delete(:access_key_id), - @aws_options.delete(:secret_access_key) - ) - - # but only if the configuration options have valid values - @aws_options.merge!(credentials: credentials) if credentials.set? - - if (callback = Shoryuken.aws_initialization_callback) - Shoryuken.logger.info { 'Calling Shoryuken.on_aws_initialization block' } - callback.call(@aws_options) - end - end - - def sns - Aws::SNS::Client.new(aws_client_options(:sns_endpoint)) - end - - def sqs - Aws::SQS::Client.new(aws_client_options(:sqs_endpoint)) - end - - private - - def aws_client_options(service_endpoint_key) - environment_endpoint = ENV["AWS_#{service_endpoint_key.to_s.upcase}"] - explicit_endpoint = options[service_endpoint_key] || environment_endpoint - endpoint = {}.tap do |hash| - hash[:endpoint] = explicit_endpoint unless explicit_endpoint.to_s.empty? - end - @aws_options.to_h.merge(endpoint) - end - end - end -end diff --git a/lib/shoryuken/client.rb b/lib/shoryuken/client.rb index fbd3ccf2..b2b8dcdd 100644 --- a/lib/shoryuken/client.rb +++ b/lib/shoryuken/client.rb @@ -1,31 +1,19 @@ module Shoryuken class Client @@queues = {} - @@topics = {} class << self def queues(name) @@queues[name.to_s] ||= Shoryuken::Queue.new(sqs, name) end - def sns - @sns ||= Shoryuken::AwsConfig.sns - end - - def sns_arn - @sns_arn ||= SnsArn - end - def sqs - @sqs ||= Shoryuken::AwsConfig.sqs + @@sqs ||= Shoryuken.sqs_client end - def topics(name) - @@topics[name.to_s] ||= Topic.new(name, sns) + def sqs=(sqs) + @@sqs = sqs end - - attr_accessor :account_id - attr_writer :sns, :sqs, :sqs_resource, :sns_arn end end end diff --git a/lib/shoryuken/environment_loader.rb b/lib/shoryuken/environment_loader.rb index 4b705937..d957d1de 100644 --- a/lib/shoryuken/environment_loader.rb +++ b/lib/shoryuken/environment_loader.rb @@ -28,7 +28,6 @@ def load prefix_active_job_queue_names parse_queues require_workers - initialize_aws validate_queues validate_workers patch_deprecated_workers @@ -49,16 +48,6 @@ def config_file_options YAML.load(ERB.new(IO.read(path)).result).deep_symbolize_keys end - # DEPRECATED: Please use configure_server and configure_client in - # https://github.com/phstc/shoryuken/blob/a81637d577b36c5cf245882733ea91a335b6602f/lib/shoryuken.rb#L82 - # Please delete this method afert next release (v2.0.12 or later) - def initialize_aws - unless Shoryuken.options[:aws].to_h.empty? - Shoryuken.logger.warn { '[DEPRECATION] aws in shoryuken.yml is deprecated. Please use configure_server and configure_client in your initializer' } - end - Shoryuken::AwsConfig.setup(Shoryuken.options[:aws]) - end - def initialize_logger Shoryuken::Logging.initialize_logger(Shoryuken.options[:logfile]) if Shoryuken.options[:logfile] Shoryuken.logger.level = Logger::DEBUG if Shoryuken.options[:verbose] diff --git a/lib/shoryuken/fetcher.rb b/lib/shoryuken/fetcher.rb index e4568154..5baced68 100644 --- a/lib/shoryuken/fetcher.rb +++ b/lib/shoryuken/fetcher.rb @@ -29,7 +29,7 @@ def receive_messages(queue, limit) # AWS limits the batch size by 10 limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit - options = (Shoryuken.options[:aws][:receive_message] || {}).dup + options = Shoryuken.sqs_client_receive_message_opts.to_h.dup options[:max_number_of_messages] = limit options[:message_attribute_names] = %w(All) options[:attribute_names] = %w(All) diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index e5544a30..0692d254 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -2,7 +2,8 @@ module Shoryuken class Manager include Util - BATCH_LIMIT = 10 + BATCH_LIMIT = 10 + HEARTBEAT_INTERVAL = 0.1 def initialize(fetcher, polling_strategy) @count = Shoryuken.options.fetch(:concurrency, 25) @@ -17,7 +18,9 @@ def initialize(fetcher, polling_strategy) @fetcher = fetcher @polling_strategy = polling_strategy - @heartbeat = Concurrent::TimerTask.new(run_now: true, execution_interval: 0.1, timeout_interval: 60) { dispatch } + @heartbeat = Concurrent::TimerTask.new(run_now: true, + execution_interval: HEARTBEAT_INTERVAL, + timeout_interval: 60) { dispatch } @pool = Concurrent::FixedThreadPool.new(@count, max_queue: @count) end @@ -59,15 +62,10 @@ def dispatch return if @done.true? return unless @dispatching.make_true - logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" } - - if ready == 0 - return logger.debug { 'Pausing fetcher, because all processors are busy' } - end + return if ready == 0 + return unless (queue = @polling_strategy.next_queue) - unless (queue = @polling_strategy.next_queue) - return logger.debug { 'Pausing fetcher, because all queues are paused' } - end + logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" } batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue) ensure diff --git a/lib/shoryuken/polling.rb b/lib/shoryuken/polling.rb index 885c90b1..2a3bf974 100644 --- a/lib/shoryuken/polling.rb +++ b/lib/shoryuken/polling.rb @@ -65,7 +65,6 @@ def delay end class WeightedRoundRobin < BaseStrategy - def initialize(queues) @initial_queues = queues @queues = queues.dup.uniq @@ -129,7 +128,6 @@ def queue_weight(queues, queue) end class StrictPriority < BaseStrategy - def initialize(queues) # Priority ordering of the queues, highest priority first @queues = queues @@ -178,7 +176,7 @@ def next_active_queue return queue unless queue_paused?(queue) end - return nil + nil end def queues_unpaused_since? diff --git a/lib/shoryuken/sns_arn.rb b/lib/shoryuken/sns_arn.rb deleted file mode 100644 index 94d864f3..00000000 --- a/lib/shoryuken/sns_arn.rb +++ /dev/null @@ -1,27 +0,0 @@ -module Shoryuken - class SnsArn - def initialize(topic) - @topic = topic - end - - def to_s - @arn ||= "arn:aws:sns:#{region}:#{account_id}:#{@topic}" - end - - private - - def account_id - Shoryuken::Client.account_id.tap do |account_id| - if account_id.to_s.empty? - fail 'To generate SNS ARNs, you must assign an :account_id in your Shoryuken::Client.' - end - end - end - - def region - Aws.config.fetch(:region) do - fail 'To generate SNS ARNs, you must include a :region in your AWS config.' - end - end - end -end diff --git a/lib/shoryuken/topic.rb b/lib/shoryuken/topic.rb deleted file mode 100644 index 8c03c4a9..00000000 --- a/lib/shoryuken/topic.rb +++ /dev/null @@ -1,17 +0,0 @@ -module Shoryuken - class Topic - def initialize(name, sns) - @name, @sns = name, sns - end - - def arn - @arn ||= Client.sns_arn.new(@name).to_s - end - - def send_message(body, options = {}) - body = JSON.dump(body) if body.is_a?(Hash) - - @sns.publish(topic_arn: arn, message: body) - end - end -end diff --git a/spec/integration/launcher_spec.rb b/spec/integration/launcher_spec.rb index f5bd2a06..e4a84cd8 100644 --- a/spec/integration/launcher_spec.rb +++ b/spec/integration/launcher_spec.rb @@ -5,8 +5,6 @@ RSpec.describe Shoryuken::Launcher do describe 'Consuming messages', slow: :true do before do - Shoryuken.options[:aws][:receive_message] = { wait_time_seconds: 5 } - StandardWorker.received_messages = 0 queue = "test_shoryuken#{StandardWorker}_#{SecureRandom.uuid}" diff --git a/spec/shoryuken/client_spec.rb b/spec/shoryuken/client_spec.rb index da2faf82..c12e1d8b 100644 --- a/spec/shoryuken/client_spec.rb +++ b/spec/shoryuken/client_spec.rb @@ -1,17 +1,16 @@ require 'spec_helper' -describe Shoryuken::Client do +RSpec.describe Shoryuken::Client do let(:credentials) { Aws::Credentials.new('access_key_id', 'secret_access_key') } let(:sqs) { Aws::SQS::Client.new(stub_responses: true, credentials: credentials) } let(:queue_name) { 'shoryuken' } let(:queue_url) { 'https://eu-west-1.amazonaws.com:6059/123456789012/shoryuken' } - let(:sqs_endpoint) { 'http://localhost:4568' } - let(:sns_endpoint) { 'http://0.0.0.0:4568' } describe '.queue' do before do described_class.sqs = sqs end + it 'memoizes queues' do sqs.stub_responses(:get_queue_url, { queue_url: queue_url }, { queue_url: 'xyz' }) @@ -19,46 +18,4 @@ expect(Shoryuken::Client.queues(queue_name).url).to eq queue_url end end - - describe 'environment variable endpoints' do - before do - ENV['AWS_SQS_ENDPOINT'] = sqs_endpoint - ENV['AWS_SNS_ENDPOINT'] = sns_endpoint - ENV['AWS_REGION'] = 'us-east-1' - Shoryuken.options[:aws] = {} - Shoryuken::AwsConfig.options = {} - end - - it 'will use config file settings if set' do - load_config_file_by_file_name('shoryuken_endpoint.yml') - expect(described_class.sqs.config.endpoint.to_s).to eql('https://github.com/phstc/shoryuken:4568') - expect(described_class.sns.config.endpoint.to_s).to eq('http://127.0.0.1:4568') - end - - it 'should fallback to environment variable if config file not found or set' do - load_config_file_by_file_name(nil) - expect(described_class.sqs.config.endpoint.to_s).to eql(sqs_endpoint) - expect(described_class.sns.config.endpoint.to_s).to eq(sns_endpoint) - end - - it 'should fallback to environment variable if config file found but settings not set' do - load_config_file_by_file_name('shoryuken.yml') - expect(described_class.sqs.config.endpoint.to_s).to eql(sqs_endpoint) - expect(described_class.sns.config.endpoint.to_s).to eq(sns_endpoint) - end - - it 'will fallback to default settings if no config file settings or environment variables found' do - ENV['AWS_SQS_ENDPOINT'] = nil - ENV['AWS_SNS_ENDPOINT'] = nil - load_config_file_by_file_name('shoryuken.yml') - expect(described_class.sqs.config.endpoint.to_s).to eql('https://sqs.us-east-1.amazonaws.com') - expect(described_class.sns.config.endpoint.to_s).to eq('https://sns.us-east-1.amazonaws.com') - end - end - - def load_config_file_by_file_name(file_name) - path_name = file_name ? File.join(File.expand_path('../../..', __FILE__), 'spec', file_name) : nil - loader = Shoryuken::EnvironmentLoader.setup_options(config_file: path_name) - loader.load - end end diff --git a/spec/shoryuken/sns_arn_spec.rb b/spec/shoryuken/sns_arn_spec.rb deleted file mode 100644 index f4e28dec..00000000 --- a/spec/shoryuken/sns_arn_spec.rb +++ /dev/null @@ -1,42 +0,0 @@ -require 'spec_helper' - -describe Shoryuken::SnsArn do - let(:account_id) { '1234567890' } - let(:region) { 'eu-west-1' } - let(:topic) { 'topic-x' } - - before do - Shoryuken::Client.account_id = account_id - Aws.config = { region: region } - end - - subject { described_class.new(topic).to_s } - - describe '#to_s' do - context 'when the Aws config includes all the information necessary' do - it 'generates an SNS arn' do - expect(subject).to eq('arn:aws:sns:eu-west-1:1234567890:topic-x') - end - end - - context 'when the Aws config does not include the account id' do - before do - Shoryuken::Client.account_id = nil - end - - it 'fails' do - expect { subject }.to raise_error(/an :account_id/) - end - end - - context 'when the Aws config does not include the region' do - before do - Aws.config.delete :region - end - - it 'fails' do - expect { subject }.to raise_error(/a :region/) - end - end - end -end diff --git a/spec/shoryuken/topic_spec.rb b/spec/shoryuken/topic_spec.rb deleted file mode 100644 index e9507852..00000000 --- a/spec/shoryuken/topic_spec.rb +++ /dev/null @@ -1,32 +0,0 @@ -require 'spec_helper' - -describe Shoryuken::Topic do - let(:sns) { Aws::SNS::Client.new stub_responses: true } - let(:topic_arn) { 'arn:aws:sns:us-east-1:0987654321:shoryuken' } - let(:topic_name) { 'shoryuken' } - - before do - Shoryuken::Client.account_id = '0987654321' - Aws.config = { region: 'us-east-1' } - end - - subject { described_class.new(topic_name, sns) } - - describe '#send_message' do - it 'enqueues a message' do - sns.stub_responses(:publish, { message_id: 'msg1' }) - expect(sns).to receive(:publish).with(topic_arn: topic_arn, message: 'test') - - subject.send_message('test') - end - - it 'parses as JSON by default' do - msg = { field: 'test', other_field: 'other' } - - sns.stub_responses(:publish, { message_id: 'msg2' }) - expect(sns).to receive(:publish).with(topic_arn: topic_arn, message: JSON.dump(msg)) - - subject.send_message(msg) - end - end -end diff --git a/spec/shoryuken_endpoint.yml b/spec/shoryuken_endpoint.yml deleted file mode 100644 index fec9aa46..00000000 --- a/spec/shoryuken_endpoint.yml +++ /dev/null @@ -1,6 +0,0 @@ -aws: - access_key_id: <%= ENV['AWS_ACCESS_KEY_ID'] %> - secret_access_key: <%= ENV['AWS_SECRET_ACCESS_KEY'] %> - region: us-east-1 - sqs_endpoint: https://github.com/phstc/shoryuken:4568 - sns_endpoint: http://127.0.0.1:4568 \ No newline at end of file diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index c58c4f44..cf486a8b 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -34,11 +34,8 @@ def perform(sqs_msg, body); end config.before do Shoryuken::Client.class_variable_set :@@queues, {} - Shoryuken::Client.class_variable_set :@@visibility_timeouts, {} Shoryuken::Client.sqs = nil - Shoryuken::Client.sqs_resource = nil - Shoryuken::Client.sns = nil Shoryuken.queues.clear @@ -48,8 +45,6 @@ def perform(sqs_msg, body); end Shoryuken.options[:daemon] = nil Shoryuken.options[:logfile] = nil - Shoryuken.options[:aws].delete(:receive_message) - TestWorker.get_shoryuken_options.clear TestWorker.get_shoryuken_options['queue'] = 'default'