Skip to content

Commit b8d2675

Browse files
alazarevmenghanl
authored andcommitted
wrr: add EDF implementation of weighted round robin. (#2957)
1 parent a074ab2 commit b8d2675

File tree

2 files changed

+96
-0
lines changed

2 files changed

+96
-0
lines changed

Diff for: internal/wrr/edf.go

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
*
3+
* Copyright 2019 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package wrr
19+
20+
import (
21+
"container/heap"
22+
"sync"
23+
)
24+
25+
// edfWrr is a struct for EDF weighted round robin implementation.
26+
type edfWrr struct {
27+
lock sync.Mutex
28+
items edfPriorityQueue
29+
}
30+
31+
// NewEDF creates Earliest Deadline First (EDF)
32+
// (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling) implementation for weighted round robin.
33+
// Each pick from the schedule has the earliest deadline entry selected. Entries have deadlines set
34+
// at current time + 1 / weight, providing weighted round robin behavior with O(log n) pick time.
35+
func NewEDF() WRR {
36+
return &edfWrr{}
37+
}
38+
39+
// edfEntry is an internal wrapper for item that also stores weight and relative position in the queue.
40+
type edfEntry struct {
41+
deadline float64
42+
weight int64
43+
item interface{}
44+
}
45+
46+
// edfPriorityQueue is a heap.Interface implementation for edfEntry elements.
47+
type edfPriorityQueue []*edfEntry
48+
49+
func (pq edfPriorityQueue) Len() int { return len(pq) }
50+
func (pq edfPriorityQueue) Less(i, j int) bool { return pq[i].deadline < pq[j].deadline }
51+
func (pq edfPriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }
52+
53+
func (pq *edfPriorityQueue) Push(x interface{}) {
54+
*pq = append(*pq, x.(*edfEntry))
55+
}
56+
57+
func (pq *edfPriorityQueue) Pop() interface{} {
58+
old := *pq
59+
*pq = old[0 : len(old)-1]
60+
return old[len(old)-1]
61+
}
62+
63+
// Current time in EDF scheduler.
64+
func (edf edfWrr) currentTime() float64 {
65+
if len(edf.items) == 0 {
66+
return 0.0
67+
}
68+
return edf.items[0].deadline
69+
}
70+
71+
func (edf *edfWrr) Add(item interface{}, weight int64) {
72+
edf.lock.Lock()
73+
defer edf.lock.Unlock()
74+
entry := edfEntry{
75+
deadline: edf.currentTime() + 1.0/float64(weight),
76+
weight: weight,
77+
item: item,
78+
}
79+
heap.Push(&edf.items, &entry)
80+
}
81+
82+
func (edf *edfWrr) Next() interface{} {
83+
edf.lock.Lock()
84+
defer edf.lock.Unlock()
85+
if len(edf.items) == 0 {
86+
return nil
87+
}
88+
item := edf.items[0]
89+
item.deadline = edf.currentTime() + 1.0/float64(item.weight)
90+
heap.Fix(&edf.items, 0)
91+
return item.item
92+
}

Diff for: internal/wrr/wrr_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ func TestRandomWRRNext(t *testing.T) {
9999
testWRRNext(t, NewRandom)
100100
}
101101

102+
func TestEdfWrrNext(t *testing.T) {
103+
testWRRNext(t, NewEDF)
104+
}
105+
102106
func init() {
103107
r := rand.New(rand.NewSource(0))
104108
grpcrandInt63n = r.Int63n

0 commit comments

Comments
 (0)