-
-
Notifications
You must be signed in to change notification settings - Fork 280
/
launcher_spec.rb
128 lines (92 loc) · 3.35 KB
/
launcher_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
require 'spec_helper'
require 'shoryuken/manager'
require 'shoryuken/launcher'
require 'securerandom'
RSpec.describe Shoryuken::Launcher do
let(:sqs_client) do
Aws::SQS::Client.new(
region: 'us-east-1',
endpoint: 'http://localhost:5000',
access_key_id: 'fake',
secret_access_key: 'fake'
)
end
let(:executor) do
# We can't use Concurrent.global_io_executor in these tests since once you
# shut down a thread pool, you can't start it back up. Instead, we create
# one new thread pool executor for each spec. We use a new
# CachedThreadPool, since that most closely resembles
# Concurrent.global_io_executor
Concurrent::CachedThreadPool.new auto_terminate: true
end
describe 'Consuming messages' do
before do
Aws.config[:stub_responses] = false
allow(Shoryuken).to receive(:launcher_executor).and_return(executor)
Shoryuken.configure_client do |config|
config.sqs_client = sqs_client
end
Shoryuken.configure_server do |config|
config.sqs_client = sqs_client
end
StandardWorker.received_messages = 0
queue = "shoryuken-travis-#{StandardWorker}-#{SecureRandom.uuid}"
Shoryuken::Client.sqs.create_queue(queue_name: queue)
Shoryuken.add_group('default', 1)
Shoryuken.add_queue(queue, 1, 'default')
StandardWorker.get_shoryuken_options['queue'] = queue
Shoryuken.register_worker(queue, StandardWorker)
end
after do
Aws.config[:stub_responses] = true
queue_url = Shoryuken::Client.sqs.get_queue_url(
queue_name: StandardWorker.get_shoryuken_options['queue']
).queue_url
Shoryuken::Client.sqs.delete_queue(queue_url: queue_url)
end
it 'consumes as a command worker' do
StandardWorker.perform_async('Yo')
poll_queues_until { StandardWorker.received_messages > 0 }
expect(StandardWorker.received_messages).to eq 1
end
it 'consumes a message' do
StandardWorker.get_shoryuken_options['batch'] = false
Shoryuken::Client.queues(StandardWorker.get_shoryuken_options['queue']).send_message(message_body: 'Yo')
poll_queues_until { StandardWorker.received_messages > 0 }
expect(StandardWorker.received_messages).to eq 1
end
it 'consumes a batch' do
StandardWorker.get_shoryuken_options['batch'] = true
entries = 10.times.map { |i| { id: SecureRandom.uuid, message_body: i.to_s } }
Shoryuken::Client.queues(StandardWorker.get_shoryuken_options['queue']).send_messages(entries: entries)
# Give the messages a chance to hit the queue so they are all available at the same time
sleep 2
poll_queues_until { StandardWorker.received_messages > 0 }
expect(StandardWorker.received_messages).to be > 1
end
def poll_queues_until
subject.start
Timeout::timeout(10) do
begin
sleep 0.5
end until yield
end
ensure
subject.stop
end
class StandardWorker
include Shoryuken::Worker
@@received_messages = 0
shoryuken_options auto_delete: true
def perform(sqs_msg, _body)
@@received_messages += Array(sqs_msg).size
end
def self.received_messages
@@received_messages
end
def self.received_messages=(received_messages)
@@received_messages = received_messages
end
end
end
end