Skip to content

Commit

Permalink
Merge pull request #330 from phstc/cleanup
Browse files Browse the repository at this point in the history
Add thor (cli) and sqs commands
  • Loading branch information
phstc committed Mar 11, 2017
2 parents ce5a304 + a7f2e2f commit 60905be
Show file tree
Hide file tree
Showing 9 changed files with 319 additions and 119 deletions.
28 changes: 9 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,26 +139,16 @@ You can read about these in more detail [here](http://docs.aws.amazon.com/sdkfor
bundle exec shoryuken -r worker.rb -C shoryuken.yml
```

Other options:

```bash
shoryuken --help
shoryuken [options]
-c, --concurrency INT Processor threads to use
-d, --daemon Daemonize process
-q, --queue QUEUE[,WEIGHT]... Queues to process with optional weights
-r, --require [PATH|DIR] Location of the worker
-C, --config PATH Path to YAML config file
-R, --rails Attempts to load the containing Rails project
-L, --logfile PATH Path to writable logfile
-P, --pidfile PATH Path to pidfile
-v, --verbose Print more verbose output
-V, --version Print version and exit
-h, --help Show help
...
```
For other options check `bundle exec shoryuken help start`

#### SQS commands

Check also some available SQS commands `bundle exec shoryuken help sqs`, such as:

- `ls` list queues
- `mv` move messages from one queue to another
- `dump` dump messages from a queue into a JSON lines file
- `requeue` requeue messages from a dump file

## More Information

Expand Down
42 changes: 42 additions & 0 deletions bin/cli/base.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# rubocop:disable Metrics/BlockLength
module Shoryuken
module CLI
class Base < Thor
no_commands do
def print_table(entries)
column_sizes = print_columns_size(entries)

entries.map do |entry|
puts entry.map.with_index { |e, i| print_format_column(e, column_sizes[i]) }.join
end
end

def print_columns_size(entries)
column_sizes = Hash.new(0)

entries.each do |entry|
entry.each_with_index do |e, i|
e = e.to_s
column_sizes[i] = e.size if column_sizes[i] < e.size
end
end

column_sizes
end

def print_format_column(column, size)
size = 40 if size > 40
size_with_padding = size + 4
column = column.to_s.ljust(size_with_padding)
column = "#{column[0...size - 2]}.." if column.size > size_with_padding
column
end

def fail_task(msg, quit = true)
say "[FAIL] #{msg}", :red
exit(1) if quit
end
end
end
end
end
186 changes: 186 additions & 0 deletions bin/cli/sqs.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
require 'date'

# rubocop:disable Metrics/AbcSize, Metrics/BlockLength
module Shoryuken
module CLI
class SQS < Base
namespace :sqs

no_commands do
def normalize_dump_message(message)
message[:id] = message.delete(:message_id)
message[:message_body] = message.delete(:body)
message.delete(:receipt_handle)
message.delete(:md5_of_body)
message.delete(:md5_of_message_attributes)
message
end

def sqs
@_sqs ||= Aws::SQS::Client.new
end

def find_queue_url(queue_name_prefix)
urls = sqs.list_queues(queue_name_prefix: queue_name_prefix).queue_urls

if urls.size > 1
fail_task "There's more than one queue starting with #{queue_name_prefix}: #{urls.join(', ')}"
end

url = urls.first

fail_task "Queue #{queue_name_prefix} not found" unless url

url
end

def batch_delete(url, messages)
messages.to_a.flatten.each_slice(10) do |batch|
sqs.delete_message_batch(
queue_url: url,
entries: batch.map { |message| { id: message.message_id, receipt_handle: message.receipt_handle } }
).failed.any? do |failure|
say "Could not delete #{failure.id}, code: #{failure.code}", :yellow
end
end
end

def batch_send(url, messages)
messages.to_a.flatten.map(&method(:normalize_dump_message)).each_slice(10) do |batch|
sqs.send_message_batch(queue_url: url, entries: batch).failed.any? do |failure|
say "Could not requeue #{failure.id}, code: #{failure.code}", :yellow
end
end
end

def find_all(url, limit, &block)
count = 0
batch_size = limit > 10 ? 10 : limit

loop do
n = limit - count
batch_size = n if n < batch_size

messages = sqs.receive_message(
queue_url: url,
max_number_of_messages: batch_size,
message_attribute_names: ['All']
).messages

messages.each { |m| yield m }

count += messages.size

break if count >= limit
break if messages.empty?
end

count
end

def list_and_print_queues(urls)
attrs = %w(QueueArn ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible LastModifiedTimestamp)

entries = urls.map { |u| sqs.get_queue_attributes(queue_url: u, attribute_names: attrs).attributes }.map do |q|
[
q['QueueArn'].split(':').last,
q['ApproximateNumberOfMessages'],
q['ApproximateNumberOfMessagesNotVisible'],
Time.at(q['LastModifiedTimestamp'].to_i)
]
end

entries.unshift(['Queue', 'Messages Available', 'Messages Inflight', 'Last Modified'])

print_table(entries)
end

def dump_file(path, queue_name)
File.join(path, "#{queue_name}-#{Date.today}.jsonl")
end
end

desc 'ls [QUEUE-NAME-PREFIX]', 'List queues'
method_option :watch, aliases: '-w', type: :boolean, desc: 'watch queues'
method_option :watch_interval, type: :numeric, default: 10, desc: 'watch interval'
def ls(queue_name_prefix = '')
urls = sqs.list_queues(queue_name_prefix: queue_name_prefix).queue_urls

loop do
list_and_print_queues(urls)

break unless options.watch

sleep options.watch_interval
puts
end
end

desc 'dump QUEUE-NAME', 'Dump messages from a queue into a JSON lines file'
method_option :number, aliases: '-n', type: :numeric, default: Float::INFINITY, desc: 'number of messages to dump'
method_option :path, aliases: '-p', type: :string, default: './', desc: 'path to save the dump file'
method_option :delete, aliases: '-d', type: :boolean, default: true, desc: 'delete from the queue'
def dump(queue_name)
path = dump_file(options.path, queue_name)

fail_task "File #{path} already exists" if File.exist?(path)

url = find_queue_url(queue_name)

messages = []

file = nil

count = find_all(url, options.number) do |m|
file ||= File.open(path, 'w')

file.puts(JSON.dump(m.to_h))

messages << m if options.delete
end

batch_delete(url, messages) if options.delete

if count.zero?
say "Queue #{queue_name} is empty", :yellow
else
say "Dump saved in #{path} with #{count} messages", :green
end
ensure
file.close if file
end

desc 'requeue QUEUE-NAME PATH', 'Requeue messages from a dump file'
def requeue(queue_name, path)
fail_task "Path #{path} not found" unless File.exist?(path)

messages = File.readlines(path).map { |line| JSON.parse(line, symbolize_names: true) }

batch_send(find_queue_url(queue_name), messages)

say "Requeued #{messages.size} messages from #{path} to #{queue_name}", :green
end

desc 'mv QUEUE-NAME-SOURCE QUEUE-NAME-TARGET', 'Move messages from one queue (source) to another (target)'
method_option :number, aliases: '-n', type: :numeric, default: Float::INFINITY, desc: 'number of messages to move'
method_option :delete, aliases: '-d', type: :boolean, default: true, desc: 'delete from the queue'
def mv(queue_name_source, queue_name_target)
url_source = find_queue_url(queue_name_source)
messages = []

count = find_all(url_source, options.number) do |m|
messages << m
end

batch_send(find_queue_url(queue_name_target), messages.map(&:to_h))
batch_delete(url_source, messages) if options.delete

if count.zero?
say "Queue #{queue_name_source} is empty", :yellow
else
say "Moved #{count} messages from #{queue_name_source} to #{queue_name_target}", :green
end
end
end
end
end
55 changes: 46 additions & 9 deletions bin/shoryuken
Original file line number Diff line number Diff line change
@@ -1,12 +1,49 @@
#!/usr/bin/env ruby

require_relative '../lib/shoryuken/cli'

begin
Shoryuken::CLI.instance.run(ARGV)
rescue => e
raise e if $DEBUG
STDERR.puts e.message
STDERR.puts e.backtrace.join("\n")
exit 1
require 'rubygems'

require 'thor'
require 'aws-sdk-core'
require_relative 'cli/base'
require_relative 'cli/sqs'
require_relative '../lib/shoryuken/runner'

module Shoryuken
module CLI
class Runner < Base
default_task :start

register(Shoryuken::CLI::SQS, 'sqs', 'sqs COMMAND', 'SQS commands')

desc 'start', 'Start shoryuken'
method_option :concurrency, aliases: '-c', type: :numeric, desc: 'Processor threads to use'
method_option :daemon, aliases: '-d', type: :boolean, desc: 'Daemonize process'
method_option :queues, aliases: '-q', type: :array, desc: 'Queues to process with optional weights'
method_option :require, aliases: '-r', type: :string, desc: 'Dir or path of the workers'
method_option :config, aliases: '-C', type: :string, desc: 'Path to config file'
method_option :rails, aliases: '-R', type: :boolean, desc: 'Load Rails'
method_option :logfile, aliases: '-L', type: :string, desc: 'Path to logfile'
method_option :pidfile, aliases: '-P', type: :string, desc: 'Path to pidfile'
method_option :verbose, aliases: '-v', type: :boolean, desc: 'Print more verbose output'
def start
opts = options.to_h.symbolize_keys

# Keep compatibility with old CLI queue format
opts[:queues] = opts.queues.to_a.map { |q| q.split(',') }

if opts.daemon && opts.logfile.nil?
fail_task "You should set a logfile if you're going to daemonize"
end

Shoryuken::Runner.instance.run(opts.freeze)
end

desc 'version', 'Print version'
def version
say "Shoryuken #{Shoryuken::VERSION}"
end
end
end
end

Shoryuken::CLI::Runner.start
7 changes: 3 additions & 4 deletions lib/shoryuken/environment_loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,11 @@ def load_rails
end

def merge_cli_defined_queues
cli_defined_queues = options.delete(:queues) || []
cli_defined_queues = options[:queues].to_a

cli_defined_queues.each do |cli_defined_queue|
Shoryuken.options[:queues].delete_if do |config_file_queue|
config_file_queue[0] == cli_defined_queue[0]
end
# CLI defined queues override config_file defined queues
Shoryuken.options[:queues].delete_if { |config_file_queue| config_file_queue[0] == cli_defined_queue[0] }

Shoryuken.options[:queues] << cli_defined_queue
end
Expand Down
Loading

0 comments on commit 60905be

Please sign in to comment.