This repository was archived by the owner on Aug 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 107
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
simplify TimeLimiter, fixing its tests
* previous implementation was a bit hastily implemented and much more complicated than needed: trying to support concurrency but not doing it well, while we don't actually need any of it. * unit testing was very flaky; in particular in circleCI there's lots of timing inaccuracy leading to broken tests * it was inaccurate: for short-lived operations the inaccuracy would be negligible, but for longer operations it is not. While our use-case operations should be short, it is nice to be able to fix this while we're at it. So now we simplify the design a lot: basically we reduce it to a non-concurrent accounting structure, removing all channels, goroutines and context, making the accounting more correct and the testing more robust.
- Loading branch information
Showing
3 changed files
with
103 additions
and
126 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,94 +1,101 @@ | ||
package memory | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/davecgh/go-spew/spew" | ||
) | ||
|
||
// TimeLimiter limits the rate of a set of operations. | ||
// It does this by slowing down further operations as soon | ||
// TimeLimiter limits the rate of a set of serial operations. | ||
// It does this by tracking how much time has been spent (updated via Add()), | ||
// and comparing this to the window size and the limit, slowing down further operations as soon | ||
// as one Add() is called informing it the per-window allowed budget has been exceeded. | ||
// Limitations: | ||
// * concurrently running operations can all exceed the budget, | ||
// so it works best for serial operations. | ||
// * for serial operations, the last operation is allowed to exceed the budget | ||
// * when an operation takes very long (e.g. 10 seconds, with a 100ms limit per second), it | ||
// is counted as exceeding the 100ms budget, but no other provisions are being made. | ||
// * the last operation is allowed to exceed the budget (but the next call will be delayed to compensate) | ||
// * concurrency is not supported | ||
// | ||
// Thus, TimeLimiter is designed for, and works best with, serially running operations, | ||
// each of which takes a fraction of the limit. | ||
// For correctness, you should always follow up an Add() with a Wait() | ||
type TimeLimiter struct { | ||
sync.Mutex | ||
ctx context.Context | ||
since time.Time | ||
next time.Time | ||
timeSpent time.Duration | ||
window time.Duration | ||
limit time.Duration | ||
addCh chan time.Duration | ||
queryCh chan chan struct{} | ||
} | ||
|
||
// NewTimeLimiter creates a new TimeLimiter. A background goroutine will run until the | ||
// provided context is done. When the amount of time spent on task (the time is determined | ||
// by calls to "Add()") every "window" duration is more then "limit", then calls to | ||
// Wait() will block until the start if the next window period. | ||
func NewTimeLimiter(ctx context.Context, window, limit time.Duration) *TimeLimiter { | ||
l := &TimeLimiter{ | ||
ctx: ctx, | ||
window: window, | ||
limit: limit, | ||
addCh: make(chan time.Duration), | ||
queryCh: make(chan chan struct{}), | ||
} | ||
go l.run() | ||
return l | ||
} | ||
|
||
func (l *TimeLimiter) run() { | ||
ticker := time.NewTicker(l.window) | ||
done := l.ctx.Done() | ||
var blockedQueries []chan struct{} | ||
for { | ||
select { | ||
case <-done: | ||
//context done. shutting down | ||
for _, ch := range blockedQueries { | ||
close(ch) | ||
} | ||
return | ||
case <-ticker.C: | ||
l.timeSpent = 0 | ||
for _, ch := range blockedQueries { | ||
close(ch) | ||
} | ||
blockedQueries = blockedQueries[:0] | ||
case d := <-l.addCh: | ||
l.timeSpent += d | ||
case respCh := <-l.queryCh: | ||
if l.timeSpent < l.limit { | ||
close(respCh) | ||
} else { | ||
// rate limit exceeded. On the next tick respCh will be closed | ||
// notifying the caller that they can continue. | ||
blockedQueries = append(blockedQueries, respCh) | ||
} | ||
} | ||
// NewTimeLimiter creates a new TimeLimiter. | ||
func NewTimeLimiter(window, limit time.Duration, now time.Time) *TimeLimiter { | ||
l := TimeLimiter{ | ||
since: now, | ||
next: now.Add(window), | ||
window: window, | ||
limit: limit, | ||
} | ||
spew.Dump(l) | ||
return &l | ||
} | ||
|
||
// Add increments the "time spent" counter by "d" | ||
func (l *TimeLimiter) Add(d time.Duration) { | ||
l.addCh <- d | ||
l.add(time.Now(), d) | ||
} | ||
|
||
// add increments the "time spent" counter by "d" at a given time | ||
func (l *TimeLimiter) add(now time.Time, d time.Duration) { | ||
if now.After(l.next) { | ||
l.timeSpent = d | ||
l.since = now.Add(-d) | ||
l.next = l.since.Add(l.window) | ||
fmt.Println("added and updated") | ||
spew.Dump(l) | ||
return | ||
} | ||
l.timeSpent += d | ||
} | ||
|
||
// Wait returns when we are not rate limited, which may be | ||
// anywhere between immediately or after the window. | ||
// Wait returns when we are not rate limited | ||
// * if we passed the window, we reset everything (this is only safe for callers | ||
// that behave correctly, i.e. that wait the instructed time after each add) | ||
// * if limit is not reached, no sleep is needed | ||
// * if limit has been exceeded, sleep until next period + extra multiple to compensate | ||
// this is perhaps best explained with an example: | ||
// if window is 1s and limit 100ms, but we spent 250ms, then we spent effectively 2.5 seconds worth of work. | ||
// let's say we are 800ms into the 1s window, that means we should sleep 2500-800 = 1.7s | ||
// in order to maximize work while honoring the imposed limit. | ||
// * if limit has been met exactly, sleep until next period (this is a special case of the above) | ||
func (l *TimeLimiter) Wait() { | ||
respCh := make(chan struct{}) | ||
l.queryCh <- respCh | ||
time.Sleep(l.wait(time.Now())) | ||
} | ||
|
||
// wait returns how long should be slept at a given time. See Wait() for more info | ||
func (l *TimeLimiter) wait(now time.Time) time.Duration { | ||
|
||
// if we passed the window, reset and start over | ||
// if clock is adjusted backwards, best we can do is also just reset and start over | ||
if now.After(l.next) || now.Before(l.since) { | ||
l.timeSpent = 0 | ||
l.since = now | ||
l.next = now.Add(l.window) | ||
fmt.Println("wait and update") | ||
spew.Dump(l) | ||
return 0 | ||
} | ||
if l.timeSpent < l.limit { | ||
return 0 | ||
} | ||
|
||
// if we have not exceeded our locking quota then respCh will be | ||
// immediately closed. Otherwise it wont be closed until the next tick (duration of "l.window") | ||
// and we will block until then. | ||
<-respCh | ||
// now <= next | ||
// now >= since | ||
// timespent >= limit | ||
excess := l.timeSpent - l.limit | ||
multiplier := l.window / l.limit | ||
timeToPass := excess * multiplier | ||
timePassed := now.Sub(l.since) | ||
// not sure if this should happen, but let's be safe anyway | ||
if timePassed >= timeToPass { | ||
return 0 | ||
} | ||
fmt.Println("wait and now is", now) | ||
return timeToPass - timePassed | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,71 +1,45 @@ | ||
package memory | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"testing" | ||
"time" | ||
) | ||
|
||
func shouldTakeAbout(t *testing.T, fn func(), expDur time.Duration, sloppynessFactor int, info string) { | ||
// on Dieter's laptop: | ||
// takes about <=15 micros for add/wait sequences | ||
// takes about 150micros for a add + blocking wait | ||
// on circleCI, takes 75micros for add/wait sequence | ||
slop := time.Duration(sloppynessFactor) * time.Microsecond | ||
pre := time.Now() | ||
fn() | ||
dur := time.Since(pre) | ||
if dur > expDur+slop || dur < expDur-slop { | ||
t.Fatalf("scenario %s. was supposed to take %s, but took %s", info, expDur, dur) | ||
var now = time.Unix(10, 0) | ||
|
||
func shouldTake(t *testing.T, tl *TimeLimiter, workDone, expDur time.Duration, info string) { | ||
|
||
// account for work done, as well as moving our clock forward by the same amount | ||
now.Add(workDone) | ||
tl.add(now, workDone) | ||
|
||
fmt.Println("now is now", now) | ||
dur := tl.wait(now) | ||
if dur != expDur { | ||
t.Fatalf("scenario %s. expected wait %s, got wait %s", info, expDur, dur) | ||
} | ||
|
||
// fake the "sleep" so we're a properly behaving caller | ||
now.Add(dur) | ||
} | ||
|
||
func TestTimeLimiter(t *testing.T) { | ||
window := time.Second | ||
limit := 100 * time.Millisecond | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
tl := NewTimeLimiter(ctx, window, limit) | ||
tl := NewTimeLimiter(window, limit, now) | ||
|
||
// TEST 1 : Start first window by doing work and seeing when it starts blocking | ||
shouldTakeAbout(t, tl.Wait, 0, 100, "window 1: work done: 0 - wait should be 0") | ||
|
||
tl.Add(5 * time.Millisecond) | ||
shouldTakeAbout(t, tl.Wait, 0, 100, "window 1: work done: 5ms - wait should be 0") | ||
|
||
tl.Add(10 * time.Millisecond) | ||
shouldTakeAbout(t, tl.Wait, 0, 100, "window 1: work done: 15ms - wait should be 0") | ||
|
||
tl.Add(80 * time.Millisecond) | ||
shouldTakeAbout(t, tl.Wait, 0, 100, "window 1: work done: 95ms - wait should be 0") | ||
|
||
tl.Add(4 * time.Millisecond) | ||
shouldTakeAbout(t, tl.Wait, 0, 100, "window 1: work done: 99ms - wait should be 0") | ||
|
||
tl.Add(3 * time.Millisecond) | ||
|
||
shouldTakeAbout(t, tl.Wait, time.Second, 500, "window 1: work done: 102ms - almost no time has passed, so wait should be full window") | ||
shouldTake(t, tl, 0, 0, "window 1: work done: 0") | ||
shouldTake(t, tl, 5*time.Millisecond, 0, "window 1: work done: 5ms") | ||
shouldTake(t, tl, 10*time.Millisecond, 0, "window 1: work done: 15ms") | ||
shouldTake(t, tl, 80*time.Millisecond, 0, "window 1: work done: 95ms") | ||
shouldTake(t, tl, 4*time.Millisecond, 0, "window 1: work done: 99ms") | ||
shouldTake(t, tl, 3*time.Millisecond, time.Second-102*time.Millisecond, "window 1: work done: 102ms") | ||
|
||
// TEST 2 : Now that we waited until a full window, should be able to up to limit work again | ||
tl.Add(50 * time.Millisecond) | ||
shouldTakeAbout(t, tl.Wait, 0, 100, "window 2: work done: 50ms - wait should be 0") | ||
|
||
tl.Add(40 * time.Millisecond) | ||
shouldTakeAbout(t, tl.Wait, 0, 100, "window 2: work done: 90ms - wait should be 0") | ||
|
||
tl.Add(40 * time.Millisecond) | ||
shouldTakeAbout(t, tl.Wait, time.Second, 500, "window 2: work done: 130ms - wait should be 1s") | ||
|
||
// TEST 3 : Now that we waited until a full window, should be able to up to limit work again | ||
// but this time we cancel, so we don't have to wait as long | ||
tl.Add(50 * time.Millisecond) | ||
shouldTakeAbout(t, tl.Wait, 0, 100, "window 3: work done: 50ms - wait should be 0") | ||
|
||
tl.Add(40 * time.Millisecond) | ||
shouldTakeAbout(t, tl.Wait, 0, 100, "window 3: work done: 90ms - wait should be 0") | ||
|
||
tl.Add(40 * time.Millisecond) | ||
|
||
time.AfterFunc(500*time.Millisecond, cancel) | ||
shouldTakeAbout(t, tl.Wait, 500*time.Millisecond, 500, "window 3: work done: 130ms, canceling after 500ms - wait should be 500ms") | ||
shouldTake(t, tl, 50*time.Millisecond, 0, "window 2: work done: 50ms") | ||
shouldTake(t, tl, 40*time.Millisecond, 0, "window 2: work done: 90ms") | ||
shouldTake(t, tl, 40*time.Millisecond, time.Second-130*time.Millisecond, "window 2: work done: 130ms") | ||
} |