Skip to content

Commit 9970bce

Browse files
committed
Create user request queue test
Signed-off-by: Justin Jung <[email protected]>
1 parent 2b5273a commit 9970bce

File tree

7 files changed

+97
-62
lines changed

7 files changed

+97
-62
lines changed

pkg/scheduler/queue/queue.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,12 @@ FindQueue:
156156

157157
// Pick next request from the queue.
158158
for {
159-
request := queue.dequeueRequest()
159+
request := queue.dequeueRequest(q.queues.getMinPriority(userID, querierID))
160+
if request == nil {
161+
// the queue does not contain request with the min priority, break to wait for more requests
162+
break
163+
}
164+
160165
if queue.length() == 0 {
161166
q.queues.deleteQueue(userID)
162167
}

pkg/scheduler/queue/user_queues.go

Lines changed: 14 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,12 @@ type queues struct {
5959
}
6060

6161
type userQueue struct {
62-
queue requestQueue
62+
queue userRequestQueue
6363

6464
// If not nil, only these queriers can handle user requests. If nil, all queriers can.
6565
// We set this to nil if number of available queriers <= maxQueriers.
66-
queriers map[string]struct{}
67-
reservedQueriers map[string]int64
68-
maxQueriers int
66+
queriers map[string]struct{}
67+
maxQueriers int
6968

7069
// Seed for shuffle sharding of queriers. This seed is based on userID only and is therefore consistent
7170
// between different frontends.
@@ -111,7 +110,7 @@ func (q *queues) deleteQueue(userID string) {
111110
// MaxQueriers is used to compute which queriers should handle requests for this user.
112111
// If maxQueriers is <= 0, all queriers can handle this user's requests.
113112
// If maxQueriers has changed since the last call, queriers for this are recomputed.
114-
func (q *queues) getOrAddQueue(userID string, maxQueriers int) requestQueue {
113+
func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue {
115114
// Empty user is not allowed, as that would break our users list ("" is used for free spot).
116115
if userID == "" {
117116
return nil
@@ -139,7 +138,7 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) requestQueue {
139138
if queryPriority.Enabled {
140139
uq.queue = NewPriorityRequestQueue(util.NewPriorityQueue(nil))
141140
} else {
142-
uq.queue = NewFIFOQueue(make(chan Request, queueSize))
141+
uq.queue = NewFIFORequestQueue(make(chan Request, queueSize))
143142
}
144143

145144
q.userQueues[userID] = uq
@@ -163,7 +162,6 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) requestQueue {
163162
if uq.maxQueriers != maxQueriers {
164163
uq.maxQueriers = maxQueriers
165164
uq.queriers = shuffleQueriersForUser(uq.seed, maxQueriers, q.sortedQueriers, nil)
166-
uq.reservedQueriers = getReservedQueriers(uq.queriers, q.limits.QueryPriority(userID).Priorities)
167165
}
168166

169167
return uq.queue
@@ -172,7 +170,7 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) requestQueue {
172170
// Finds next queue for the querier. To support fair scheduling between users, client is expected
173171
// to pass last user index returned by this function as argument. Is there was no previous
174172
// last user index, use -1.
175-
func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (requestQueue, string, int) {
173+
func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (userRequestQueue, string, int) {
176174
uid := lastUserIndex
177175

178176
for iters := 0; iters < len(q.users); iters++ {
@@ -198,7 +196,7 @@ func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (re
198196
}
199197
}
200198

201-
//TODO: justinjung04, reserved queriers
199+
// TODO: justinjung04, reserved queriers
202200
//if priority, isReserved := uq.reservedQueriers[querierID]; isReserved {
203201
// return uq.queues[priority], u, uid
204202
//}
@@ -208,6 +206,13 @@ func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (re
208206
return nil, "", uid
209207
}
210208

209+
func (q *queues) getMinPriority(userID string, querierID string) int64 {
210+
// TODO: justinjung04 reserved querier
211+
// check list of queriers and QueryPriority config
212+
// from QueryPriority config, establish map of
213+
return 0
214+
}
215+
211216
func (q *queues) addQuerierConnection(querierID string) {
212217
info := q.queriers[querierID]
213218
if info != nil {
@@ -320,16 +325,6 @@ func (q *queues) recomputeUserQueriers() {
320325
// In that case *all* queriers should be used.
321326
// Scratchpad is used for shuffling, to avoid new allocations. If nil, new slice is allocated.
322327
func shuffleQueriersForUser(userSeed int64, queriersToSelect int, allSortedQueriers []string, scratchpad []string) map[string]struct{} {
323-
//numOfReservedQueriers := getNumOfReservedQueriers(queriersToSelect, len(allSortedQueriers), numOfReservedQueriersFloat)
324-
//reservedQueriers := make(map[string]struct{}, numOfReservedQueriers)
325-
326-
//if queriersToSelect == 0 || len(allSortedQueriers) <= queriersToSelect {
327-
// for i := 0; i < numOfReservedQueriers; i++ {
328-
// reservedQueriers[allSortedQueriers[i]] = struct{}{}
329-
// }
330-
// return nil, reservedQueriers
331-
//}
332-
333328
queriers := make(map[string]struct{}, queriersToSelect)
334329
rnd := rand.New(rand.NewSource(userSeed))
335330

@@ -340,41 +335,13 @@ func shuffleQueriersForUser(userSeed int64, queriersToSelect int, allSortedQueri
340335
for i := 0; i < queriersToSelect; i++ {
341336
r := rnd.Intn(last + 1)
342337
queriers[scratchpad[r]] = struct{}{}
343-
//if i < numOfReservedQueriers {
344-
// reservedQueriers[scratchpad[r]] = struct{}{}
345-
//}
346-
// move selected item to the end, it won't be selected anymore.
347338
scratchpad[r], scratchpad[last] = scratchpad[last], scratchpad[r]
348339
last--
349340
}
350341

351342
return queriers
352343
}
353344

354-
func getReservedQueriers(queriers map[string]struct{}, priorities []validation.PriorityDef) map[string]int64 {
355-
// TODO: justinjung04
356-
return map[string]int64{}
357-
}
358-
359-
// TODO: justinjung04
360-
//func getNumOfReservedQueriers(queriersToSelect int, totalNumOfQueriers int, reservedQueriers float64) int {
361-
// numOfReservedQueriers := int(reservedQueriers)
362-
//
363-
// if reservedQueriers < 1 && reservedQueriers > 0 {
364-
// if queriersToSelect == 0 || queriersToSelect > totalNumOfQueriers {
365-
// queriersToSelect = totalNumOfQueriers
366-
// }
367-
//
368-
// numOfReservedQueriers = int(math.Ceil(float64(queriersToSelect) * reservedQueriers))
369-
// }
370-
//
371-
// if numOfReservedQueriers > totalNumOfQueriers {
372-
// return totalNumOfQueriers
373-
// }
374-
//
375-
// return numOfReservedQueriers
376-
//}
377-
378345
// MockLimits implements the Limits interface. Used in tests only.
379346
type MockLimits struct {
380347
MaxOutstanding int

pkg/scheduler/queue/user_queues_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -360,16 +360,16 @@ func generateQuerier(r *rand.Rand) string {
360360
return fmt.Sprint("querier-", r.Int()%5)
361361
}
362362

363-
func getOrAdd(t *testing.T, uq *queues, tenant string, maxQueriers int) requestQueue {
363+
func getOrAdd(t *testing.T, uq *queues, tenant string, maxQueriers int) userRequestQueue {
364364
q := uq.getOrAddQueue(tenant, maxQueriers)
365365
assert.NotNil(t, q)
366366
assert.NoError(t, isConsistent(uq))
367367
assert.Equal(t, q, uq.getOrAddQueue(tenant, maxQueriers))
368368
return q
369369
}
370370

371-
func confirmOrderForQuerier(t *testing.T, uq *queues, querier string, lastUserIndex int, qs ...requestQueue) int {
372-
var n requestQueue
371+
func confirmOrderForQuerier(t *testing.T, uq *queues, querier string, lastUserIndex int, qs ...userRequestQueue) int {
372+
var n userRequestQueue
373373
for _, q := range qs {
374374
n, _, lastUserIndex = uq.getNextQueueForQuerier(lastUserIndex, querier)
375375
assert.Equal(t, q, n)

pkg/scheduler/queue/request_queue.go renamed to pkg/scheduler/queue/user_request_queue.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ package queue
22

33
import "github.com/cortexproject/cortex/pkg/util"
44

5-
type requestQueue interface {
5+
type userRequestQueue interface {
66
enqueueRequest(Request)
7-
dequeueRequest() Request
7+
dequeueRequest(minPriority int64) Request
88
length() int
99
closeQueue()
1010
}
@@ -13,15 +13,15 @@ type FIFORequestQueue struct {
1313
queue chan Request
1414
}
1515

16-
func NewFIFOQueue(queue chan Request) *FIFORequestQueue {
16+
func NewFIFORequestQueue(queue chan Request) *FIFORequestQueue {
1717
return &FIFORequestQueue{queue: queue}
1818
}
1919

2020
func (f *FIFORequestQueue) enqueueRequest(r Request) {
2121
f.queue <- r
2222
}
2323

24-
func (f *FIFORequestQueue) dequeueRequest() Request {
24+
func (f *FIFORequestQueue) dequeueRequest(_ int64) Request {
2525
return <-f.queue
2626
}
2727

@@ -45,7 +45,10 @@ func (f *PriorityRequestQueue) enqueueRequest(r Request) {
4545
f.queue.Enqueue(r)
4646
}
4747

48-
func (f *PriorityRequestQueue) dequeueRequest() Request {
48+
func (f *PriorityRequestQueue) dequeueRequest(minPriority int64) Request {
49+
if f.queue.Peek().Priority() < minPriority {
50+
return nil
51+
}
4952
return f.queue.Dequeue()
5053
}
5154

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package queue
2+
3+
import (
4+
"github.com/cortexproject/cortex/pkg/util"
5+
"github.com/stretchr/testify/assert"
6+
"testing"
7+
)
8+
9+
func TestFIFORequestQueue(t *testing.T) {
10+
queue := NewFIFORequestQueue(make(chan Request, 2))
11+
request1 := MockRequest{
12+
id: "request 1",
13+
priority: 1,
14+
}
15+
request2 := MockRequest{
16+
id: "request 2",
17+
priority: 2,
18+
}
19+
20+
queue.enqueueRequest(request1)
21+
queue.enqueueRequest(request2)
22+
assert.Equal(t, 2, queue.length())
23+
assert.Equal(t, request1, queue.dequeueRequest(0))
24+
assert.Equal(t, 1, queue.length())
25+
assert.Equal(t, request2, queue.dequeueRequest(0))
26+
assert.Equal(t, 0, queue.length())
27+
queue.closeQueue()
28+
assert.Panics(t, func() { queue.enqueueRequest(request1) })
29+
}
30+
31+
func TestPriorityRequestQueue(t *testing.T) {
32+
queue := NewPriorityRequestQueue(util.NewPriorityQueue(nil))
33+
request1 := MockRequest{
34+
id: "request 1",
35+
priority: 1,
36+
}
37+
request2 := MockRequest{
38+
id: "request 2",
39+
priority: 2,
40+
}
41+
42+
queue.enqueueRequest(request1)
43+
queue.enqueueRequest(request2)
44+
assert.Equal(t, 2, queue.length())
45+
assert.Equal(t, request2, queue.dequeueRequest(0))
46+
assert.Equal(t, 1, queue.length())
47+
assert.Equal(t, request1, queue.dequeueRequest(0))
48+
assert.Equal(t, 0, queue.length())
49+
50+
queue.enqueueRequest(request1)
51+
queue.enqueueRequest(request2)
52+
assert.Equal(t, 2, queue.length())
53+
assert.Equal(t, request2, queue.dequeueRequest(2))
54+
55+
queue.closeQueue()
56+
assert.Panics(t, func() { queue.enqueueRequest(request1) })
57+
}

pkg/util/priority_queue.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (pq *PriorityQueue) Enqueue(op PriorityOp) {
9393
}
9494
}
9595

96-
// Dequeue will return the op with the highest priority; block if queue is
96+
// Dequeue will remove and return the op with the highest priority; block if queue is
9797
// empty; returns nil if queue is closed.
9898
func (pq *PriorityQueue) Dequeue() PriorityOp {
9999
pq.lock.Lock()
@@ -114,3 +114,9 @@ func (pq *PriorityQueue) Dequeue() PriorityOp {
114114
}
115115
return op
116116
}
117+
118+
// Peek will return the op with the highest priority without removing it from the queue
119+
func (pq *PriorityQueue) Peek() PriorityOp {
120+
op := pq.queue[0]
121+
return op
122+
}

pkg/util/priority_queue_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package util
22

33
import (
44
"runtime"
5-
"strconv"
65
"testing"
76
"time"
87

@@ -15,10 +14,6 @@ func (i simpleItem) Priority() int64 {
1514
return int64(i)
1615
}
1716

18-
func (i simpleItem) Key() string {
19-
return strconv.FormatInt(int64(i), 10)
20-
}
21-
2217
func TestPriorityQueueBasic(t *testing.T) {
2318
queue := NewPriorityQueue(nil)
2419
assert.Equal(t, 0, queue.Length(), "Expected length = 0")
@@ -39,6 +34,7 @@ func TestPriorityQueuePriorities(t *testing.T) {
3934
queue.Enqueue(simpleItem(1))
4035
queue.Enqueue(simpleItem(2))
4136

37+
assert.Equal(t, simpleItem(2), queue.Peek().(simpleItem), "Expected to peek simpleItem(2)")
4238
assert.Equal(t, simpleItem(2), queue.Dequeue().(simpleItem), "Expected to dequeue simpleItem(2)")
4339
assert.Equal(t, simpleItem(1), queue.Dequeue().(simpleItem), "Expected to dequeue simpleItem(1)")
4440

@@ -51,6 +47,7 @@ func TestPriorityQueuePriorities2(t *testing.T) {
5147
queue.Enqueue(simpleItem(2))
5248
queue.Enqueue(simpleItem(1))
5349

50+
assert.Equal(t, simpleItem(2), queue.Peek().(simpleItem), "Expected to peek simpleItem(2)")
5451
assert.Equal(t, simpleItem(2), queue.Dequeue().(simpleItem), "Expected to dequeue simpleItem(2)")
5552
assert.Equal(t, simpleItem(1), queue.Dequeue().(simpleItem), "Expected to dequeue simpleItem(1)")
5653

0 commit comments

Comments
 (0)