Skip to content

Commit b9d66fc

Browse files
committed
Add EDF implementation of weighted round robin.
EDF is strictly better than current random based wrr (balancer/internal/wrr/random.go). It can be used to load balance both between balancer groups (balancer/xds/edsbalancer/balancergroup.go) and within balancer group using weighted round robin balancer implementation (comming in the next pull request for #2827).
1 parent 92635fa commit b9d66fc

File tree

2 files changed

+88
-0
lines changed

2 files changed

+88
-0
lines changed

Diff for: balancer/internal/wrr/edf.go

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
*
3+
* Copyright 2019 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package wrr
19+
20+
import "container/heap"
21+
22+
// edfWrr is a struct for EDF weighted round robin implementation.
23+
type edfWrr struct {
24+
items edfPriorityQueue
25+
}
26+
27+
// NewEdfWrr creates Earliest Deadline First (EDF)
28+
// (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling) implementation for weighted round robin.
29+
// Each pick from the schedule has the earliest deadline entry selected. Entries have deadlines set
30+
// at current time + 1 / weight, providing weighted round robin behavior with O(log n) pick time.
31+
func NewEdfWrr() WRR {
32+
return &edfWrr{}
33+
}
34+
35+
// edfEntry is an internal wrapper for item that also stores weight and relative position in the queue.
36+
type edfEntry struct {
37+
deadline float64
38+
weight int64
39+
item interface{}
40+
}
41+
42+
// edfPriorityQueue is a heap.Interface implementation for edfEntry elements.
43+
type edfPriorityQueue []*edfEntry
44+
45+
func (pq edfPriorityQueue) Len() int { return len(pq) }
46+
func (pq edfPriorityQueue) Less(i, j int) bool { return pq[i].deadline < pq[j].deadline }
47+
func (pq edfPriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }
48+
49+
func (pq *edfPriorityQueue) Push(x interface{}) {
50+
*pq = append(*pq, x.(*edfEntry))
51+
}
52+
53+
func (pq *edfPriorityQueue) Pop() interface{} {
54+
old := *pq
55+
*pq = old[0 : len(old)-1]
56+
return old[len(old)-1]
57+
}
58+
59+
// Current time in EDF scheduler.
60+
func (edf edfWrr) currentTime() float64 {
61+
if len(edf.items) == 0 {
62+
return 0.0
63+
}
64+
return edf.items[0].deadline
65+
}
66+
67+
func (edf *edfWrr) Add(item interface{}, weight int64) {
68+
entry := edfEntry{
69+
deadline: edf.currentTime() + 1.0/float64(weight),
70+
weight: weight,
71+
item: item,
72+
}
73+
heap.Push(&edf.items, &entry)
74+
}
75+
76+
func (edf *edfWrr) Next() interface{} {
77+
if len(edf.items) == 0 {
78+
return nil
79+
}
80+
item := edf.items[0]
81+
item.deadline = edf.currentTime() + 1.0/float64(item.weight)
82+
heap.Fix(&edf.items, 0)
83+
return item.item
84+
}

Diff for: balancer/internal/wrr/wrr_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ func TestRandomWRRNext(t *testing.T) {
9999
testWRRNext(t, NewRandom)
100100
}
101101

102+
func TestEdfWrrNext(t *testing.T) {
103+
testWRRNext(t, NewEdfWrr)
104+
}
105+
102106
func init() {
103107
r := rand.New(rand.NewSource(0))
104108
grpcrandInt63n = r.Int63n

0 commit comments

Comments
 (0)