Skip to content

feat: support for adaptive rate limiting [PIPE-481] #4160

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
if workerJobStatus.status.AttemptNum == 1 {
sd.Count++
}
if workerJobStatus.status.ErrorCode == strconv.Itoa(http.StatusTooManyRequests) {
rt.throttlerFactory.SetLimitReached(parameters.DestinationID)
}
}
case jobsdb.Succeeded.State, jobsdb.Filtered.State:
routerWorkspaceJobStatusCount[workspaceID]++
Expand Down
112 changes: 112 additions & 0 deletions router/throttler/adaptiveRateLimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package throttler

import (
"context"
"fmt"
"sync"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
)

type timers struct {
timer time.Duration
limitReached map[string]bool
mu sync.Mutex
limitSet bool
}

func SetupRouterAdaptiveRateLimiter(ctx context.Context, errorCh <-chan string) func(destName, destID string, limit int64) int64 {
shortTimeFrequency := config.GetDuration("Router.throttler.adaptiveRateLimit.shortTimeFrequency", 5, time.Second)
longTimeFrequency := config.GetDuration("Router.throttler.adaptiveRateLimit.longTimeFrequency", 15, time.Second)

shortTimer := &timers{
timer: shortTimeFrequency,
limitReached: make(map[string]bool),
}
longTimer := &timers{
timer: longTimeFrequency,
limitReached: make(map[string]bool),
}

go shortTimer.runLoop(ctx)
go longTimer.runLoop(ctx)

go func() {
for {
select {
case <-ctx.Done():
return
case value := <-errorCh:

shortTimer.updateLimitReached(value)
longTimer.updateLimitReached(value)
}
}
}()

limiter := func(destName, destID string, limit int64) int64 {
enabled := config.GetBoolVar(true, fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.%s.enabled`, destName, destID), fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.enabled`, destName), `Router.throttler.adaptiveRateLimit.enabled`)

if !enabled {
return limit
}
minLimit := config.GetInt64Var(1, 1, fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.%s.minLimit`, destName, destID), fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.minLimit`, destName), `Router.throttler.adaptiveRateLimit.minLimit`, fmt.Sprintf(`Router.throttler.%s.%s.limit`, destName, destID), fmt.Sprintf(`Router.throttler.%s.limit`, destName))
maxLimit := config.GetInt64Var(250, 1, fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.%s.maxLimit`, destName, destID), fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.maxLimit`, destName), `Router.throttler.adaptiveRateLimit.maxLimit`, fmt.Sprintf(`Router.throttler.%s.%s.limit`, destName, destID), fmt.Sprintf(`Router.throttler.%s.limit`, destName))

if minLimit > maxLimit {
return limit
}
minChangePercentage := config.GetInt64Var(30, 1, fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.%s.minChangePercentage`, destName, destID), fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.minChangePercentage`, destName), `Router.throttler.adaptiveRateLimit.minChangePercentage`)
maxChangePercentage := config.GetInt64Var(10, 1, fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.%s.maxChangePercentage`, destName, destID), fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.maxChangePercentage`, destName), `Router.throttler.adaptiveRateLimit.maxChangePercentage`)
if limit <= 0 {
limit = maxLimit
}
newLimit := limit
if shortTimer.getLimitReached(destID) && !shortTimer.limitSet {

newLimit = limit - (limit * minChangePercentage / 100)
shortTimer.limitSet = true
} else if !longTimer.getLimitReached(destID) && !longTimer.limitSet {
newLimit = limit + (limit * maxChangePercentage / 100)
longTimer.limitSet = true
}
newLimit = max(minLimit, min(newLimit, maxLimit))
return newLimit
}
return limiter
}

func (t *timers) runLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(t.timer):

t.resetLimitReached()
}
}
}

func (t *timers) updateLimitReached(destID string) {
t.mu.Lock()
defer t.mu.Unlock()

t.limitReached[destID] = true
}

func (t *timers) getLimitReached(destID string) bool {
t.mu.Lock()
defer t.mu.Unlock()

return t.limitReached[destID]
}

func (t *timers) resetLimitReached() {
t.mu.Lock()
defer t.mu.Unlock()

clear(t.limitReached)
t.limitSet = false
}
94 changes: 94 additions & 0 deletions router/throttler/adaptiveRateLimit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package throttler

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/config"
)

func TestAdaptiveRateLimit(t *testing.T) {
config.Set("Router.throttler.adaptiveRateLimit.enabled", true)
config.Set("Router.throttler.adaptiveRateLimit.shortTimeFrequency", 1*time.Second)
config.Set("Router.throttler.adaptiveRateLimit.longTimeFrequency", 2*time.Second)
f, err := New(nil)
require.NoError(t, err)
require.Equal(t, int64(250), f.Get("destName", "dest1").config.limit)

t.Run("short timer", func(t *testing.T) {
f.SetLimitReached("dest1")
require.Eventually(t, func() bool {
return int64(175) == f.Get("destName", "dest1").config.limit // reduces by 30% since there is an error in the last 1 second
}, time.Second, 10*time.Millisecond)
})

t.Run("long timer", func(t *testing.T) {
require.Eventually(t, func() bool {
return int64(192) == f.Get("destName", "dest1").config.limit // increases by 10% since there is no error in the last 2 seconds
}, 2*time.Second, 10*time.Millisecond)
})

t.Run("min limit", func(t *testing.T) {
config.Set("Router.throttler.adaptiveRateLimit.destName.dest1.minLimit", 200)
require.Eventually(t, func() bool {
return int64(200) == f.Get("destName", "dest1").config.limit // will not reduce below 200
}, 1*time.Second, 10*time.Millisecond)
})

t.Run("max limit", func(t *testing.T) {
config.Set("Router.throttler.adaptiveRateLimit.destName.dest1.maxLimit", 210)
require.Eventually(t, func() bool {
return int64(210) == f.Get("destName", "dest1").config.limit // will not increase above 210
}, 2*time.Second, 10*time.Millisecond)
})

t.Run("min change percentage", func(t *testing.T) {
f.SetLimitReached("dest1")
config.Set("Router.throttler.adaptiveRateLimit.destName.dest1.minLimit", 100)
config.Set("Router.throttler.adaptiveRateLimit.destName.dest1.minChangePercentage", 50)
require.Eventually(t, func() bool {
return int64(105) == f.Get("destName", "dest1").config.limit // will reduce by 50% since there is an error in the last 1 second
}, 1*time.Second, 10*time.Millisecond)
})

t.Run("max change percentage", func(t *testing.T) {
config.Set("Router.throttler.adaptiveRateLimit.destName.dest1.maxChangePercentage", 20)
require.Eventually(t, func() bool {
return int64(126) == f.Get("destName", "dest1").config.limit // will increase by 20% since there is no error in the last 2 seconds
}, 2*time.Second, 10*time.Millisecond)
})

t.Run("adaptive rate limit disabled", func(t *testing.T) {
config.Set("Router.throttler.adaptiveRateLimit.enabled", false)
f.SetLimitReached("dest1")
require.Eventually(t, func() bool {
return int64(126) == f.Get("destName", "dest1").config.limit // will not change since adaptive rate limiter is disabled
}, 2*time.Second, 10*time.Millisecond)
config.Set("Router.throttler.adaptiveRateLimit.enabled", true)
})

t.Run("destination id adaptive rate limit disabled", func(t *testing.T) {
config.Set("Router.throttler.adaptiveRateLimit.destName.dest1.enabled", false)
config.Set("Router.throttler.adaptiveRateLimit.destName.dest2.enabled", true)
f.SetLimitReached("dest1")
f.SetLimitReached("dest2")
require.Eventually(t, func() bool {
return int64(126) == f.Get("destName", "dest1").config.limit // will not change since adaptive rate limiter is disabled for destination
}, 2*time.Second, 10*time.Millisecond)
require.Eventually(t, func() bool {
return int64(175) == f.Get("destName", "dest2").config.limit // will reduce by 30% since there is an error in the last 1 second
}, time.Second, 10*time.Millisecond)
})

t.Run("destination name adaptive rate limit disabled", func(t *testing.T) {
config.Set("Router.throttler.adaptiveRateLimit.destName.enabled", false)
require.Eventually(t, func() bool {
return int64(126) == f.Get("destName", "dest1").config.limit // will not change since adaptive rate limiter is disabled for destination
}, 2*time.Second, 10*time.Millisecond)
require.Eventually(t, func() bool {
return int64(192) == f.Get("destName", "dest2").config.limit // will not change since adaptive rate limiter is disabled for destination
}, 2*time.Second, 10*time.Millisecond)
})
}
40 changes: 30 additions & 10 deletions router/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,20 @@ type limiter interface {
}

type Factory struct {
Stats stats.Stats
limiter limiter
throttlers map[string]*Throttler // map key is the destinationID
throttlersMu sync.Mutex
Stats stats.Stats
limiter limiter
throttlers map[string]*Throttler // map key is the destinationID
throttlersMu sync.Mutex
limitReachedPerDestination chan string // channel to send destinationID when limit is reached
adaptiveRateLimiter func(destName, destID string, limit int64) int64
}

// New constructs a new Throttler Factory
func New(stats stats.Stats) (*Factory, error) {
f := Factory{
Stats: stats,
throttlers: make(map[string]*Throttler),
Stats: stats,
throttlers: make(map[string]*Throttler),
limitReachedPerDestination: make(chan string),
}
if err := f.initThrottlerFactory(); err != nil {
return nil, err
Expand All @@ -47,18 +50,30 @@ func (f *Factory) Get(destName, destID string) *Throttler {
f.throttlersMu.Lock()
defer f.throttlersMu.Unlock()
if t, ok := f.throttlers[destID]; ok {
if f.adaptiveRateLimiter != nil {
t.config.limit = f.adaptiveRateLimiter(destName, destID, t.config.limit)
}
return t
}

var conf throttlingConfig
conf.readThrottlingConfig(destName, destID)
if f.adaptiveRateLimiter != nil {
conf.limit = f.adaptiveRateLimiter(destName, destID, conf.limit)
}
f.throttlers[destID] = &Throttler{
limiter: f.limiter,
config: conf,
}
return f.throttlers[destID]
}

func (f *Factory) SetLimitReached(destID string) {
f.throttlersMu.Lock()
defer f.throttlersMu.Unlock()
f.limitReachedPerDestination <- destID
}

func (f *Factory) initThrottlerFactory() error {
var redisClient *redis.Client
if config.IsSet("Router.throttler.redis.addr") {
Expand All @@ -69,10 +84,15 @@ func (f *Factory) initThrottlerFactory() error {
})
}

throttlingAlgorithm := config.GetString("Router.throttler.algorithm", throttlingAlgoTypeGCRA)
if throttlingAlgorithm == throttlingAlgoTypeRedisGCRA || throttlingAlgorithm == throttlingAlgoTypeRedisSortedSet {
if redisClient == nil {
return fmt.Errorf("redis client is nil with algorithm %s", throttlingAlgorithm)
throttlingAlgorithm := throttlingAlgoTypeGCRA
if config.GetBool("Router.throttler.adaptiveRateLimit.enabled", false) {
f.adaptiveRateLimiter = SetupRouterAdaptiveRateLimiter(context.Background(), f.limitReachedPerDestination)
} else {
throttlingAlgorithm = config.GetString("Router.throttler.algorithm", throttlingAlgoTypeGCRA)
if throttlingAlgorithm == throttlingAlgoTypeRedisGCRA || throttlingAlgorithm == throttlingAlgoTypeRedisSortedSet {
if redisClient == nil {
return fmt.Errorf("redis client is nil with algorithm %s", throttlingAlgorithm)
}
}
}

Expand Down