Skip to content

Commit 8fe17d1

Browse files
committed
Fix queue order when combining multiple prefixes or prefixes and names
We were altering the original order to be exact names and then prefixes in the order returned by the DB, which doesn't need to be the order specified for the worker. This change ensures the order is respected.
1 parent 5b80074 commit 8fe17d1

File tree

2 files changed

+107
-36
lines changed

2 files changed

+107
-36
lines changed

app/models/solid_queue/queue_selector.rb

+35-5
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,20 @@ def queue_names
3434
def eligible_queues
3535
if include_all_queues? then all_queues
3636
else
37-
exact_names + prefixed_names
37+
in_raw_order(exact_names + prefixed_names)
3838
end
3939
end
4040

4141
def include_all_queues?
4242
"*".in? raw_queues
4343
end
4444

45+
def all_queues
46+
relation.distinct(:queue_name).pluck(:queue_name)
47+
end
48+
4549
def exact_names
46-
raw_queues.select { |queue| !queue.include?("*") }
50+
raw_queues.select { |queue| exact_name?(queue) }
4751
end
4852

4953
def prefixed_names
@@ -54,15 +58,41 @@ def prefixed_names
5458
end
5559

5660
def prefixes
57-
@prefixes ||= raw_queues.select { |queue| queue.ends_with?("*") }.map { |queue| queue.tr("*", "%") }
61+
@prefixes ||= raw_queues.select { |queue| prefixed_name?(queue) }.map { |queue| queue.tr("*", "%") }
5862
end
5963

60-
def all_queues
61-
relation.distinct(:queue_name).pluck(:queue_name)
64+
def exact_name?(queue)
65+
!queue.include?("*")
66+
end
67+
68+
def prefixed_name?(queue)
69+
queue.ends_with?("*")
6270
end
6371

6472
def paused_queues
6573
@paused_queues ||= Pause.all.pluck(:queue_name)
6674
end
75+
76+
def in_raw_order(queues)
77+
# Only need to sort if we have prefixes and more than one queue name.
78+
# Exact names are selected in the same order as they're found
79+
if queues.one? || prefixes.empty?
80+
queues
81+
else
82+
queues = queues.dup
83+
raw_queues.flat_map { |raw_queue| delete_in_order(raw_queue, queues) }.compact
84+
end
85+
end
86+
87+
def delete_in_order(raw_queue, queues)
88+
if exact_name?(raw_queue)
89+
queues.delete(raw_queue)
90+
elsif prefixed_name?(raw_queue)
91+
prefix = raw_queue.tr("*", "")
92+
queues.select { |queue| queue.start_with?(prefix) }.tap do |matches|
93+
queues -= matches
94+
end
95+
end
96+
end
6797
end
6898
end

test/models/solid_queue/ready_execution_test.rb

+72-31
Original file line numberDiff line numberDiff line change
@@ -49,28 +49,51 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
4949
end
5050
end
5151

52-
test "queue order and then priority is respected when using a list of queues" do
52+
test "claim jobs using a wildcard" do
5353
AddToBufferJob.perform_later("hey")
54-
job = SolidQueue::Job.last
55-
assert_equal "background", job.queue_name
5654

57-
assert_claimed_jobs(3) do
58-
SolidQueue::ReadyExecution.claim(%w[ background backend ], 3, 42)
55+
assert_claimed_jobs(6) do
56+
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42)
5957
end
58+
end
6059

61-
assert job.reload.claimed?
62-
@jobs.first(2).each do |job|
63-
assert_not job.reload.ready?
64-
assert job.claimed?
60+
test "claim jobs using queue prefixes" do
61+
AddToBufferJob.perform_later("hey")
62+
63+
assert_claimed_jobs(1) do
64+
SolidQueue::ReadyExecution.claim("backgr*", SolidQueue::Job.count + 1, 42)
6565
end
66+
67+
assert @jobs.none?(&:claimed?)
6668
end
6769

68-
test "claim jobs using a wildcard" do
70+
test "claim jobs using a wildcard and having paused queues" do
6971
AddToBufferJob.perform_later("hey")
7072

71-
assert_claimed_jobs(6) do
73+
SolidQueue::Queue.find_by_name("backend").pause
74+
75+
assert_claimed_jobs(1) do
7276
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42)
7377
end
78+
79+
@jobs.each(&:reload)
80+
assert @jobs.none?(&:claimed?)
81+
end
82+
83+
test "claim jobs using both exact names and a prefix" do
84+
AddToBufferJob.perform_later("hey")
85+
86+
assert_claimed_jobs(6) do
87+
SolidQueue::ReadyExecution.claim(%w[ backe* background ], SolidQueue::Job.count + 1, 42)
88+
end
89+
end
90+
91+
test "claim jobs for queue without jobs at the moment using prefixes" do
92+
AddToBufferJob.perform_later("hey")
93+
94+
assert_claimed_jobs(0) do
95+
SolidQueue::ReadyExecution.claim(%w[ none* ], SolidQueue::Job.count + 1, 42)
96+
end
7497
end
7598

7699
test "priority order is used when claiming jobs using a wildcard" do
@@ -88,43 +111,61 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
88111
end
89112
end
90113

91-
test "claim jobs using queue prefixes" do
114+
test "queue order and then priority is respected when using a list of queues" do
92115
AddToBufferJob.perform_later("hey")
116+
job = SolidQueue::Job.last
117+
assert_equal "background", job.queue_name
93118

94-
assert_claimed_jobs(1) do
95-
SolidQueue::ReadyExecution.claim("backgr*", SolidQueue::Job.count + 1, 42)
119+
assert_claimed_jobs(3) do
120+
SolidQueue::ReadyExecution.claim(%w[ background backend ], 3, 42)
96121
end
97122

98-
assert @jobs.none?(&:claimed?)
123+
assert job.reload.claimed?
124+
@jobs.first(2).each do |job|
125+
assert_not job.reload.ready?
126+
assert job.claimed?
127+
end
99128
end
100129

101-
test "claim jobs using a wildcard and having paused queues" do
102-
AddToBufferJob.perform_later("hey")
130+
test "queue order is respected when using prefixes" do
131+
%w[ queue_b1 queue_b2 queue_a2 queue_a1 queue_b1 queue_a2 queue_b2 queue_a1 ].each do |queue_name|
132+
AddToBufferJob.set(queue: queue_name).perform_later(1)
133+
end
103134

104-
SolidQueue::Queue.find_by_name("backend").pause
135+
# Claim 8 jobs
136+
claimed_jobs = []
137+
4.times do
138+
assert_claimed_jobs(2) do
139+
SolidQueue::ReadyExecution.claim(%w[ queue_b* queue_a* ], 2, 42)
140+
end
105141

106-
assert_claimed_jobs(1) do
107-
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42)
142+
claimed_jobs += SolidQueue::ClaimedExecution.order(:id).last(2).map(&:job)
108143
end
109144

110-
@jobs.each(&:reload)
111-
assert @jobs.none?(&:claimed?)
145+
# Check claim order
146+
assert_equal %w[ queue_b1 queue_b1 queue_b2 queue_b2 queue_a1 queue_a1 queue_a2 queue_a2 ],
147+
claimed_jobs.map(&:queue_name)
112148
end
113149

114-
test "claim jobs using both exact names and a prefixes" do
115-
AddToBufferJob.perform_later("hey")
116150

117-
assert_claimed_jobs(6) do
118-
SolidQueue::ReadyExecution.claim(%w[ backe* background ], SolidQueue::Job.count + 1, 42)
151+
test "queue order is respected when mixing exact names with prefixes" do
152+
%w[ queue_b1 queue_b2 queue_a2 queue_c2 queue_a1 queue_c1 queue_b1 queue_a2 queue_b2 queue_a1 ].each do |queue_name|
153+
AddToBufferJob.set(queue: queue_name).perform_later(1)
119154
end
120-
end
121155

122-
test "claim jobs for queue without jobs at the moment using prefixes" do
123-
AddToBufferJob.perform_later("hey")
156+
# Claim 10 jobs
157+
claimed_jobs = []
158+
5.times do
159+
assert_claimed_jobs(2) do
160+
SolidQueue::ReadyExecution.claim(%w[ queue_a2 queue_c1 queue_b* queue_c2 queue_a* ], 2, 42)
161+
end
124162

125-
assert_claimed_jobs(0) do
126-
SolidQueue::ReadyExecution.claim(%w[ none* ], SolidQueue::Job.count + 1, 42)
163+
claimed_jobs += SolidQueue::ClaimedExecution.order(:id).last(2).map(&:job)
127164
end
165+
166+
# Check claim order
167+
assert_equal %w[ queue_a2 queue_a2 queue_c1 queue_b1 queue_b1 queue_b2 queue_b2 queue_c2 queue_a1 queue_a1 ],
168+
claimed_jobs.map(&:queue_name)
128169
end
129170

130171
test "discard all" do

0 commit comments

Comments
 (0)