Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add thor (cli) and sqs commands #330

Merged
merged 20 commits into from
Mar 11, 2017
28 changes: 9 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,26 +139,16 @@ You can read about these in more detail [here](http://docs.aws.amazon.com/sdkfor
bundle exec shoryuken -r worker.rb -C shoryuken.yml
```

Other options:

```bash
shoryuken --help

shoryuken [options]
-c, --concurrency INT Processor threads to use
-d, --daemon Daemonize process
-q, --queue QUEUE[,WEIGHT]... Queues to process with optional weights
-r, --require [PATH|DIR] Location of the worker
-C, --config PATH Path to YAML config file
-R, --rails Attempts to load the containing Rails project
-L, --logfile PATH Path to writable logfile
-P, --pidfile PATH Path to pidfile
-v, --verbose Print more verbose output
-V, --version Print version and exit
-h, --help Show help
...
```
For other options check `bundle exec shoryuken help start`

#### SQS commands

Check also some available SQS commands `bundle exec shoryuken help sqs`, such as:

- `ls` list queues
- `mv` move messages from one queue to another
- `dump` dump messages from a queue into a JSON lines file
- `requeue` requeue messages from a dump file

## More Information

Expand Down
42 changes: 42 additions & 0 deletions bin/cli/base.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# rubocop:disable Metrics/BlockLength
module Shoryuken
module CLI
class Base < Thor
no_commands do
def print_table(entries)
column_sizes = print_columns_size(entries)

entries.map do |entry|
puts entry.map.with_index { |e, i| print_format_column(e, column_sizes[i]) }.join
end
end

def print_columns_size(entries)
column_sizes = Hash.new(0)

entries.each do |entry|
entry.each_with_index do |e, i|
e = e.to_s
column_sizes[i] = e.size if column_sizes[i] < e.size
end
end

column_sizes
end

def print_format_column(column, size)
size = 40 if size > 40
size_with_padding = size + 4
column = column.to_s.ljust(size_with_padding)
column = "#{column[0...size - 2]}.." if column.size > size_with_padding
column
end

def fail_task(msg, quit = true)
say "[FAIL] #{msg}", :red
exit(1) if quit
end
end
end
end
end
186 changes: 186 additions & 0 deletions bin/cli/sqs.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
require 'date'

# rubocop:disable Metrics/AbcSize, Metrics/BlockLength
module Shoryuken
module CLI
class SQS < Base
namespace :sqs

no_commands do
def normalize_dump_message(message)
message[:id] = message.delete(:message_id)
message[:message_body] = message.delete(:body)
message.delete(:receipt_handle)
message.delete(:md5_of_body)
message.delete(:md5_of_message_attributes)
message
end

def sqs
@_sqs ||= Aws::SQS::Client.new
end

def find_queue_url(queue_name_prefix)
urls = sqs.list_queues(queue_name_prefix: queue_name_prefix).queue_urls

if urls.size > 1
fail_task "There's more than one queue starting with #{queue_name_prefix}: #{urls.join(', ')}"
end

url = urls.first

fail_task "Queue #{queue_name_prefix} not found" unless url

url
end

def batch_delete(url, messages)
messages.to_a.flatten.each_slice(10) do |batch|
sqs.delete_message_batch(
queue_url: url,
entries: batch.map { |message| { id: message.message_id, receipt_handle: message.receipt_handle } }
).failed.any? do |failure|
say "Could not delete #{failure.id}, code: #{failure.code}", :yellow
end
end
end

def batch_send(url, messages)
messages.to_a.flatten.map(&method(:normalize_dump_message)).each_slice(10) do |batch|
sqs.send_message_batch(queue_url: url, entries: batch).failed.any? do |failure|
say "Could not requeue #{failure.id}, code: #{failure.code}", :yellow
end
end
end

def find_all(url, limit, &block)
count = 0
batch_size = limit > 10 ? 10 : limit

loop do
n = limit - count
batch_size = n if n < batch_size

messages = sqs.receive_message(
queue_url: url,
max_number_of_messages: batch_size,
message_attribute_names: ['All']
).messages

messages.each { |m| yield m }

count += messages.size

break if count >= limit
break if messages.empty?
end

count
end

def list_and_print_queues(urls)
attrs = %w(QueueArn ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible LastModifiedTimestamp)

entries = urls.map { |u| sqs.get_queue_attributes(queue_url: u, attribute_names: attrs).attributes }.map do |q|
[
q['QueueArn'].split(':').last,
q['ApproximateNumberOfMessages'],
q['ApproximateNumberOfMessagesNotVisible'],
Time.at(q['LastModifiedTimestamp'].to_i)
]
end

entries.unshift(['Queue', 'Messages Available', 'Messages Inflight', 'Last Modified'])

print_table(entries)
end

def dump_file(path, queue_name)
File.join(path, "#{queue_name}-#{Date.today}.jsonl")
end
end

desc 'ls [QUEUE-NAME-PREFIX]', 'List queues'
method_option :watch, aliases: '-w', type: :boolean, desc: 'watch queues'
method_option :watch_interval, type: :numeric, default: 10, desc: 'watch interval'
def ls(queue_name_prefix = '')
urls = sqs.list_queues(queue_name_prefix: queue_name_prefix).queue_urls

loop do
list_and_print_queues(urls)

break unless options.watch

sleep options.watch_interval
puts
end
end

desc 'dump QUEUE-NAME', 'Dump messages from a queue into a JSON lines file'
method_option :number, aliases: '-n', type: :numeric, default: Float::INFINITY, desc: 'number of messages to dump'
method_option :path, aliases: '-p', type: :string, default: './', desc: 'path to save the dump file'
method_option :delete, aliases: '-d', type: :boolean, default: true, desc: 'delete from the queue'
def dump(queue_name)
path = dump_file(options.path, queue_name)

fail_task "File #{path} already exists" if File.exist?(path)

url = find_queue_url(queue_name)

messages = []

file = nil

count = find_all(url, options.number) do |m|
file ||= File.open(path, 'w')

file.puts(JSON.dump(m.to_h))

messages << m if options.delete
end

batch_delete(url, messages) if options.delete

if count.zero?
say "Queue #{queue_name} is empty", :yellow
else
say "Dump saved in #{path} with #{count} messages", :green
end
ensure
file.close if file
end

desc 'requeue QUEUE-NAME PATH', 'Requeue messages from a dump file'
def requeue(queue_name, path)
fail_task "Path #{path} not found" unless File.exist?(path)

messages = File.readlines(path).map { |line| JSON.parse(line, symbolize_names: true) }

batch_send(find_queue_url(queue_name), messages)

say "Requeued #{messages.size} messages from #{path} to #{queue_name}", :green
end

desc 'mv QUEUE-NAME-SOURCE QUEUE-NAME-TARGET', 'Move messages from one queue (source) to another (target)'
method_option :number, aliases: '-n', type: :numeric, default: Float::INFINITY, desc: 'number of messages to move'
method_option :delete, aliases: '-d', type: :boolean, default: true, desc: 'delete from the queue'
def mv(queue_name_source, queue_name_target)
url_source = find_queue_url(queue_name_source)
messages = []

count = find_all(url_source, options.number) do |m|
messages << m
end

batch_send(find_queue_url(queue_name_target), messages.map(&:to_h))
batch_delete(url_source, messages) if options.delete

if count.zero?
say "Queue #{queue_name_source} is empty", :yellow
else
say "Moved #{count} messages from #{queue_name_source} to #{queue_name_target}", :green
end
end
end
end
end
55 changes: 46 additions & 9 deletions bin/shoryuken
Original file line number Diff line number Diff line change
@@ -1,12 +1,49 @@
#!/usr/bin/env ruby

require_relative '../lib/shoryuken/cli'

begin
Shoryuken::CLI.instance.run(ARGV)
rescue => e
raise e if $DEBUG
STDERR.puts e.message
STDERR.puts e.backtrace.join("\n")
exit 1
require 'rubygems'

require 'thor'
require 'aws-sdk-core'
require_relative 'cli/base'
require_relative 'cli/sqs'
require_relative '../lib/shoryuken/runner'

module Shoryuken
module CLI
class Runner < Base
default_task :start

register(Shoryuken::CLI::SQS, 'sqs', 'sqs COMMAND', 'SQS commands')

desc 'start', 'Start shoryuken'
method_option :concurrency, aliases: '-c', type: :numeric, desc: 'Processor threads to use'
method_option :daemon, aliases: '-d', type: :boolean, desc: 'Daemonize process'
method_option :queues, aliases: '-q', type: :array, desc: 'Queues to process with optional weights'
method_option :require, aliases: '-r', type: :string, desc: 'Dir or path of the workers'
method_option :config, aliases: '-C', type: :string, desc: 'Path to config file'
method_option :rails, aliases: '-R', type: :boolean, desc: 'Load Rails'
method_option :logfile, aliases: '-L', type: :string, desc: 'Path to logfile'
method_option :pidfile, aliases: '-P', type: :string, desc: 'Path to pidfile'
method_option :verbose, aliases: '-v', type: :boolean, desc: 'Print more verbose output'
def start
opts = options.to_h.symbolize_keys

# Keep compatibility with old CLI queue format
opts[:queues] = opts.queues.to_a.map { |q| q.split(',') }

if opts.daemon && opts.logfile.nil?
fail_task "You should set a logfile if you're going to daemonize"
end

Shoryuken::Runner.instance.run(opts.freeze)
end

desc 'version', 'Print version'
def version
say "Shoryuken #{Shoryuken::VERSION}"
end
end
end
end

Shoryuken::CLI::Runner.start
7 changes: 3 additions & 4 deletions lib/shoryuken/environment_loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,11 @@ def load_rails
end

def merge_cli_defined_queues
cli_defined_queues = options.delete(:queues) || []
cli_defined_queues = options[:queues].to_a

cli_defined_queues.each do |cli_defined_queue|
Shoryuken.options[:queues].delete_if do |config_file_queue|
config_file_queue[0] == cli_defined_queue[0]
end
# CLI defined queues override config_file defined queues
Shoryuken.options[:queues].delete_if { |config_file_queue| config_file_queue[0] == cli_defined_queue[0] }

Shoryuken.options[:queues] << cli_defined_queue
end
Expand Down
Loading