Skip to content

Commit 03c59ad

Browse files
authored
refactor code (#4200)
1 parent 4c97fa3 commit 03c59ad

File tree

9 files changed

+252
-200
lines changed

9 files changed

+252
-200
lines changed

router/throttler/adaptive.go

+26-24
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,34 @@ import (
66
"time"
77

88
"github.com/rudderlabs/rudder-go-kit/config"
9+
"github.com/rudderlabs/rudder-server/utils/misc"
910
)
1011

1112
type adaptiveThrottleConfig struct {
12-
enabled bool
13-
limit int64
14-
window time.Duration
15-
minLimit *config.Reloadable[int64]
16-
maxLimit *config.Reloadable[int64]
13+
window misc.ValueLoader[time.Duration]
14+
minLimit misc.ValueLoader[int64]
15+
maxLimit misc.ValueLoader[int64]
1716
}
1817

1918
func (c *adaptiveThrottleConfig) readThrottlingConfig(config *config.Config, destName, destID string) {
20-
c.window = config.GetDurationVar(1, time.Second, fmt.Sprintf(`Router.throttler.adaptive.%s.timeWindow`, destID), fmt.Sprintf(`Router.throttler.adaptive.%s.timeWindow`, destName), `Router.throttler.adaptive.timeWindow`, fmt.Sprintf(`Router.throttler.%s.%s.timeWindow`, destName, destID), fmt.Sprintf(`Router.throttler.%s.timeWindow`, destName))
21-
c.minLimit = config.GetReloadableInt64Var(1, 1, fmt.Sprintf(`Router.throttler.adaptive.%s.minLimit`, destID), fmt.Sprintf(`Router.throttler.adaptive.%s.minLimit`, destName), `Router.throttler.adaptive.minLimit`, fmt.Sprintf(`Router.throttler.%s.%s.limit`, destName, destID), fmt.Sprintf(`Router.throttler.%s.limit`, destName))
22-
c.maxLimit = config.GetReloadableInt64Var(250, 1, fmt.Sprintf(`Router.throttler.adaptive.%s.maxLimit`, destID), fmt.Sprintf(`Router.throttler.adaptive.%s.maxLimit`, destName), `Router.throttler.adaptive.maxLimit`, fmt.Sprintf(`Router.throttler.%s.%s.limit`, destName, destID), fmt.Sprintf(`Router.throttler.%s.limit`, destName))
23-
if c.window > 0 {
24-
c.enabled = true
25-
}
19+
c.window = config.GetReloadableDurationVar(0, time.Second,
20+
fmt.Sprintf(`Router.throttler.%s.%s.timeWindow`, destName, destID),
21+
fmt.Sprintf(`Router.throttler.%s.timeWindow`, destName),
22+
`Router.throttler.adaptive.timeWindow`)
23+
c.minLimit = config.GetReloadableInt64Var(1, 1,
24+
fmt.Sprintf(`Router.throttler.adaptive.%s.%s.minLimit`, destName, destID),
25+
fmt.Sprintf(`Router.throttler.adaptive.%s.minLimit`, destName),
26+
`Router.throttler.adaptive.minLimit`)
27+
c.maxLimit = config.GetReloadableInt64Var(0, 1,
28+
fmt.Sprintf(`Router.throttler.adaptive.%s.%s.maxLimit`, destName, destID),
29+
fmt.Sprintf(`Router.throttler.adaptive.%s.maxLimit`, destName),
30+
fmt.Sprintf(`Router.throttler.%s.%s.limit`, destName, destID),
31+
fmt.Sprintf(`Router.throttler.%s.limit`, destName),
32+
`Router.throttler.adaptive.maxLimit`)
33+
}
34+
35+
func (c *adaptiveThrottleConfig) enabled() bool {
36+
return c.minLimit.Load() > 0 && c.maxLimit.Load() > 0 && c.window.Load() > 0 && c.minLimit.Load() <= c.maxLimit.Load()
2637
}
2738

2839
type adaptiveThrottler struct {
@@ -33,13 +44,10 @@ type adaptiveThrottler struct {
3344

3445
// CheckLimitReached returns true if we're not allowed to process the number of events we asked for with cost.
3546
func (t *adaptiveThrottler) CheckLimitReached(ctx context.Context, key string, cost int64) (limited bool, retErr error) {
36-
if !t.config.enabled {
47+
if !t.config.enabled() {
3748
return false, nil
3849
}
39-
if t.config.minLimit.Load() > t.config.maxLimit.Load() {
40-
return false, fmt.Errorf("minLimit %d is greater than maxLimit %d", t.config.minLimit.Load(), t.config.maxLimit.Load())
41-
}
42-
allowed, _, err := t.limiter.Allow(ctx, cost, t.getLimit(), getWindowInSecs(t.config.window), key)
50+
allowed, _, err := t.limiter.Allow(ctx, cost, t.getLimit(), getWindowInSecs(t.config.window.Load()), key)
4351
if err != nil {
4452
return false, fmt.Errorf("could not limit: %w", err)
4553
}
@@ -58,12 +66,6 @@ func (t *adaptiveThrottler) Shutdown() {
5866
}
5967

6068
func (t *adaptiveThrottler) getLimit() int64 {
61-
limit := t.config.limit
62-
if limit == 0 {
63-
limit = t.config.maxLimit.Load()
64-
}
65-
limit = int64(float64(limit) * t.algorithm.LimitFactor())
66-
limit = max(t.config.minLimit.Load(), min(limit, t.config.maxLimit.Load()))
67-
t.config.limit = limit
68-
return t.config.limit
69+
limit := int64(float64(t.config.maxLimit.Load()) * t.algorithm.LimitFactor())
70+
return max(t.config.minLimit.Load(), limit)
6971
}

router/throttler/adaptivethrottlercounter/algorithm.go

+52-89
Original file line numberDiff line numberDiff line change
@@ -2,131 +2,94 @@ package adaptivethrottlercounter
22

33
import (
44
"context"
5-
"net/http"
65
"sync"
76
"time"
87

98
"github.com/rudderlabs/rudder-go-kit/config"
9+
"github.com/rudderlabs/rudder-server/utils/misc"
1010
)
1111

12-
type timer struct {
13-
frequency time.Duration // frequency at which the timer runs and resets the tooManyRequestsCount and totalRequestsCount
14-
delay time.Duration
15-
mu sync.Mutex
16-
limitSet bool // set when the limit is set by the timer
17-
tooManyRequestsCount int64
18-
totalRequestsCount int64
19-
}
20-
2112
type Adaptive struct {
22-
shortTimer *timer
23-
longTimer *timer
24-
decreaseRatePercentage *config.Reloadable[int64]
25-
increaseRatePercentage *config.Reloadable[int64]
26-
cancel context.CancelFunc
27-
wg *sync.WaitGroup
13+
limitFactor *limitFactor
14+
increaseLimitCounter *increaseLimitCounter
15+
decreaseLimitCounter *decreaseLimitCounter
16+
cancel context.CancelFunc
17+
wg *sync.WaitGroup
2818
}
2919

30-
func New(config *config.Config, destWindow time.Duration) *Adaptive {
31-
increaseRateEvaluationFrequency := config.GetInt64("Router.throttler.adaptive.increaseRateEvaluationFrequency", 2)
32-
decreaseRateDelay := config.GetInt64("Router.throttler.adaptive.decreaseRateDelay", 1)
20+
func New(config *config.Config, window misc.ValueLoader[time.Duration]) *Adaptive {
21+
lf := &limitFactor{value: 1}
3322

34-
shortTimerDelay := time.Duration(decreaseRateDelay) * destWindow
35-
shortTimer := &timer{
36-
frequency: shortTimerDelay + destWindow,
37-
delay: shortTimerDelay,
23+
increaseWindowMultiplier := config.GetReloadableIntVar(2, 1, "Router.throttler.adaptive.increaseWindowMultiplier")
24+
increaseCounterWindow := func() time.Duration { return window.Load() * time.Duration(increaseWindowMultiplier.Load()) }
25+
increasePercentage := config.GetReloadableInt64Var(10, 1, "Router.throttler.adaptive.increasePercentage")
26+
ilc := &increaseLimitCounter{
27+
limitFactor: lf,
28+
window: increaseCounterWindow,
29+
increasePercentage: increasePercentage,
3830
}
39-
longTimer := &timer{
40-
frequency: time.Duration(increaseRateEvaluationFrequency) * destWindow,
31+
32+
decreaseWaitWindowMultiplier := config.GetReloadableIntVar(1, 1, "Router.throttler.adaptive.decreaseWaitWindowMultiplier")
33+
decreaseWaitWindow := func() time.Duration { return window.Load() * time.Duration(decreaseWaitWindowMultiplier.Load()) }
34+
decreasePercentage := config.GetReloadableInt64Var(30, 1, "Router.throttler.adaptive.decreasePercentage")
35+
dlc := &decreaseLimitCounter{
36+
limitFactor: lf,
37+
window: func() time.Duration { return window.Load() },
38+
waitWindow: decreaseWaitWindow,
39+
decreasePercentage: decreasePercentage,
40+
throttleTolerancePercentage: func() int64 { return min(increasePercentage.Load(), decreasePercentage.Load()) },
4141
}
4242

4343
var wg sync.WaitGroup
44-
4544
ctx, cancel := context.WithCancel(context.Background())
46-
go shortTimer.run(&wg, ctx)
4745
wg.Add(1)
48-
go longTimer.run(&wg, ctx)
46+
go ilc.run(ctx, &wg)
4947
wg.Add(1)
48+
go dlc.run(ctx, &wg)
5049

5150
return &Adaptive{
52-
shortTimer: shortTimer,
53-
longTimer: longTimer,
54-
decreaseRatePercentage: config.GetReloadableInt64Var(30, 1, "Router.throttler.adaptive.decreaseRatePercentage"),
55-
increaseRatePercentage: config.GetReloadableInt64Var(10, 1, "Router.throttler.adaptive.increaseRatePercentage"),
56-
cancel: cancel,
57-
wg: &wg,
51+
increaseLimitCounter: ilc,
52+
decreaseLimitCounter: dlc,
53+
limitFactor: lf,
54+
cancel: cancel,
55+
wg: &wg,
5856
}
5957
}
6058

6159
func (a *Adaptive) LimitFactor() float64 {
62-
resolution := min(a.decreaseRatePercentage.Load(), a.increaseRatePercentage.Load())
63-
if a.shortTimer.getLimitReached() && a.shortTimer.SetLimit(true) && a.shortTimer.tooManyRequestsCount*100 >= a.shortTimer.totalRequestsCount*resolution { // if the number of 429s in the last 1 second is greater than the resolution
64-
return 1 - float64(a.decreaseRatePercentage.Load())/100
65-
} else if !a.longTimer.getLimitReached() && a.longTimer.SetLimit(true) {
66-
return 1 + float64(a.increaseRatePercentage.Load())/100
67-
}
68-
return 1.0
60+
return a.limitFactor.Get()
6961
}
7062

7163
func (a *Adaptive) ResponseCodeReceived(code int) {
72-
a.shortTimer.updateLimitReached(code)
73-
a.longTimer.updateLimitReached(code)
64+
a.increaseLimitCounter.ResponseCodeReceived(code)
65+
a.decreaseLimitCounter.ResponseCodeReceived(code)
7466
}
7567

7668
func (a *Adaptive) Shutdown() {
7769
a.cancel()
7870
a.wg.Wait()
7971
}
8072

81-
func (t *timer) run(wg *sync.WaitGroup, ctx context.Context) {
82-
defer wg.Done()
83-
for {
84-
select {
85-
case <-ctx.Done():
86-
return
87-
case <-time.After(t.frequency):
88-
t.resetLimitReached()
89-
if t.delay > 0 {
90-
select {
91-
case <-ctx.Done():
92-
return
93-
case <-time.After(t.delay):
94-
t.resetLimitReached()
95-
}
96-
}
97-
}
98-
}
73+
type limitFactor struct {
74+
mu sync.RWMutex
75+
value float64
9976
}
10077

101-
func (t *timer) updateLimitReached(code int) {
102-
t.mu.Lock()
103-
defer t.mu.Unlock()
104-
if code == http.StatusTooManyRequests {
105-
t.tooManyRequestsCount++
78+
// Add adds value to the current value of the limit factor, and clamps it between 0 and 1
79+
func (l *limitFactor) Add(value float64) {
80+
l.mu.Lock()
81+
defer l.mu.Unlock()
82+
l.value += value
83+
if l.value < 0 {
84+
l.value = 0
85+
}
86+
if l.value > 1 {
87+
l.value = 1
10688
}
107-
t.totalRequestsCount++
108-
}
109-
110-
func (t *timer) getLimitReached() bool {
111-
t.mu.Lock()
112-
defer t.mu.Unlock()
113-
return t.tooManyRequestsCount > 0
114-
}
115-
116-
func (t *timer) resetLimitReached() {
117-
t.mu.Lock()
118-
defer t.mu.Unlock()
119-
t.tooManyRequestsCount = 0
120-
t.totalRequestsCount = 0
121-
t.limitSet = false
12289
}
12390

124-
func (t *timer) SetLimit(limitSet bool) bool {
125-
t.mu.Lock()
126-
defer t.mu.Unlock()
127-
if limitSet == t.limitSet {
128-
return false
129-
}
130-
t.limitSet = limitSet
131-
return true
91+
func (l *limitFactor) Get() float64 {
92+
l.mu.RLock()
93+
defer l.mu.RUnlock()
94+
return l.value
13295
}

router/throttler/adaptivethrottlercounter/algorithm_test.go

+25-25
Original file line numberDiff line numberDiff line change
@@ -9,55 +9,55 @@ import (
99
"github.com/stretchr/testify/require"
1010

1111
"github.com/rudderlabs/rudder-go-kit/config"
12+
"github.com/rudderlabs/rudder-server/utils/misc"
1213
)
1314

1415
const float64EqualityThreshold = 1e-9
1516

1617
func TestAdaptiveRateLimit(t *testing.T) {
17-
t.Run("when there is a 429s in the last shortTimer frequency", func(t *testing.T) {
18-
config := config.New()
19-
al := New(config, 1*time.Second)
20-
defer al.Shutdown()
18+
cfg := config.New()
19+
al := New(cfg, misc.SingleValueLoader(500*time.Millisecond))
20+
defer al.Shutdown()
21+
t.Run("when there is a 429s in the last decrease limit counter window", func(t *testing.T) {
2122
al.ResponseCodeReceived(429)
22-
require.True(t, floatCheck(al.LimitFactor(), float64(0.7))) // reduces by 30% since there is an error in the last 1 second
23+
require.Eventually(t, func() bool {
24+
return floatCheck(al.LimitFactor(), float64(0.7))
25+
}, time.Second, 100*time.Millisecond) // reduces by 30% since there is an error in the last 1 second
2326
})
2427

25-
t.Run("when there are no 429s ins the last longTimer frequency", func(t *testing.T) {
26-
config := config.New()
27-
al := New(config, 1*time.Second)
28-
defer al.Shutdown()
29-
require.True(t, floatCheck(al.LimitFactor(), float64(1.1))) // increases by 10% since there is no error in the last 2 seconds
28+
t.Run("when there are no 429s ins the last increase limit counter window", func(t *testing.T) {
29+
require.Eventually(t, func() bool {
30+
return floatCheck(al.LimitFactor(), float64(0.8))
31+
}, 2*time.Second, 100*time.Millisecond) // increases by 10% since there is no error in the last 2 seconds
3032
})
3133

3234
t.Run("429s less than resolution", func(t *testing.T) {
33-
config := config.New()
34-
al := New(config, 1*time.Second)
35-
defer al.Shutdown()
3635
for i := 0; i < 10; i++ {
3736
al.ResponseCodeReceived(200)
3837
}
3938
al.ResponseCodeReceived(429)
40-
require.True(t, floatCheck(al.LimitFactor(), float64(1.0))) // does not change since 429s less than resolution
39+
require.True(t, floatCheck(al.LimitFactor(), float64(0.8))) // does not change since 429s less than resolution
4140
})
4241

4342
t.Run("429s more than resolution", func(t *testing.T) {
44-
config := config.New()
45-
al := New(config, 1*time.Second)
46-
defer al.Shutdown()
4743
for i := 0; i < 5; i++ {
4844
al.ResponseCodeReceived(200)
4945
}
5046
al.ResponseCodeReceived(429)
51-
require.True(t, floatCheck(al.LimitFactor(), float64(0.7))) // does not change since 429s less than resolution
47+
require.Eventually(t, func() bool {
48+
return floatCheck(al.LimitFactor(), float64(0.5))
49+
}, time.Second, 100*time.Millisecond) // does not change since 429s less than resolution
5250
})
5351

5452
t.Run("should delay for few windows before decreasing again", func(t *testing.T) {
55-
config := config.New()
56-
config.Set("Router.throttler.adaptive.decreaseRateDelay", 2)
57-
al := New(config, 1*time.Second)
53+
cfg := config.New()
54+
cfg.Set("Router.throttler.adaptive.decreaseRateDelay", 2)
55+
al := New(cfg, misc.SingleValueLoader(500*time.Millisecond))
5856
defer al.Shutdown()
5957
al.ResponseCodeReceived(429)
60-
require.True(t, floatCheck(al.LimitFactor(), float64(0.7)))
58+
require.Eventually(t, func() bool {
59+
return floatCheck(al.LimitFactor(), float64(0.7))
60+
}, time.Second, 100*time.Millisecond) // reduces by 30% since there is an error in the last 1 second
6161

6262
ctx, cancel := context.WithCancel(context.Background())
6363
defer cancel()
@@ -71,10 +71,10 @@ func TestAdaptiveRateLimit(t *testing.T) {
7171
}
7272
}
7373
}(ctx)
74-
require.True(t, floatCheck(al.LimitFactor(), float64(1.0)))
74+
require.True(t, floatCheck(al.LimitFactor(), float64(0.7))) // does not change since there is a delay
7575
require.Eventually(t, func() bool {
76-
return floatCheck(al.LimitFactor(), float64(0.7))
77-
}, 4*time.Second, 100*time.Millisecond)
76+
return floatCheck(al.LimitFactor(), float64(0.4))
77+
}, 2*time.Second, 100*time.Millisecond)
7878
})
7979
}
8080

0 commit comments

Comments
 (0)