diff --git a/CHANGELOG.md b/CHANGELOG.md index f7ab5069c1a..75aa5e8e225 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ * [CHANGE] `trace_id` field in log files has been renamed to `traceID`. #2518 * [CHANGE] Slow query log has a different output now. Previously used `url` field has been replaced with `host` and `path`, and query parameters are logged as individual log fields with `qs_` prefix. #2520 * [CHANGE] Experimental WAL: WAL and checkpoint compression is now disabled. #2436 +* [CHANGE] Query Frontend now uses Round Robin to choose a tenant queue to service next. #2553 * [CHANGE] Update in dependency `go-kit/kit` from `v0.9.0` to `v0.10.0`. HTML escaping disabled in JSON Logger. #2535 * [FEATURE] Ruler: The `-ruler.evaluation-delay` flag was added to allow users to configure a default evaluation delay for all rules in cortex. The default value is 0 which is the current behavior. #2423 * [FEATURE] Experimental: Added a new object storage client for OpenStack Swift. #2440 diff --git a/pkg/querier/frontend/frontend.go b/pkg/querier/frontend/frontend.go index 19e323c743b..012d1fdd38f 100644 --- a/pkg/querier/frontend/frontend.go +++ b/pkg/querier/frontend/frontend.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "io/ioutil" - "math/rand" "net/http" "net/url" "path" @@ -64,7 +63,7 @@ type Frontend struct { mtx sync.Mutex cond *sync.Cond - queues map[string]chan *request + queues *queueIterator // Metrics. queueDuration prometheus.Histogram @@ -86,7 +85,7 @@ func New(cfg Config, log log.Logger, registerer prometheus.Registerer) (*Fronten f := &Frontend{ cfg: cfg, log: log, - queues: map[string]chan *request{}, + queues: newQueueIterator(cfg.MaxOutstandingPerTenant), queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ Namespace: "cortex", Name: "query_frontend_queue_duration_seconds", @@ -146,7 +145,7 @@ func (f RoundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { func (f *Frontend) Close() { f.mtx.Lock() defer f.mtx.Unlock() - for len(f.queues) > 0 { + for f.queues.len() > 0 { f.cond.Wait() } } @@ -348,11 +347,7 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error { f.mtx.Lock() defer f.mtx.Unlock() - queue, ok := f.queues[userID] - if !ok { - queue = make(chan *request, f.cfg.MaxOutstandingPerTenant) - f.queues[userID] = queue - } + queue := f.queues.getOrAddQueue(userID) select { case queue <- req: @@ -371,7 +366,7 @@ func (f *Frontend) getNextRequest(ctx context.Context) (*request, error) { defer f.mtx.Unlock() FindQueue: - for len(f.queues) == 0 && ctx.Err() == nil { + for f.queues.len() == 0 && ctx.Err() == nil { f.cond.Wait() } @@ -379,16 +374,10 @@ FindQueue: return nil, err } - keys := make([]string, 0, len(f.queues)) - for k := range f.queues { - keys = append(keys, k) - } - rand.Shuffle(len(keys), func(i, j int) { keys[i], keys[j] = keys[j], keys[i] }) - - for _, userID := range keys { - queue, ok := f.queues[userID] - if !ok { - continue + for { + queue, userID := f.queues.getNextQueue() + if queue == nil { + break } /* We want to dequeue the next unexpired request from the chosen tenant queue. @@ -407,7 +396,7 @@ FindQueue: lastRequest := false request := <-queue if len(queue) == 0 { - delete(f.queues, userID) + f.queues.deleteQueue(userID) lastRequest = true } diff --git a/pkg/querier/frontend/frontend_queues.go b/pkg/querier/frontend/frontend_queues.go new file mode 100644 index 00000000000..4383ebfa251 --- /dev/null +++ b/pkg/querier/frontend/frontend_queues.go @@ -0,0 +1,90 @@ +package frontend + +import ( + "container/list" +) + +type queueRecord struct { + ch chan *request + userID string +} + +// queueIterator provides round robin access to a collection of chan *request. It is used to +// iterate fairly over the frontend per tenant request queues. It uses a combination of a +// linked list and map to provide O(1) complexity on the getNextQueue(), deleteQueue(), and +// getOrAddQueue() operations. +type queueIterator struct { + l *list.List + next *list.Element + userLookup map[string]*list.Element + + maxQueueSize int +} + +func newQueueIterator(maxQueueSize int) *queueIterator { + return &queueIterator{ + l: list.New(), + next: nil, + userLookup: make(map[string]*list.Element), + maxQueueSize: maxQueueSize, + } +} + +func (q *queueIterator) len() int { + return len(q.userLookup) +} + +func (q *queueIterator) getNextQueue() (chan *request, string) { + if q.next == nil { + q.next = q.l.Front() + } + + if q.next == nil { + return nil, "" + } + + var next *list.Element + next, q.next = q.next, q.next.Next() + + qr := next.Value.(queueRecord) + + return qr.ch, qr.userID +} + +func (q *queueIterator) deleteQueue(userID string) { + element := q.userLookup[userID] + + // remove from linked list + if element != nil { + if element == q.next { + q.next = element.Next() // if we're deleting the current item just move to the next one + } + + q.l.Remove(element) + } + + // remove from map + delete(q.userLookup, userID) +} + +func (q *queueIterator) getOrAddQueue(userID string) chan *request { + element := q.userLookup[userID] + + if element == nil { + qr := queueRecord{ + ch: make(chan *request, q.maxQueueSize), + userID: userID, + } + + // add the element right before the current linked list item for fifo + if q.next == nil { + element = q.l.PushBack(qr) + } else { + element = q.l.InsertBefore(qr, q.next) + } + + q.userLookup[userID] = element + } + + return element.Value.(queueRecord).ch +} diff --git a/pkg/querier/frontend/frontend_queues_test.go b/pkg/querier/frontend/frontend_queues_test.go new file mode 100644 index 00000000000..4a831da384a --- /dev/null +++ b/pkg/querier/frontend/frontend_queues_test.go @@ -0,0 +1,141 @@ +package frontend + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestFrontendQueues(t *testing.T) { + m := newQueueIterator(0) + assert.NotNil(t, m) + assert.NoError(t, isConsistent(m)) + q, _ := m.getNextQueue() + assert.Nil(t, q) + + // add queues + qOne := getOrAddQueue(t, m, "one") + + confirmOrder(t, m, qOne, qOne) + + qTwo := getOrAddQueue(t, m, "two") + assert.NotEqual(t, qOne, qTwo) + + confirmOrder(t, m, qOne, qTwo, qOne) + + // confirm fifo by adding a third queue and iterating to it + qThree := getOrAddQueue(t, m, "three") + + confirmOrder(t, m, qTwo, qOne, qThree) + + // remove one and round robin the others + m.deleteQueue("one") + assert.NoError(t, isConsistent(m)) + + confirmOrder(t, m, qTwo, qThree, qTwo) + + qFour := getOrAddQueue(t, m, "four") + + confirmOrder(t, m, qThree, qTwo, qFour, qThree) + + // remove current and confirm round robin continues + m.deleteQueue("two") + assert.NoError(t, isConsistent(m)) + + confirmOrder(t, m, qFour, qThree, qFour) + + m.deleteQueue("three") + assert.NoError(t, isConsistent(m)) + + m.deleteQueue("four") + assert.NoError(t, isConsistent(m)) + + q, _ = m.getNextQueue() + assert.Nil(t, q) +} + +func TestFrontendQueuesConsistency(t *testing.T) { + m := newQueueIterator(0) + assert.NotNil(t, m) + assert.NoError(t, isConsistent(m)) + q, _ := m.getNextQueue() + assert.Nil(t, q) + + r := rand.New(rand.NewSource(time.Now().Unix())) + + for i := 0; i < 1000; i++ { + switch r.Int() % 3 { + case 0: + assert.NotNil(t, m.getOrAddQueue(generateTenant(r))) + case 1: + m.getNextQueue() + case 2: + m.deleteQueue(generateTenant(r)) + } + + assert.NoErrorf(t, isConsistent(m), "last action %d", i) + } +} + +func generateTenant(r *rand.Rand) string { + return fmt.Sprint("tenant-", r.Int()%5) +} + +func getOrAddQueue(t *testing.T, m *queueIterator, tenant string) chan *request { + q := m.getOrAddQueue(tenant) + assert.NotNil(t, q) + assert.NoError(t, isConsistent(m)) + assert.Equal(t, q, m.getOrAddQueue(tenant)) + + return q +} + +func confirmOrder(t *testing.T, m *queueIterator, qs ...chan *request) { + for _, q := range qs { + qNext, _ := m.getNextQueue() + assert.Equal(t, q, qNext) + assert.NoError(t, isConsistent(m)) + } +} + +// isConsistent() returns true if every userID in the map is also in the linked list and vice versa. +// This is horribly inefficient. Use for testing only. +func isConsistent(q *queueIterator) error { + // let's confirm that every element in the map is in the list and all values are request queues + for k, v := range q.userLookup { + found := false + + for e := q.l.Front(); e != nil; e = e.Next() { + qr, ok := e.Value.(queueRecord) + if !ok { + return fmt.Errorf("element value is not a queueRecord %v", e.Value) + } + + if e == v { + if qr.userID != k { + return fmt.Errorf("mismatched userID between map %s and linked list %s", k, qr.userID) + } + + found = true + break + } + } + + if !found { + return fmt.Errorf("userID %s not found in list", k) + } + } + + // now check the length to make sure there's not extra list items somehow + listLen := q.l.Len() + mapLen := len(q.userLookup) + + if listLen != mapLen { + return fmt.Errorf("Length mismatch list:%d map:%d", listLen, mapLen) + } + + return nil +} diff --git a/pkg/querier/frontend/queue_test.go b/pkg/querier/frontend/queue_test.go index abf9af1cce5..b85a101ef39 100644 --- a/pkg/querier/frontend/queue_test.go +++ b/pkg/querier/frontend/queue_test.go @@ -2,6 +2,7 @@ package frontend import ( "context" + "fmt" "strconv" "testing" @@ -61,13 +62,13 @@ func TestDequeuesExpiredRequests(t *testing.T) { req, err := f.getNextRequest(ctx) require.Nil(t, err) require.NotNil(t, req) - require.Equal(t, 9, len(f.queues[userID])) + require.Equal(t, 9, len(f.queues.getOrAddQueue(userID))) // the next unexpired request should be the 5th index req, err = f.getNextRequest(ctx) require.Nil(t, err) require.NotNil(t, req) - require.Equal(t, 4, len(f.queues[userID])) + require.Equal(t, 4, len(f.queues.getOrAddQueue(userID))) // add one request to a second tenant queue ctx2 := user.InjectOrgID(context.Background(), userID2) @@ -80,16 +81,47 @@ func TestDequeuesExpiredRequests(t *testing.T) { require.NotNil(t, req) // ensure either one or two queues are fully drained, depending on which was requested first - _, ok := f.queues[userID] + _, ok := f.queues.userLookup[userID] if ok { // if the second user's queue was chosen for the last request, // the first queue should still contain 4 (expired) requests. - require.Equal(t, 4, len(f.queues[userID])) + require.Equal(t, 4, len(f.queues.getOrAddQueue(userID))) } - _, ok = f.queues[userID2] + _, ok = f.queues.userLookup[userID2] require.Equal(t, false, ok) } +func TestRoundRobinQueues(t *testing.T) { + var config Config + flagext.DefaultValues(&config) + config.MaxOutstandingPerTenant = 100 + + f, err := setupFrontend(config) + require.NoError(t, err) + + for i := 0; i < 100; i++ { + userID := fmt.Sprint(i / 10) + ctx := user.InjectOrgID(context.Background(), userID) + + err = f.queueRequest(ctx, testReq(ctx)) + require.NoError(t, err) + } + + ctx := context.Background() + for i := 0; i < 100; i++ { + req, err := f.getNextRequest(ctx) + require.NoError(t, err) + require.NotNil(t, req) + + userID, err := user.ExtractOrgID(req.originalCtx) + require.NoError(t, err) + intUserID, err := strconv.Atoi(userID) + require.NoError(t, err) + + require.Equal(t, i%10, intUserID) + } +} + func BenchmarkGetNextRequest(b *testing.B) { var config Config flagext.DefaultValues(&config) @@ -131,3 +163,44 @@ func BenchmarkGetNextRequest(b *testing.B) { } } } + +func BenchmarkQueueRequest(b *testing.B) { + var config Config + flagext.DefaultValues(&config) + config.MaxOutstandingPerTenant = 2 + + const numTenants = 50 + + frontends := make([]*Frontend, 0, b.N) + contexts := make([]context.Context, 0, numTenants) + requests := make([]*request, 0, numTenants) + + for n := 0; n < b.N; n++ { + f, err := setupFrontend(config) + if err != nil { + b.Fatal(err) + } + frontends = append(frontends, f) + + for j := 0; j < numTenants; j++ { + userID := strconv.Itoa(j) + ctx := user.InjectOrgID(context.Background(), userID) + r := testReq(ctx) + + requests = append(requests, r) + contexts = append(contexts, ctx) + } + } + + b.ResetTimer() + for n := 0; n < b.N; n++ { + for i := 0; i < config.MaxOutstandingPerTenant; i++ { + for j := 0; j < numTenants; j++ { + err := frontends[n].queueRequest(contexts[j], requests[j]) + if err != nil { + b.Fatal(err) + } + } + } + } +}