@@ -2,131 +2,94 @@ package adaptivethrottlercounter
2
2
3
3
import (
4
4
"context"
5
- "net/http"
6
5
"sync"
7
6
"time"
8
7
9
8
"github.com/rudderlabs/rudder-go-kit/config"
9
+ "github.com/rudderlabs/rudder-server/utils/misc"
10
10
)
11
11
12
- type timer struct {
13
- frequency time.Duration // frequency at which the timer runs and resets the tooManyRequestsCount and totalRequestsCount
14
- delay time.Duration
15
- mu sync.Mutex
16
- limitSet bool // set when the limit is set by the timer
17
- tooManyRequestsCount int64
18
- totalRequestsCount int64
19
- }
20
-
21
12
type Adaptive struct {
22
- shortTimer * timer
23
- longTimer * timer
24
- decreaseRatePercentage * config.Reloadable [int64 ]
25
- increaseRatePercentage * config.Reloadable [int64 ]
26
- cancel context.CancelFunc
27
- wg * sync.WaitGroup
13
+ limitFactor * limitFactor
14
+ increaseLimitCounter * increaseLimitCounter
15
+ decreaseLimitCounter * decreaseLimitCounter
16
+ cancel context.CancelFunc
17
+ wg * sync.WaitGroup
28
18
}
29
19
30
- func New (config * config.Config , destWindow time.Duration ) * Adaptive {
31
- increaseRateEvaluationFrequency := config .GetInt64 ("Router.throttler.adaptive.increaseRateEvaluationFrequency" , 2 )
32
- decreaseRateDelay := config .GetInt64 ("Router.throttler.adaptive.decreaseRateDelay" , 1 )
20
+ func New (config * config.Config , window misc.ValueLoader [time.Duration ]) * Adaptive {
21
+ lf := & limitFactor {value : 1 }
33
22
34
- shortTimerDelay := time .Duration (decreaseRateDelay ) * destWindow
35
- shortTimer := & timer {
36
- frequency : shortTimerDelay + destWindow ,
37
- delay : shortTimerDelay ,
23
+ increaseWindowMultiplier := config .GetReloadableIntVar (2 , 1 , "Router.throttler.adaptive.increaseWindowMultiplier" )
24
+ increaseCounterWindow := func () time.Duration { return window .Load () * time .Duration (increaseWindowMultiplier .Load ()) }
25
+ increasePercentage := config .GetReloadableInt64Var (10 , 1 , "Router.throttler.adaptive.increasePercentage" )
26
+ ilc := & increaseLimitCounter {
27
+ limitFactor : lf ,
28
+ window : increaseCounterWindow ,
29
+ increasePercentage : increasePercentage ,
38
30
}
39
- longTimer := & timer {
40
- frequency : time .Duration (increaseRateEvaluationFrequency ) * destWindow ,
31
+
32
+ decreaseWaitWindowMultiplier := config .GetReloadableIntVar (1 , 1 , "Router.throttler.adaptive.decreaseWaitWindowMultiplier" )
33
+ decreaseWaitWindow := func () time.Duration { return window .Load () * time .Duration (decreaseWaitWindowMultiplier .Load ()) }
34
+ decreasePercentage := config .GetReloadableInt64Var (30 , 1 , "Router.throttler.adaptive.decreasePercentage" )
35
+ dlc := & decreaseLimitCounter {
36
+ limitFactor : lf ,
37
+ window : func () time.Duration { return window .Load () },
38
+ waitWindow : decreaseWaitWindow ,
39
+ decreasePercentage : decreasePercentage ,
40
+ throttleTolerancePercentage : func () int64 { return min (increasePercentage .Load (), decreasePercentage .Load ()) },
41
41
}
42
42
43
43
var wg sync.WaitGroup
44
-
45
44
ctx , cancel := context .WithCancel (context .Background ())
46
- go shortTimer .run (& wg , ctx )
47
45
wg .Add (1 )
48
- go longTimer .run (& wg , ctx )
46
+ go ilc .run (ctx , & wg )
49
47
wg .Add (1 )
48
+ go dlc .run (ctx , & wg )
50
49
51
50
return & Adaptive {
52
- shortTimer : shortTimer ,
53
- longTimer : longTimer ,
54
- decreaseRatePercentage : config .GetReloadableInt64Var (30 , 1 , "Router.throttler.adaptive.decreaseRatePercentage" ),
55
- increaseRatePercentage : config .GetReloadableInt64Var (10 , 1 , "Router.throttler.adaptive.increaseRatePercentage" ),
56
- cancel : cancel ,
57
- wg : & wg ,
51
+ increaseLimitCounter : ilc ,
52
+ decreaseLimitCounter : dlc ,
53
+ limitFactor : lf ,
54
+ cancel : cancel ,
55
+ wg : & wg ,
58
56
}
59
57
}
60
58
61
59
func (a * Adaptive ) LimitFactor () float64 {
62
- resolution := min (a .decreaseRatePercentage .Load (), a .increaseRatePercentage .Load ())
63
- if a .shortTimer .getLimitReached () && a .shortTimer .SetLimit (true ) && a .shortTimer .tooManyRequestsCount * 100 >= a .shortTimer .totalRequestsCount * resolution { // if the number of 429s in the last 1 second is greater than the resolution
64
- return 1 - float64 (a .decreaseRatePercentage .Load ())/ 100
65
- } else if ! a .longTimer .getLimitReached () && a .longTimer .SetLimit (true ) {
66
- return 1 + float64 (a .increaseRatePercentage .Load ())/ 100
67
- }
68
- return 1.0
60
+ return a .limitFactor .Get ()
69
61
}
70
62
71
63
func (a * Adaptive ) ResponseCodeReceived (code int ) {
72
- a .shortTimer . updateLimitReached (code )
73
- a .longTimer . updateLimitReached (code )
64
+ a .increaseLimitCounter . ResponseCodeReceived (code )
65
+ a .decreaseLimitCounter . ResponseCodeReceived (code )
74
66
}
75
67
76
68
func (a * Adaptive ) Shutdown () {
77
69
a .cancel ()
78
70
a .wg .Wait ()
79
71
}
80
72
81
- func (t * timer ) run (wg * sync.WaitGroup , ctx context.Context ) {
82
- defer wg .Done ()
83
- for {
84
- select {
85
- case <- ctx .Done ():
86
- return
87
- case <- time .After (t .frequency ):
88
- t .resetLimitReached ()
89
- if t .delay > 0 {
90
- select {
91
- case <- ctx .Done ():
92
- return
93
- case <- time .After (t .delay ):
94
- t .resetLimitReached ()
95
- }
96
- }
97
- }
98
- }
73
+ type limitFactor struct {
74
+ mu sync.RWMutex
75
+ value float64
99
76
}
100
77
101
- func (t * timer ) updateLimitReached (code int ) {
102
- t .mu .Lock ()
103
- defer t .mu .Unlock ()
104
- if code == http .StatusTooManyRequests {
105
- t .tooManyRequestsCount ++
78
+ // Add adds value to the current value of the limit factor, and clamps it between 0 and 1
79
+ func (l * limitFactor ) Add (value float64 ) {
80
+ l .mu .Lock ()
81
+ defer l .mu .Unlock ()
82
+ l .value += value
83
+ if l .value < 0 {
84
+ l .value = 0
85
+ }
86
+ if l .value > 1 {
87
+ l .value = 1
106
88
}
107
- t .totalRequestsCount ++
108
- }
109
-
110
- func (t * timer ) getLimitReached () bool {
111
- t .mu .Lock ()
112
- defer t .mu .Unlock ()
113
- return t .tooManyRequestsCount > 0
114
- }
115
-
116
- func (t * timer ) resetLimitReached () {
117
- t .mu .Lock ()
118
- defer t .mu .Unlock ()
119
- t .tooManyRequestsCount = 0
120
- t .totalRequestsCount = 0
121
- t .limitSet = false
122
89
}
123
90
124
- func (t * timer ) SetLimit (limitSet bool ) bool {
125
- t .mu .Lock ()
126
- defer t .mu .Unlock ()
127
- if limitSet == t .limitSet {
128
- return false
129
- }
130
- t .limitSet = limitSet
131
- return true
91
+ func (l * limitFactor ) Get () float64 {
92
+ l .mu .RLock ()
93
+ defer l .mu .RUnlock ()
94
+ return l .value
132
95
}
0 commit comments