-
Notifications
You must be signed in to change notification settings - Fork 839
Update Query Frontend to use Round Robin for Choosing the Next Queue #2553
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 19 commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
cb13615
Basic queue manager thing
joe-elliott ba38cbb
Finished basic implementation and test
joe-elliott dddee30
Added tests
joe-elliott 3b01702
Created Add queue
joe-elliott 51f3dd1
Added confirmOrder
joe-elliott 8db1da0
Extended order tests
joe-elliott b4043dd
changed to userID to match local code
joe-elliott 06af085
return user id with queue
joe-elliott b280fb7
tests passing
joe-elliott 7d4dffc
Added a test for round robin
joe-elliott bc5ddc1
Removed invalid comment
joe-elliott 07074f2
Moved isConsistent into tests
joe-elliott ca881fb
Improved internal naming
joe-elliott 3b87ab3
Added bench for queueRequest
joe-elliott 7d31ffc
lint + changelog
joe-elliott 9fdacc0
Made TestFrontendQueuesConsistency() actually random
joe-elliott b490f7e
renaming for clarity
joe-elliott 3578ae7
Rename
joe-elliott a01459d
Merge branch 'master' into round-robin
joe-elliott 88c2889
Improved struct name and added comment
joe-elliott File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| package frontend | ||
|
|
||
| import ( | ||
| "container/list" | ||
| ) | ||
|
|
||
| type queueRecord struct { | ||
| ch chan *request | ||
| userID string | ||
| } | ||
|
|
||
| type queueManager struct { | ||
| l *list.List | ||
| next *list.Element | ||
| userLookup map[string]*list.Element | ||
|
|
||
| maxQueueSize int | ||
| } | ||
|
|
||
| func newQueueManager(maxQueueSize int) *queueManager { | ||
| return &queueManager{ | ||
| l: list.New(), | ||
| next: nil, | ||
| userLookup: make(map[string]*list.Element), | ||
| maxQueueSize: maxQueueSize, | ||
| } | ||
| } | ||
|
|
||
| func (q *queueManager) len() int { | ||
| return len(q.userLookup) | ||
| } | ||
|
|
||
| func (q *queueManager) getNextQueue() (chan *request, string) { | ||
| if q.next == nil { | ||
| q.next = q.l.Front() | ||
| } | ||
|
|
||
| if q.next == nil { | ||
| return nil, "" | ||
| } | ||
|
|
||
| var next *list.Element | ||
| next, q.next = q.next, q.next.Next() | ||
|
|
||
| qr := next.Value.(queueRecord) | ||
|
|
||
| return qr.ch, qr.userID | ||
| } | ||
|
|
||
| func (q *queueManager) deleteQueue(userID string) { | ||
| element := q.userLookup[userID] | ||
|
|
||
| // remove from linked list | ||
| if element != nil { | ||
| if element == q.next { | ||
| q.next = element.Next() // if we're deleting the current item just move to the next one | ||
| } | ||
|
|
||
| q.l.Remove(element) | ||
| } | ||
|
|
||
| // remove from map | ||
| delete(q.userLookup, userID) | ||
| } | ||
|
|
||
| func (q *queueManager) getOrAddQueue(userID string) chan *request { | ||
| element := q.userLookup[userID] | ||
|
|
||
| if element == nil { | ||
| qr := queueRecord{ | ||
| ch: make(chan *request, q.maxQueueSize), | ||
| userID: userID, | ||
| } | ||
|
|
||
| // add the element right before the current linked list item for fifo | ||
| if q.next == nil { | ||
| element = q.l.PushBack(qr) | ||
| } else { | ||
| element = q.l.InsertBefore(qr, q.next) | ||
| } | ||
|
|
||
| q.userLookup[userID] = element | ||
| } | ||
|
|
||
| return element.Value.(queueRecord).ch | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,141 @@ | ||
| package frontend | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "math/rand" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| ) | ||
|
|
||
| func TestFrontendQueues(t *testing.T) { | ||
| m := newQueueManager(0) | ||
| assert.NotNil(t, m) | ||
| assert.NoError(t, isConsistent(m)) | ||
| q, _ := m.getNextQueue() | ||
| assert.Nil(t, q) | ||
|
|
||
| // add queues | ||
| qOne := getOrAddQueue(t, m, "one") | ||
|
|
||
| confirmOrder(t, m, qOne, qOne) | ||
|
|
||
| qTwo := getOrAddQueue(t, m, "two") | ||
| assert.NotEqual(t, qOne, qTwo) | ||
|
|
||
| confirmOrder(t, m, qOne, qTwo, qOne) | ||
pstibrany marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // confirm fifo by adding a third queue and iterating to it | ||
| qThree := getOrAddQueue(t, m, "three") | ||
|
|
||
| confirmOrder(t, m, qTwo, qOne, qThree) | ||
|
|
||
| // remove one and round robin the others | ||
| m.deleteQueue("one") | ||
| assert.NoError(t, isConsistent(m)) | ||
|
|
||
| confirmOrder(t, m, qTwo, qThree, qTwo) | ||
|
|
||
| qFour := getOrAddQueue(t, m, "four") | ||
|
|
||
| confirmOrder(t, m, qThree, qTwo, qFour, qThree) | ||
|
|
||
| // remove current and confirm round robin continues | ||
| m.deleteQueue("two") | ||
| assert.NoError(t, isConsistent(m)) | ||
|
|
||
| confirmOrder(t, m, qFour, qThree, qFour) | ||
|
|
||
| m.deleteQueue("three") | ||
| assert.NoError(t, isConsistent(m)) | ||
|
|
||
| m.deleteQueue("four") | ||
| assert.NoError(t, isConsistent(m)) | ||
|
|
||
| q, _ = m.getNextQueue() | ||
| assert.Nil(t, q) | ||
| } | ||
|
|
||
| func TestFrontendQueuesConsistency(t *testing.T) { | ||
| m := newQueueManager(0) | ||
| assert.NotNil(t, m) | ||
| assert.NoError(t, isConsistent(m)) | ||
| q, _ := m.getNextQueue() | ||
| assert.Nil(t, q) | ||
|
|
||
| r := rand.New(rand.NewSource(time.Now().Unix())) | ||
|
|
||
| for i := 0; i < 1000; i++ { | ||
| switch r.Int() % 3 { | ||
| case 0: | ||
| assert.NotNil(t, m.getOrAddQueue(generateTenant(r))) | ||
| case 1: | ||
| m.getNextQueue() | ||
| case 2: | ||
| m.deleteQueue(generateTenant(r)) | ||
| } | ||
|
|
||
| assert.NoErrorf(t, isConsistent(m), "last action %d", i) | ||
| } | ||
| } | ||
|
|
||
| func generateTenant(r *rand.Rand) string { | ||
| return fmt.Sprint("tenant-", r.Int()%5) | ||
| } | ||
|
|
||
| func getOrAddQueue(t *testing.T, m *queueManager, tenant string) chan *request { | ||
| q := m.getOrAddQueue(tenant) | ||
| assert.NotNil(t, q) | ||
| assert.NoError(t, isConsistent(m)) | ||
| assert.Equal(t, q, m.getOrAddQueue(tenant)) | ||
|
|
||
| return q | ||
| } | ||
|
|
||
| func confirmOrder(t *testing.T, m *queueManager, qs ...chan *request) { | ||
| for _, q := range qs { | ||
| qNext, _ := m.getNextQueue() | ||
| assert.Equal(t, q, qNext) | ||
| assert.NoError(t, isConsistent(m)) | ||
| } | ||
| } | ||
|
|
||
| // isConsistent() returns true if every userID in the map is also in the linked list and vice versa. | ||
| // This is horribly inefficient. Use for testing only. | ||
| func isConsistent(q *queueManager) error { | ||
| // let's confirm that every element in the map is in the list and all values are request queues | ||
| for k, v := range q.userLookup { | ||
| found := false | ||
|
|
||
| for e := q.l.Front(); e != nil; e = e.Next() { | ||
| qr, ok := e.Value.(queueRecord) | ||
| if !ok { | ||
| return fmt.Errorf("element value is not a queueRecord %v", e.Value) | ||
| } | ||
|
|
||
| if e == v { | ||
| if qr.userID != k { | ||
| return fmt.Errorf("mismatched userID between map %s and linked list %s", k, qr.userID) | ||
| } | ||
|
|
||
| found = true | ||
| break | ||
| } | ||
| } | ||
|
|
||
| if !found { | ||
| return fmt.Errorf("userID %s not found in list", k) | ||
| } | ||
| } | ||
|
|
||
| // now check the length to make sure there's not extra list items somehow | ||
| listLen := q.l.Len() | ||
| mapLen := len(q.userLookup) | ||
|
|
||
| if listLen != mapLen { | ||
| return fmt.Errorf("Length mismatch list:%d map:%d", listLen, mapLen) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Short comment about this struct would be nice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a comment and went with
queueIterator. Let me know if you'd like any other changes.Thanks for the review!