Skip to content

Commit

Permalink
Add sqs mv command
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Cantero committed Mar 11, 2017
1 parent dc00742 commit 54d9cfc
Showing 1 changed file with 42 additions and 13 deletions.
55 changes: 42 additions & 13 deletions bin/cli/sqs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,33 @@ def find_queue_url(queue_name_prefix)
fail_task "There's more than one queue starting with #{queue_name_prefix}: #{urls.join(', ')}"
end

urls.first
url = urls.first

fail_task "Queue #{queue_name_prefix} not found" unless url

url
end

def batch_delete(queue_url, messages)
def batch_delete(url, messages)
messages.to_a.flatten.each_slice(10) do |batch|
sqs.delete_message_batch(
queue_url: queue_url,
queue_url: url,
entries: batch.map { |message| { id: message.message_id, receipt_handle: message.receipt_handle } }
).failed.any? do |failure|
say "Could not delete #{failure.id}, code: #{failure.code}", :yellow
end
end
end

def find_all(queue_url, limit, &block)
def batch_send(url, messages)
messages.to_a.flatten.map(&method(:normalize_dump_message)).each_slice(10) do |batch|
sqs.send_message_batch(queue_url: url, entries: batch).failed.any? do |failure|
say "Could not requeue #{failure.id}, code: #{failure.code}", :yellow
end
end
end

def find_all(url, limit, &block)
count = 0
batch_size = limit > 10 ? 10 : limit

Expand All @@ -50,7 +62,7 @@ def find_all(queue_url, limit, &block)
batch_size = n if n < batch_size

messages = sqs.receive_message(
queue_url: queue_url,
queue_url: url,
max_number_of_messages: batch_size,
message_attribute_names: ['All']
).messages
Expand Down Expand Up @@ -102,7 +114,7 @@ def dump(queue_name)

url = find_queue_url(queue_name)

delete_batch = []
messages = []

file = nil

Expand All @@ -111,10 +123,10 @@ def dump(queue_name)

file.puts(JSON.dump(m.to_h))

delete_batch << messages if options[:delete]
messages << m if options[:delete]
end

batch_delete(url, delete_batch) if options[:delete]
batch_delete(url, messages) if options[:delete]

if count.zero?
say "Queue #{queue_name} is empty", :yellow
Expand All @@ -131,12 +143,29 @@ def requeue(queue_name, path)

messages = File.readlines(path).map { |line| JSON.parse(line, symbolize_names: true) }

url = find_queue_url(queue_name)
batch_send(find_queue_url(queue_name), messages)

messages.map(&method(:normalize_dump_message)).each_slice(10) do |batch|
sqs.send_message_batch(queue_url: url, entries: batch).failed.any? do |failure|
say "Could not requeue #{failure.id}, code: #{failure.code}", :yellow
end
say "Requeued #{messages.size} messages from #{path} to #{queue_name}", :green
end

desc 'mv QUEUE-NAME-SOURCE QUEUE-NAME-TARGET', 'Move messages from one queue (source) to another (target)'
method_option :number, aliases: '-n', type: :numeric, default: Float::INFINITY, desc: 'number of messages to move'
method_option :delete, aliases: '-d', type: :boolean, default: true, desc: 'delete from the queue'
def mv(queue_name_source, queue_name_target)
url_source = find_queue_url(queue_name_source)
messages = []

count = find_all(url_source, options[:number]) do |m|
messages << m
end

batch_send(find_queue_url(queue_name_target), messages.map(&:to_h))
batch_delete(url_source, messages) if options[:delete]

if count.zero?
say "Queue #{queue_name_source} is empty", :yellow
else
say "Moved #{count} messages from #{queue_name_source} to #{queue_name_target}", :green
end
end
end
Expand Down

0 comments on commit 54d9cfc

Please sign in to comment.