Skip to content

Commit 44e1790

Browse files
committed
address few comments
1 parent 2916ba2 commit 44e1790

File tree

6 files changed

+144
-133
lines changed

6 files changed

+144
-133
lines changed

app/apphandlers/embeddedAppHandler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
270270
enrichers,
271271
processor.WithAdaptiveLimit(adaptiveLimit),
272272
)
273-
throttlerFactory, err := rtThrottler.New(stats.Default)
273+
throttlerFactory, err := rtThrottler.New(stats.Default, config)
274274
if err != nil {
275275
return fmt.Errorf("failed to create rt throttler factory: %w", err)
276276
}

app/apphandlers/processorAppHandler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
260260
enrichers,
261261
proc.WithAdaptiveLimit(adaptiveLimit),
262262
)
263-
throttlerFactory, err := throttler.New(stats.Default)
263+
throttlerFactory, err := throttler.New(stats.Default, config.Default)
264264
if err != nil {
265265
return fmt.Errorf("failed to create throttler factory: %w", err)
266266
}

router/throttler/adaptive.go

+124
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package throttler
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"time"
8+
9+
"github.com/rudderlabs/rudder-go-kit/config"
10+
)
11+
12+
type timer struct {
13+
frequency *config.Reloadable[time.Duration]
14+
limitReached map[string]bool
15+
mu sync.Mutex
16+
limitSet bool
17+
cancel context.CancelFunc
18+
}
19+
20+
type adaptiveConfig struct {
21+
enabled bool
22+
minLimit *config.Reloadable[int64]
23+
maxLimit *config.Reloadable[int64]
24+
minChangePercentage *config.Reloadable[int64]
25+
maxChangePercentage *config.Reloadable[int64]
26+
}
27+
28+
type Adaptive struct {
29+
shortTimer *timer
30+
longTimer *timer
31+
config adaptiveConfig
32+
}
33+
34+
func NewAdaptive(config *config.Config) *Adaptive {
35+
shortTimeFrequency := config.GetReloadableDurationVar(5, time.Second, "Router.throttler.adaptiveRateLimit.shortTimeFrequency")
36+
longTimeFrequency := config.GetReloadableDurationVar(15, time.Second, "Router.throttler.adaptiveRateLimit.longTimeFrequency")
37+
38+
shortTimer := &timer{
39+
frequency: shortTimeFrequency,
40+
limitReached: make(map[string]bool),
41+
}
42+
longTimer := &timer{
43+
frequency: longTimeFrequency,
44+
limitReached: make(map[string]bool),
45+
}
46+
47+
go shortTimer.run()
48+
go longTimer.run()
49+
50+
return &Adaptive{
51+
shortTimer: shortTimer,
52+
longTimer: longTimer,
53+
}
54+
}
55+
56+
func (a *Adaptive) loadConfig(destName, destID string) {
57+
a.config.enabled = config.GetBoolVar(true, fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.%s.enabled`, destName, destID), fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.enabled`, destName), `Router.throttler.adaptiveRateLimit.enabled`)
58+
a.config.minLimit = config.GetReloadableInt64Var(1, 1, fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.%s.minLimit`, destName, destID), fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.minLimit`, destName), `Router.throttler.adaptiveRateLimit.minLimit`, fmt.Sprintf(`Router.throttler.%s.%s.limit`, destName, destID), fmt.Sprintf(`Router.throttler.%s.limit`, destName))
59+
a.config.maxLimit = config.GetReloadableInt64Var(250, 1, fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.%s.maxLimit`, destName, destID), fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.maxLimit`, destName), `Router.throttler.adaptiveRateLimit.maxLimit`, fmt.Sprintf(`Router.throttler.%s.%s.limit`, destName, destID), fmt.Sprintf(`Router.throttler.%s.limit`, destName))
60+
a.config.minChangePercentage = config.GetReloadableInt64Var(30, 1, fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.%s.minChangePercentage`, destName, destID), fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.minChangePercentage`, destName), `Router.throttler.adaptiveRateLimit.minChangePercentage`)
61+
a.config.maxChangePercentage = config.GetReloadableInt64Var(10, 1, fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.%s.maxChangePercentage`, destName, destID), fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.maxChangePercentage`, destName), `Router.throttler.adaptiveRateLimit.maxChangePercentage`)
62+
}
63+
64+
func (a *Adaptive) Limit(destName, destID string, limit int64) int64 {
65+
a.loadConfig(destName, destID)
66+
if !a.config.enabled || a.config.minLimit.Load() > a.config.maxLimit.Load() {
67+
return limit
68+
}
69+
if limit <= 0 {
70+
limit = a.config.maxLimit.Load()
71+
}
72+
newLimit := limit
73+
if a.shortTimer.getLimitReached(destID) && !a.shortTimer.limitSet {
74+
newLimit = limit - (limit * a.config.minChangePercentage.Load() / 100)
75+
a.shortTimer.limitSet = true
76+
} else if !a.longTimer.getLimitReached(destID) && !a.longTimer.limitSet {
77+
newLimit = limit + (limit * a.config.maxChangePercentage.Load() / 100)
78+
a.longTimer.limitSet = true
79+
}
80+
newLimit = max(a.config.minLimit.Load(), min(newLimit, a.config.maxLimit.Load()))
81+
return newLimit
82+
}
83+
84+
func (a *Adaptive) SetLimitReached(destID string) {
85+
a.shortTimer.updateLimitReached(destID)
86+
a.longTimer.updateLimitReached(destID)
87+
}
88+
89+
func (a *Adaptive) ShutDown() {
90+
a.shortTimer.cancel()
91+
a.longTimer.cancel()
92+
}
93+
94+
func (t *timer) run() {
95+
ctx, cancel := context.WithCancel(context.Background())
96+
t.cancel = cancel
97+
for {
98+
select {
99+
case <-ctx.Done():
100+
return
101+
case <-time.After(t.frequency.Load()):
102+
t.resetLimitReached()
103+
}
104+
}
105+
}
106+
107+
func (t *timer) updateLimitReached(destID string) {
108+
t.mu.Lock()
109+
defer t.mu.Unlock()
110+
t.limitReached[destID] = true
111+
}
112+
113+
func (t *timer) getLimitReached(destID string) bool {
114+
t.mu.Lock()
115+
defer t.mu.Unlock()
116+
return t.limitReached[destID]
117+
}
118+
119+
func (t *timer) resetLimitReached() {
120+
t.mu.Lock()
121+
defer t.mu.Unlock()
122+
clear(t.limitReached)
123+
t.limitSet = false
124+
}

router/throttler/adaptiveRateLimit.go

-112
This file was deleted.

router/throttler/adaptiveRateLimit_test.go renamed to router/throttler/adaptive_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@ import (
1010
)
1111

1212
func TestAdaptiveRateLimit(t *testing.T) {
13+
config := config.Default
1314
config.Set("Router.throttler.adaptiveRateLimit.enabled", true)
1415
config.Set("Router.throttler.adaptiveRateLimit.shortTimeFrequency", 1*time.Second)
1516
config.Set("Router.throttler.adaptiveRateLimit.longTimeFrequency", 2*time.Second)
16-
f, err := New(nil)
17+
f, err := New(nil, config)
1718
require.NoError(t, err)
1819
require.Equal(t, int64(250), f.Get("destName", "dest1").config.limit)
1920

router/throttler/throttler.go

+16-18
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,20 @@ type limiter interface {
2525
}
2626

2727
type Factory struct {
28-
Stats stats.Stats
29-
limiter limiter
30-
throttlers map[string]*Throttler // map key is the destinationID
31-
throttlersMu sync.Mutex
32-
limitReachedPerDestination chan string // channel to send destinationID when limit is reached
33-
adaptiveRateLimiter func(destName, destID string, limit int64) int64
28+
Stats stats.Stats
29+
limiter limiter
30+
throttlers map[string]*Throttler // map key is the destinationID
31+
throttlersMu sync.Mutex
32+
adaptive *Adaptive
3433
}
3534

3635
// New constructs a new Throttler Factory
37-
func New(stats stats.Stats) (*Factory, error) {
36+
func New(stats stats.Stats, config *config.Config) (*Factory, error) {
3837
f := Factory{
39-
Stats: stats,
40-
throttlers: make(map[string]*Throttler),
41-
limitReachedPerDestination: make(chan string),
38+
Stats: stats,
39+
throttlers: make(map[string]*Throttler),
4240
}
43-
if err := f.initThrottlerFactory(); err != nil {
41+
if err := f.initThrottlerFactory(config); err != nil {
4442
return nil, err
4543
}
4644
return &f, nil
@@ -50,16 +48,16 @@ func (f *Factory) Get(destName, destID string) *Throttler {
5048
f.throttlersMu.Lock()
5149
defer f.throttlersMu.Unlock()
5250
if t, ok := f.throttlers[destID]; ok {
53-
if f.adaptiveRateLimiter != nil {
54-
t.config.limit = f.adaptiveRateLimiter(destName, destID, t.config.limit)
51+
if f.adaptive != nil {
52+
t.config.limit = f.adaptive.Limit(destName, destID, t.config.limit)
5553
}
5654
return t
5755
}
5856

5957
var conf throttlingConfig
6058
conf.readThrottlingConfig(destName, destID)
61-
if f.adaptiveRateLimiter != nil {
62-
conf.limit = f.adaptiveRateLimiter(destName, destID, conf.limit)
59+
if f.adaptive != nil {
60+
conf.limit = f.adaptive.Limit(destName, destID, conf.limit)
6361
}
6462
f.throttlers[destID] = &Throttler{
6563
limiter: f.limiter,
@@ -71,10 +69,10 @@ func (f *Factory) Get(destName, destID string) *Throttler {
7169
func (f *Factory) SetLimitReached(destID string) {
7270
f.throttlersMu.Lock()
7371
defer f.throttlersMu.Unlock()
74-
f.limitReachedPerDestination <- destID
72+
f.adaptive.SetLimitReached(destID)
7573
}
7674

77-
func (f *Factory) initThrottlerFactory() error {
75+
func (f *Factory) initThrottlerFactory(config *config.Config) error {
7876
var redisClient *redis.Client
7977
if config.IsSet("Router.throttler.redis.addr") {
8078
redisClient = redis.NewClient(&redis.Options{
@@ -86,7 +84,7 @@ func (f *Factory) initThrottlerFactory() error {
8684

8785
throttlingAlgorithm := throttlingAlgoTypeGCRA
8886
if config.GetBool("Router.throttler.adaptiveRateLimit.enabled", false) {
89-
f.adaptiveRateLimiter = SetupRouterAdaptiveRateLimiter(context.Background(), f.limitReachedPerDestination)
87+
f.adaptive = NewAdaptive(config)
9088
} else {
9189
throttlingAlgorithm = config.GetString("Router.throttler.algorithm", throttlingAlgoTypeGCRA)
9290
if throttlingAlgorithm == throttlingAlgoTypeRedisGCRA || throttlingAlgorithm == throttlingAlgoTypeRedisSortedSet {

0 commit comments

Comments
 (0)