From 2270e5560eaab510818f7f5ba02168b7cc6025a7 Mon Sep 17 00:00:00 2001 From: Pablo Cantero Date: Sun, 30 Jul 2017 08:35:13 -0400 Subject: [PATCH 1/8] :lipstick: --- bin/shoryuken | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/shoryuken b/bin/shoryuken index c00f52bd..37c0dbee 100755 --- a/bin/shoryuken +++ b/bin/shoryuken @@ -32,7 +32,7 @@ module Shoryuken opts = options.to_h.symbolize_keys if opts[:config_file] - say "[DEPRECATED] Please use --config instead of --config-file", :yellow + say '[DEPRECATED] Please use --config instead of --config-file', :yellow end opts[:config_file] = opts.delete(:config) if opts[:config] From df28e3bf13070c3a7a45f2388fea52fe922dc3d3 Mon Sep 17 00:00:00 2001 From: Pablo Cantero Date: Sun, 30 Jul 2017 08:35:22 -0400 Subject: [PATCH 2/8] Allow creating FIFO queues `sqs create QUEUE-NAME.fifo` --- bin/cli/sqs.rb | 5 ++++- lib/shoryuken/fetcher.rb | 11 +++++------ lib/shoryuken/middleware/server/timing.rb | 8 ++++---- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/bin/cli/sqs.rb b/bin/cli/sqs.rb index d46e89c6..34a3d3bb 100644 --- a/bin/cli/sqs.rb +++ b/bin/cli/sqs.rb @@ -188,7 +188,10 @@ def purge(queue_name) desc 'create QUEUE-NAME', 'Create a queue' def create(queue_name) - queue_url = sqs.create_queue(queue_name: queue_name).queue_url + attributes = {} + attributes['FifoQueue'] = 'true' if queue_name.end_with?('.fifo') + + queue_url = sqs.create_queue(queue_name: queue_name, attributes: attributes).queue_url say "Queue #{queue_name} was successfully created. Queue URL #{queue_url}", :green end diff --git a/lib/shoryuken/fetcher.rb b/lib/shoryuken/fetcher.rb index 24a74f02..4e5c73c5 100644 --- a/lib/shoryuken/fetcher.rb +++ b/lib/shoryuken/fetcher.rb @@ -4,8 +4,6 @@ class Fetcher FETCH_LIMIT = 10 - attr_reader :group - def initialize(group) @group = group end @@ -13,12 +11,13 @@ def initialize(group) def fetch(queue, limit) started_at = Time.now - logger.debug { "Looking for new messages in #{queue}" } + # logger.info { "Looking for new messages in #{queue} - #{limit}" } sqs_msgs = Array(receive_messages(queue, [FETCH_LIMIT, limit].min)) + # sqs_msgs = Array(receive_messages(queue, 1)) - logger.info { "Found #{sqs_msgs.size} messages for #{queue.name}" } unless sqs_msgs.empty? - logger.debug { "Fetcher for #{queue} completed in #{elapsed(started_at)} ms" } + # logger.info { "Found #{sqs_msgs.size} messages for #{queue.name}" } #unless sqs_msgs.empty? + # logger.debug { "Fetcher for #{queue} completed in #{elapsed(started_at)} ms" } sqs_msgs end @@ -29,7 +28,7 @@ def receive_messages(queue, limit) # AWS limits the batch size by 10 limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit - options = Shoryuken.sqs_client_receive_message_opts[group].to_h.dup + options = Shoryuken.sqs_client_receive_message_opts[@group].to_h.dup options[:max_number_of_messages] = limit options[:message_attribute_names] = %w(All) diff --git a/lib/shoryuken/middleware/server/timing.rb b/lib/shoryuken/middleware/server/timing.rb index 908310f2..c17187af 100644 --- a/lib/shoryuken/middleware/server/timing.rb +++ b/lib/shoryuken/middleware/server/timing.rb @@ -9,19 +9,19 @@ def call(worker, queue, sqs_msg, body) begin started_at = Time.now - logger.info { "started at #{started_at}" } + # logger.info { "started at #{started_at}" } yield total_time = elapsed(started_at) if (total_time / 1000.0) > (timeout = Shoryuken::Client.queues(queue).visibility_timeout) - logger.warn { "exceeded the queue visibility timeout by #{total_time - (timeout * 1000)} ms" } + # logger.warn { "exceeded the queue visibility timeout by #{total_time - (timeout * 1000)} ms" } end - logger.info { "completed in: #{total_time} ms" } + # logger.info { "completed in: #{total_time} ms" } rescue - logger.info { "failed in: #{elapsed(started_at)} ms" } + # logger.info { "failed in: #{elapsed(started_at)} ms" } raise end end From 0679bec6b2c751dcd94f82cb074154bdb940aa30 Mon Sep 17 00:00:00 2001 From: Pablo Cantero Date: Sun, 30 Jul 2017 08:42:56 -0400 Subject: [PATCH 3/8] Allow sending attributes for `sqs create` --- bin/cli/sqs.rb | 3 ++- lib/shoryuken/fetcher.rb | 11 ++++++----- lib/shoryuken/middleware/server/timing.rb | 8 ++++---- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/bin/cli/sqs.rb b/bin/cli/sqs.rb index 34a3d3bb..af0dbb00 100644 --- a/bin/cli/sqs.rb +++ b/bin/cli/sqs.rb @@ -187,8 +187,9 @@ def purge(queue_name) end desc 'create QUEUE-NAME', 'Create a queue' + method_option :attributes, aliases: '-a', type: :hash, default: {} def create(queue_name) - attributes = {} + attributes = options[:attributes] attributes['FifoQueue'] = 'true' if queue_name.end_with?('.fifo') queue_url = sqs.create_queue(queue_name: queue_name, attributes: attributes).queue_url diff --git a/lib/shoryuken/fetcher.rb b/lib/shoryuken/fetcher.rb index 4e5c73c5..24a74f02 100644 --- a/lib/shoryuken/fetcher.rb +++ b/lib/shoryuken/fetcher.rb @@ -4,6 +4,8 @@ class Fetcher FETCH_LIMIT = 10 + attr_reader :group + def initialize(group) @group = group end @@ -11,13 +13,12 @@ def initialize(group) def fetch(queue, limit) started_at = Time.now - # logger.info { "Looking for new messages in #{queue} - #{limit}" } + logger.debug { "Looking for new messages in #{queue}" } sqs_msgs = Array(receive_messages(queue, [FETCH_LIMIT, limit].min)) - # sqs_msgs = Array(receive_messages(queue, 1)) - # logger.info { "Found #{sqs_msgs.size} messages for #{queue.name}" } #unless sqs_msgs.empty? - # logger.debug { "Fetcher for #{queue} completed in #{elapsed(started_at)} ms" } + logger.info { "Found #{sqs_msgs.size} messages for #{queue.name}" } unless sqs_msgs.empty? + logger.debug { "Fetcher for #{queue} completed in #{elapsed(started_at)} ms" } sqs_msgs end @@ -28,7 +29,7 @@ def receive_messages(queue, limit) # AWS limits the batch size by 10 limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit - options = Shoryuken.sqs_client_receive_message_opts[@group].to_h.dup + options = Shoryuken.sqs_client_receive_message_opts[group].to_h.dup options[:max_number_of_messages] = limit options[:message_attribute_names] = %w(All) diff --git a/lib/shoryuken/middleware/server/timing.rb b/lib/shoryuken/middleware/server/timing.rb index c17187af..908310f2 100644 --- a/lib/shoryuken/middleware/server/timing.rb +++ b/lib/shoryuken/middleware/server/timing.rb @@ -9,19 +9,19 @@ def call(worker, queue, sqs_msg, body) begin started_at = Time.now - # logger.info { "started at #{started_at}" } + logger.info { "started at #{started_at}" } yield total_time = elapsed(started_at) if (total_time / 1000.0) > (timeout = Shoryuken::Client.queues(queue).visibility_timeout) - # logger.warn { "exceeded the queue visibility timeout by #{total_time - (timeout * 1000)} ms" } + logger.warn { "exceeded the queue visibility timeout by #{total_time - (timeout * 1000)} ms" } end - # logger.info { "completed in: #{total_time} ms" } + logger.info { "completed in: #{total_time} ms" } rescue - # logger.info { "failed in: #{elapsed(started_at)} ms" } + logger.info { "failed in: #{elapsed(started_at)} ms" } raise end end From db80a97fd8bbf82ccac24f91ff3ed5c6fdde953f Mon Sep 17 00:00:00 2001 From: Pablo Cantero Date: Sun, 30 Jul 2017 10:35:49 -0400 Subject: [PATCH 4/8] Make `ls` more similar to `$ watch` --- bin/cli/sqs.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/cli/sqs.rb b/bin/cli/sqs.rb index af0dbb00..7ebca651 100644 --- a/bin/cli/sqs.rb +++ b/bin/cli/sqs.rb @@ -96,8 +96,8 @@ def dump_file(path, queue_name) end desc 'ls [QUEUE-NAME-PREFIX]', 'Lists queues' - method_option :watch, aliases: '-w', type: :boolean, desc: 'watch queues' - method_option :watch_interval, type: :numeric, default: 10, desc: 'watch interval' + method_option :watch, aliases: '-w', type: :boolean, desc: 'watch queues' + method_option :interval, aliases: '-n', type: :numeric, default: 2, desc: 'watch interval in seconds' def ls(queue_name_prefix = '') trap('SIGINT', 'EXIT') # expect ctrl-c from loop @@ -108,7 +108,7 @@ def ls(queue_name_prefix = '') break unless options[:watch] - sleep options[:watch_interval] + sleep options[:interval] puts end end From 0db7d093c5d443ac84c35617dfbdebece5a40b22 Mon Sep 17 00:00:00 2001 From: Pablo Cantero Date: Sun, 30 Jul 2017 10:57:38 -0400 Subject: [PATCH 5/8] Only set `FifoQueue` if not set already --- bin/cli/sqs.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/cli/sqs.rb b/bin/cli/sqs.rb index 7ebca651..87de6371 100644 --- a/bin/cli/sqs.rb +++ b/bin/cli/sqs.rb @@ -190,7 +190,7 @@ def purge(queue_name) method_option :attributes, aliases: '-a', type: :hash, default: {} def create(queue_name) attributes = options[:attributes] - attributes['FifoQueue'] = 'true' if queue_name.end_with?('.fifo') + attributes['FifoQueue'] ||= 'true' if queue_name.end_with?('.fifo') queue_url = sqs.create_queue(queue_name: queue_name, attributes: attributes).queue_url From 6ef980a3cd7d1d96c58e394b1b0e4660d8a4ca68 Mon Sep 17 00:00:00 2001 From: Pablo Cantero Date: Sun, 30 Jul 2017 11:05:38 -0400 Subject: [PATCH 6/8] Experimenting clearing out the console for sqs ls -w See https://stackoverflow.com/a/31214215/464685 --- bin/cli/sqs.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bin/cli/sqs.rb b/bin/cli/sqs.rb index 87de6371..c151326d 100644 --- a/bin/cli/sqs.rb +++ b/bin/cli/sqs.rb @@ -87,6 +87,8 @@ def list_and_print_queues(urls) entries.unshift(['Queue', 'Messages Available', 'Messages Inflight', 'Last Modified']) + system(Gem.win_platform? ? 'cls' : 'clear') + print_table(entries) end From b1edbbef48451db6d569b7f72ba96de7abd654f5 Mon Sep 17 00:00:00 2001 From: Pablo Cantero Date: Sun, 30 Jul 2017 19:18:06 -0400 Subject: [PATCH 7/8] Revert "Experimenting clearing out the console for sqs ls -w" This reverts commit 6ef980a3cd7d1d96c58e394b1b0e4660d8a4ca68. --- bin/cli/sqs.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/bin/cli/sqs.rb b/bin/cli/sqs.rb index c151326d..87de6371 100644 --- a/bin/cli/sqs.rb +++ b/bin/cli/sqs.rb @@ -87,8 +87,6 @@ def list_and_print_queues(urls) entries.unshift(['Queue', 'Messages Available', 'Messages Inflight', 'Last Modified']) - system(Gem.win_platform? ? 'cls' : 'clear') - print_table(entries) end From 5a14b884fc320e79a8f7e12bd239a7d8a4338bf1 Mon Sep 17 00:00:00 2001 From: Pablo Cantero Date: Sun, 30 Jul 2017 19:19:33 -0400 Subject: [PATCH 8/8] Add desc for create/attributes --- bin/cli/sqs.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/cli/sqs.rb b/bin/cli/sqs.rb index 87de6371..4aa4acdc 100644 --- a/bin/cli/sqs.rb +++ b/bin/cli/sqs.rb @@ -187,7 +187,7 @@ def purge(queue_name) end desc 'create QUEUE-NAME', 'Create a queue' - method_option :attributes, aliases: '-a', type: :hash, default: {} + method_option :attributes, aliases: '-a', type: :hash, default: {}, desc: 'queue attributes' def create(queue_name) attributes = options[:attributes] attributes['FifoQueue'] ||= 'true' if queue_name.end_with?('.fifo')