Skip to content

feat: support for adaptive rate limiting [PIPE-481] #4160

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
enrichers,
processor.WithAdaptiveLimit(adaptiveLimit),
)
throttlerFactory, err := rtThrottler.New(stats.Default)
throttlerFactory, err := rtThrottler.NewFactory(config, stats.Default)
if err != nil {
return fmt.Errorf("failed to create rt throttler factory: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
enrichers,
proc.WithAdaptiveLimit(adaptiveLimit),
)
throttlerFactory, err := throttler.New(stats.Default)
throttlerFactory, err := throttler.NewFactory(config.Default, stats.Default)
if err != nil {
return fmt.Errorf("failed to create throttler factory: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion router/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Factory struct {
ProcErrorDB jobsdb.JobsDB
TransientSources transientsource.Service
RsourcesService rsources.JobService
ThrottlerFactory *throttler.Factory
ThrottlerFactory throttler.Factory
Debugger destinationdebugger.DestinationDebugger
AdaptiveLimit func(int64) int64
}
Expand Down
17 changes: 12 additions & 5 deletions router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Handle struct {
// external dependencies
jobsDB jobsdb.JobsDB
errorDB jobsdb.JobsDB
throttlerFactory *rtThrottler.Factory
throttlerFactory rtThrottler.Factory
backendConfig backendconfig.BackendConfig
Reporting reporter
transientSources transientsource.Service
Expand Down Expand Up @@ -305,6 +305,17 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
if err != nil {
rt.logger.Error("Unmarshal of job parameters failed. ", string(workerJobStatus.job.Parameters))
}
errorCode, err := strconv.Atoi(workerJobStatus.status.ErrorCode)
if err != nil {
errorCode = 200
}
if rt.throttlerFactory != nil {
rt.throttlerFactory.Get(rt.destType, parameters.DestinationID).ResponseCodeReceived(errorCode) // send response code to throttler
} else {
rt.logger.Debugf(`[%v Router] :: ThrottlerFactory is nil. Not throttling destination with ID %s`,
rt.destType, parameters.DestinationID,
)
}
// Update metrics maps
// REPORTING - ROUTER - START
workspaceID := workerJobStatus.status.WorkspaceId
Expand All @@ -319,10 +330,6 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
}
sd, ok := statusDetailsMap[key]
if !ok {
errorCode, err := strconv.Atoi(workerJobStatus.status.ErrorCode)
if err != nil {
errorCode = 200 // TODO handle properly
}
sampleEvent := workerJobStatus.job.EventPayload
if rt.transientSources.Apply(parameters.SourceID) {
sampleEvent = routerutils.EmptyPayload
Expand Down
5 changes: 4 additions & 1 deletion router/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,10 @@ func (rt *Handle) Shutdown() {
rt.logger.Infof("Shutting down router: %s", rt.destType)
rt.backgroundCancel()

<-rt.startEnded // wait for all workers to stop first
<-rt.startEnded // wait for all workers to stop first
if rt.throttlerFactory != nil {
rt.throttlerFactory.Shutdown()
}
close(rt.responseQ) // now it is safe to close the response channel
_ = rt.backgroundWait()
}
Expand Down
52 changes: 52 additions & 0 deletions router/throttler/adaptive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package throttler

import (
"context"
"fmt"
)

// adaptiveThrottler is a throttler whose limit is recomputed from the feedback
// of an adaptiveAlgorithm before each limit check.
type adaptiveThrottler struct {
// limiter is asked, via Allow, whether a given cost still fits within the current limit and window.
limiter limiter
// algorithm receives destination response codes and supplies the factor used to adjust the limit.
algorithm adaptiveAlgorithm
// config holds the enabled flag, the currently computed limit and the reloadable window/min/max settings.
config adaptiveConfig
}

// CheckLimitReached returns true if we're not allowed to process the number of events we asked for with cost.
func (t *adaptiveThrottler) CheckLimitReached(key string, cost int64) (limited bool, retErr error) {
if !t.config.enabled {
return false, nil
}
if t.config.minLimit.Load() > t.config.maxLimit.Load() {
return false, fmt.Errorf("minLimit %d is greater than maxLimit %d", t.config.minLimit.Load(), t.config.maxLimit.Load())
}
t.computeLimit()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a potential data race here: computeLimit is called from both CheckLimitReached and getLimit, and it mutates t.config.limit without synchronization. Make computeLimit thread-safe, for example by guarding it with a mutex.

Also applies to: 50-50

allowed, _, err := t.limiter.Allow(context.TODO(), cost, t.config.limit, getWindowInSecs(t.config.window.Load()), key)
if err != nil {
return false, fmt.Errorf("could not limit: %w", err)
}
if !allowed {
return true, nil // no token to return when limited
}
return false, nil
}

func (t *adaptiveThrottler) computeLimit() {
if t.config.limit == 0 {
t.config.limit = t.config.maxLimit.Load()
}
t.config.limit += int64(float64(t.config.limit) * t.algorithm.LimitFactor())
t.config.limit = max(t.config.minLimit.Load(), min(t.config.limit, t.config.maxLimit.Load()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The computeLimit method modifies t.config.limit which could be accessed concurrently, leading to a race condition. Consider adding concurrency control, such as a mutex, to protect this shared state.

+ import (
+ 	"sync"
+ )

type adaptiveThrottler struct {
+ 	mu        sync.Mutex
	limiter   limiter
	algorithm adaptiveAlgorithm
	config    adaptiveThrottleConfig
}

func (t *adaptiveThrottler) computeLimit() {
+ 	t.mu.Lock()
+ 	defer t.mu.Unlock()
	if t.config.limit == 0 {
		t.config.limit = t.config.maxLimit.Load()
	}
	t.config.limit += int64(float64(t.config.limit) * t.algorithm.LimitFactor())
	t.config.limit = max(t.config.minLimit.Load(), min(t.config.limit, t.config.maxLimit.Load()))
}

Committable suggestion

IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
func (t *adaptiveThrottler) computeLimit() {
if t.config.limit == 0 {
t.config.limit = t.config.maxLimit.Load()
}
t.config.limit += int64(float64(t.config.limit) * t.algorithm.LimitFactor())
t.config.limit = max(t.config.minLimit.Load(), min(t.config.limit, t.config.maxLimit.Load()))
import (
"sync"
)
type adaptiveThrottler struct {
mu sync.Mutex
limiter limiter
algorithm adaptiveAlgorithm
config adaptiveThrottleConfig
}
func (t *adaptiveThrottler) computeLimit() {
t.mu.Lock()
defer t.mu.Unlock()
if t.config.limit == 0 {
t.config.limit = t.config.maxLimit.Load()
}
t.config.limit += int64(float64(t.config.limit) * t.algorithm.LimitFactor())
t.config.limit = max(t.config.minLimit.Load(), min(t.config.limit, t.config.maxLimit.Load()))
}

}

// ResponseCodeReceived forwards the destination's response code to the
// configured adaptive algorithm.
func (t *adaptiveThrottler) ResponseCodeReceived(code int) {
t.algorithm.ResponseCodeReceived(code)
}

// ShutDown shuts down the configured adaptive algorithm.
func (t *adaptiveThrottler) ShutDown() {
t.algorithm.ShutDown()
}

func (t *adaptiveThrottler) getLimit() int64 {
t.computeLimit()
return t.config.limit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider optimizing the getLimit method to avoid unnecessary recomputation of the limit if it is already up to date.

}
102 changes: 102 additions & 0 deletions router/throttler/adaptiveAlgorithmCounter/algorithm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package adaptivethrottlercounter

import (
"context"
"net/http"
"sync"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
)

// timer tracks whether a rate-limit response was seen during the current
// period and whether a limit factor has already been handed out for it.
// Both flags are cleared every frequency by run.
type timer struct {
// frequency is the interval after which run resets the flags below.
frequency time.Duration
// limitReached is set by updateLimitReached and cleared by resetLimitReached.
limitReached bool
// mu is held by updateLimitReached, getLimitReached and resetLimitReached.
mu sync.Mutex
// limitSet is set by Adaptive.LimitFactor once it has returned a non-zero factor
// for this period and is cleared by resetLimitReached.
// NOTE(review): LimitFactor reads and writes this field without holding mu.
limitSet bool
// cancel stops the run loop; it is assigned inside run.
cancel context.CancelFunc
}

// Adaptive is the counter-based adaptive algorithm: it suggests decreasing the
// limit when a 429 was seen in the short period and increasing it when none
// was seen in the long period.
type Adaptive struct {
// shortTimer is consulted first by LimitFactor to decide on a decrease.
shortTimer *timer
// longTimer is consulted by LimitFactor to decide on an increase.
longTimer *timer
// decreaseLimitPercentage is the percentage (divided by 100 and negated) returned as the decrease factor.
decreaseLimitPercentage *config.Reloadable[int64]
// increaseChangePercentage is the percentage (divided by 100) returned as the increase factor;
// it is loaded from the "Router.throttler.adaptive.increaseLimitPercentage" key.
increaseChangePercentage *config.Reloadable[int64]
}

// New builds the counter-based adaptive algorithm from config and starts one
// background goroutine per timer.
//
// The short and long reset periods are read once from
// "Router.throttler.adaptive.shortTimeFrequency" (default 5s) and
// "Router.throttler.adaptive.longTimeFrequency" (default 15s); the decrease
// and increase percentages are reloadable and default to 30 and 10.
// Call ShutDown on the returned value to stop the goroutines.
func New(config *config.Config) *Adaptive {
	a := &Adaptive{
		shortTimer: &timer{
			frequency: config.GetDuration("Router.throttler.adaptive.shortTimeFrequency", 5, time.Second),
		},
		longTimer: &timer{
			frequency: config.GetDuration("Router.throttler.adaptive.longTimeFrequency", 15, time.Second),
		},
		decreaseLimitPercentage:  config.GetReloadableInt64Var(30, 1, "Router.throttler.adaptive.decreaseLimitPercentage"),
		increaseChangePercentage: config.GetReloadableInt64Var(10, 1, "Router.throttler.adaptive.increaseLimitPercentage"),
	}

	// Each timer periodically clears its own flags in the background.
	go a.shortTimer.run()
	go a.longTimer.run()

	return a
}

func (a *Adaptive) LimitFactor() float64 {
if a.shortTimer.getLimitReached() && !a.shortTimer.limitSet {
a.shortTimer.limitSet = true
return float64(-a.decreaseLimitPercentage.Load()) / 100
} else if !a.longTimer.getLimitReached() && !a.longTimer.limitSet {
a.longTimer.limitSet = true
return float64(a.increaseChangePercentage.Load()) / 100
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The limitSet field is being accessed and modified without synchronization, which could lead to data races. Ensure that the mutex lock is acquired before setting limitSet.

func (a *Adaptive) LimitFactor() float64 {
	if a.shortTimer.getLimitReached() && !a.shortTimer.limitSet {
+		a.shortTimer.mu.Lock()
		a.shortTimer.limitSet = true
+		a.shortTimer.mu.Unlock()
		return float64(-a.decreaseLimitPercentage.Load()) / 100
	} else if !a.longTimer.getLimitReached() && !a.longTimer.limitSet {
+		a.longTimer.mu.Lock()
		a.longTimer.limitSet = true
+		a.longTimer.mu.Unlock()
		return float64(a.increaseChangePercentage.Load()) / 100
	}
	return 0.0
}

Committable suggestion

IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
if a.shortTimer.getLimitReached() && !a.shortTimer.limitSet {
a.shortTimer.limitSet = true
return float64(-a.decreaseLimitPercentage.Load()) / 100
} else if !a.longTimer.getLimitReached() && !a.longTimer.limitSet {
a.longTimer.limitSet = true
return float64(a.increaseChangePercentage.Load()) / 100
if a.shortTimer.getLimitReached() && !a.shortTimer.limitSet {
a.shortTimer.mu.Lock()
a.shortTimer.limitSet = true
a.shortTimer.mu.Unlock()
return float64(-a.decreaseLimitPercentage.Load()) / 100
} else if !a.longTimer.getLimitReached() && !a.longTimer.limitSet {
a.longTimer.mu.Lock()
a.longTimer.limitSet = true
a.longTimer.mu.Unlock()
return float64(a.increaseChangePercentage.Load()) / 100

}
return 0.0
}

// ResponseCodeReceived records a rate-limit hit on both timers when the
// destination answered with HTTP 429; every other code is ignored.
func (a *Adaptive) ResponseCodeReceived(code int) {
	if code != http.StatusTooManyRequests {
		return
	}
	a.shortTimer.updateLimitReached()
	a.longTimer.updateLimitReached()
}

func (a *Adaptive) ShutDown() {
a.shortTimer.cancel()
a.longTimer.cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before calling cancel on each timer, check that it is not nil to avoid a nil pointer dereference.

func (a *Adaptive) ShutDown() {
+	if a.shortTimer.cancel != nil {
	    a.shortTimer.cancel()
+	}
+	if a.longTimer.cancel != nil {
	    a.longTimer.cancel()
+	}
}

Committable suggestion

IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
a.shortTimer.cancel()
a.longTimer.cancel()
if a.shortTimer.cancel != nil {
a.shortTimer.cancel()
}
if a.longTimer.cancel != nil {
a.longTimer.cancel()
}

}

// run clears the timer's flags every t.frequency until the context created
// here is cancelled through t.cancel. It blocks until then, so it is meant to
// be started in its own goroutine.
//
// NOTE(review): t.cancel is published from inside this goroutine without
// synchronization, so a shutdown that runs before this assignment would see a
// nil cancel func — confirm callers cannot shut down that early.
func (t *timer) run() {
	ctx, cancel := context.WithCancel(context.Background())
	t.cancel = cancel

	// Reuse a single timer rather than allocating a new one with time.After on
	// every iteration, and make sure it is released when the loop exits.
	tick := time.NewTimer(t.frequency)
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			t.resetLimitReached()
			// The channel has just been drained, so Reset can be called directly.
			// Re-arming after the reset keeps the original cadence: the next
			// period starts once the flags have been cleared.
			tick.Reset(t.frequency)
		}
	}
}

// updateLimitReached marks, under the timer's lock, that a rate-limit
// response was seen during the current period.
func (t *timer) updateLimitReached() {
	t.mu.Lock()
	t.limitReached = true
	t.mu.Unlock()
}

// getLimitReached reports, under the timer's lock, whether a rate-limit
// response was recorded since the last reset.
func (t *timer) getLimitReached() bool {
	t.mu.Lock()
	reached := t.limitReached
	t.mu.Unlock()
	return reached
}

// resetLimitReached starts a new period: under the timer's lock it clears
// both the "limit reached" and the "limit already set" flags.
func (t *timer) resetLimitReached() {
	t.mu.Lock()
	t.limitReached, t.limitSet = false, false
	t.mu.Unlock()
}
33 changes: 33 additions & 0 deletions router/throttler/adaptiveAlgorithmCounter/algorithm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package adaptivethrottlercounter

import (
"math"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/config"
)

// float64EqualityThreshold is the tolerance used when comparing a returned
// limit factor against its expected floating-point value.
const float64EqualityThreshold = 1e-9

func TestAdaptiveRateLimit(t *testing.T) {
config := config.New()
config.Set("Router.throttler.adaptive.shortTimeFrequency", 1*time.Second)
config.Set("Router.throttler.adaptive.longTimeFrequency", 2*time.Second)
al := New(config)

t.Run("when there is a 429 in the last shortTimeFrequency", func(t *testing.T) {
al.ResponseCodeReceived(429)
require.Eventually(t, func() bool {
return math.Abs(al.LimitFactor()-float64(-0.3)) < float64EqualityThreshold // reduces by 30% since there is an error in the last 1 second
}, 2*time.Second, 10*time.Millisecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test description "when there is a 429 in the last shortTimeFrequency" could be more descriptive. It should clarify that it expects a reduction in the limit factor due to a 429 response.

})

t.Run("when there are no 429 ins the last longTimeFrequency", func(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a typo in the test description; "ins" should be "in".

- t.Run("when there are no 429 ins the last longTimeFrequency", func(t *testing.T) {
+ t.Run("when there are no 429s in the last longTimeFrequency", func(t *testing.T) {

Committable suggestion

IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
t.Run("when there are no 429 ins the last longTimeFrequency", func(t *testing.T) {
t.Run("when there are no 429s in the last longTimeFrequency", func(t *testing.T) {

require.Eventually(t, func() bool {
return math.Abs(al.LimitFactor()-float64(+0.1)) < float64EqualityThreshold // increases by 10% since there is no error in the last 2 seconds
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The state of the adaptive limiter is not reset between test cases, which could lead to the second test case being affected by the first. Consider resetting the state or creating a new instance for each test case to ensure they are independent.

}, 3*time.Second, 10*time.Millisecond)
})
}
29 changes: 29 additions & 0 deletions router/throttler/algorithm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package throttler

import (
"github.com/rudderlabs/rudder-go-kit/config"
adaptiveAlgorithmCounter "github.com/rudderlabs/rudder-server/router/throttler/adaptiveAlgorithmCounter"
)

// adaptiveAlgorithm decides how the throttling limit should change based on
// the response codes observed from the destination.
type adaptiveAlgorithm interface {
// ResponseCodeReceived is called when a response is received from the destination
ResponseCodeReceived(code int)
// ShutDown is called when the throttler is shutting down
ShutDown()
// LimitFactor returns the relative change to apply to the current limit:
// the adaptive throttler adds limit*factor to the limit and then clamps the
// result to [minLimit, maxLimit]. A negative value lowers the limit, a
// positive value raises it and 0 leaves it unchanged.
LimitFactor() float64
}

const (
// throttlingAdaptiveAlgoTypeCounter is the value of "Router.throttler.adaptive.algorithm"
// that selects the counter-based adaptive algorithm.
throttlingAdaptiveAlgoTypeCounter = "counter"
)

func newAdaptiveAlgorithm(config *config.Config) adaptiveAlgorithm {
name := config.GetString("Router.throttler.adaptive.algorithm", "")
switch name {
case throttlingAdaptiveAlgoTypeCounter:
return adaptiveAlgorithmCounter.New(config)
default:
return &noopAdaptiveAlgorithm{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling or logging when an unknown algorithm type is specified in the configuration, to aid in debugging configuration issues.

	default:
+		log.Warnf("Unknown adaptive algorithm type: %s", name)
		return &noopAdaptiveAlgorithm{}

Committable suggestion

IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
default:
return &noopAdaptiveAlgorithm{}
default:
log.Warnf("Unknown adaptive algorithm type: %s", name)
return &noopAdaptiveAlgorithm{}

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The newAdaptiveAlgorithm function should check if the config parameter is nil before attempting to access its methods to prevent potential nil pointer dereference.

func newAdaptiveAlgorithm(config *config.Config) adaptiveAlgorithm {
+	if config == nil {
+		return &noopAdaptiveAlgorithm{}
+	}
	name := config.GetString("Router.throttler.adaptive.algorithm", "")
	// ... rest of the function
}

Committable suggestion

IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
func newAdaptiveAlgorithm(config *config.Config) adaptiveAlgorithm {
name := config.GetString("Router.throttler.adaptive.algorithm", "")
switch name {
case throttlingAdaptiveAlgoTypeCounter:
return adaptiveAlgorithmCounter.New(config)
default:
return &noopAdaptiveAlgorithm{}
}
func newAdaptiveAlgorithm(config *config.Config) adaptiveAlgorithm {
if config == nil {
return &noopAdaptiveAlgorithm{}
}
name := config.GetString("Router.throttler.adaptive.algorithm", "")
switch name {
case throttlingAdaptiveAlgoTypeCounter:
return adaptiveAlgorithmCounter.New(config)
default:
return &noopAdaptiveAlgorithm{}
}
}

}
15 changes: 15 additions & 0 deletions router/throttler/algorithm_noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package throttler

// noopAdaptiveAlgorithm is the adaptiveAlgorithm used when no known algorithm
// name is configured; it keeps no state and ignores all feedback.
type noopAdaptiveAlgorithm struct{}

// ResponseCodeReceived ignores the response code.
func (ba *noopAdaptiveAlgorithm) ResponseCodeReceived(code int) {
// no-op
}

// ShutDown does nothing; the no-op algorithm holds no resources.
func (ba *noopAdaptiveAlgorithm) ShutDown() {
// no-op
}

// LimitFactor always returns 1.0.
// NOTE(review): the adaptive throttler adds limit*factor to the limit and
// clamps it to maxLimit, so a constant 1.0 appears to pin the limit at
// maxLimit — confirm that this is the intended "no adaptation" behaviour.
func (ba *noopAdaptiveAlgorithm) LimitFactor() float64 {
return 1.0
}
50 changes: 50 additions & 0 deletions router/throttler/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package throttler

import (
"fmt"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
)

// normalConfig is the static throttling configuration of a destination.
type normalConfig struct {
// enabled is set to true by readThrottlingConfig when both limit and window are positive.
enabled bool
// limit is read from "Router.throttler.<destName>.<destID>.limit" when set, otherwise from "Router.throttler.<destName>.limit".
limit int64
// window is read from the matching ".timeWindow" keys, expressed in seconds.
window time.Duration
}

// readThrottlingConfig loads the limit and time window for the given
// destination. For each setting the destination-ID specific key
// ("Router.throttler.<destName>.<destID>.…") is used when it is set, otherwise
// the destination-type key ("Router.throttler.<destName>.…"); both default to 0.
// Throttling is enabled only when limit and window are both positive.
func (c *normalConfig) readThrottlingConfig(config *config.Config, destName, destID string) {
	limitKey := fmt.Sprintf(`Router.throttler.%s.limit`, destName)
	if destLimitKey := fmt.Sprintf(`Router.throttler.%s.%s.limit`, destName, destID); config.IsSet(destLimitKey) {
		limitKey = destLimitKey
	}
	c.limit = config.GetInt64(limitKey, 0)

	windowKey := fmt.Sprintf(`Router.throttler.%s.timeWindow`, destName)
	if destWindowKey := fmt.Sprintf(`Router.throttler.%s.%s.timeWindow`, destName, destID); config.IsSet(destWindowKey) {
		windowKey = destWindowKey
	}
	c.window = config.GetDuration(windowKey, 0, time.Second)

	// enable dest throttler only when both settings are usable
	if c.limit <= 0 || c.window <= 0 {
		return
	}
	c.enabled = true
}

// adaptiveConfig is the throttling configuration of a destination whose limit
// is adjusted at runtime between minLimit and maxLimit.
type adaptiveConfig struct {
// enabled is set to true by readThrottlingConfig when the window is positive.
enabled bool
// limit is the currently computed limit; readThrottlingConfig leaves it untouched.
limit int64
// window is the reloadable throttling time window.
window *config.Reloadable[time.Duration]
// minLimit is the reloadable lower bound for limit.
minLimit *config.Reloadable[int64]
// maxLimit is the reloadable upper bound for limit.
maxLimit *config.Reloadable[int64]
}

// readThrottlingConfig wires up the reloadable window, minimum limit and
// maximum limit for the given destination and enables adaptive throttling
// when the window is positive.
//
// Each setting is looked up under the adaptive keys (destination ID, then
// destination type, then global) followed by the static throttler keys for
// the destination; note that minLimit and maxLimit share the same static
// ".limit" keys. Defaults are a 1 second window, minLimit 1 and maxLimit 250.
func (c *adaptiveConfig) readThrottlingConfig(config *config.Config, destName, destID string) {
	var (
		staticDestLimitKey = fmt.Sprintf(`Router.throttler.%s.%s.limit`, destName, destID)
		staticTypeLimitKey = fmt.Sprintf(`Router.throttler.%s.limit`, destName)
	)

	c.window = config.GetReloadableDurationVar(
		1, time.Second,
		fmt.Sprintf(`Router.throttler.adaptive.%s.timeWindow`, destID),
		fmt.Sprintf(`Router.throttler.adaptive.%s.timeWindow`, destName),
		`Router.throttler.adaptive.timeWindow`,
		fmt.Sprintf(`Router.throttler.%s.%s.timeWindow`, destName, destID),
		fmt.Sprintf(`Router.throttler.%s.timeWindow`, destName),
	)
	c.minLimit = config.GetReloadableInt64Var(
		1, 1,
		fmt.Sprintf(`Router.throttler.adaptive.%s.minLimit`, destID),
		fmt.Sprintf(`Router.throttler.adaptive.%s.minLimit`, destName),
		`Router.throttler.adaptive.minLimit`,
		staticDestLimitKey,
		staticTypeLimitKey,
	)
	c.maxLimit = config.GetReloadableInt64Var(
		250, 1,
		fmt.Sprintf(`Router.throttler.adaptive.%s.maxLimit`, destID),
		fmt.Sprintf(`Router.throttler.adaptive.%s.maxLimit`, destName),
		`Router.throttler.adaptive.maxLimit`,
		staticDestLimitKey,
		staticTypeLimitKey,
	)

	if c.window.Load() <= 0 {
		return
	}
	c.enabled = true
}
Loading