Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
BonapartePC committed Nov 23, 2023
1 parent 1204bb0 commit d8302b1
Show file tree
Hide file tree
Showing 17 changed files with 568 additions and 369 deletions.
2 changes: 1 addition & 1 deletion app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
enrichers,
processor.WithAdaptiveLimit(adaptiveLimit),
)
throttlerFactory, err := rtThrottler.New(stats.Default, config)
throttlerFactory, err := rtThrottler.NewFactory(config, stats.Default)
if err != nil {
return fmt.Errorf("failed to create rt throttler factory: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
enrichers,
proc.WithAdaptiveLimit(adaptiveLimit),
)
throttlerFactory, err := throttler.New(stats.Default, config.Default)
throttlerFactory, err := throttler.NewFactory(config.Default, stats.Default)
if err != nil {
return fmt.Errorf("failed to create throttler factory: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion router/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Factory struct {
ProcErrorDB jobsdb.JobsDB
TransientSources transientsource.Service
RsourcesService rsources.JobService
ThrottlerFactory *throttler.Factory
ThrottlerFactory throttler.Factory
Debugger destinationdebugger.DestinationDebugger
AdaptiveLimit func(int64) int64
}
Expand Down
18 changes: 10 additions & 8 deletions router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Handle struct {
// external dependencies
jobsDB jobsdb.JobsDB
errorDB jobsdb.JobsDB
throttlerFactory *rtThrottler.Factory
throttlerFactory rtThrottler.Factory
backendConfig backendconfig.BackendConfig
Reporting reporter
transientSources transientsource.Service
Expand Down Expand Up @@ -292,6 +292,15 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
if err != nil {
rt.logger.Error("Unmarshal of job parameters failed. ", string(workerJobStatus.job.Parameters))
}
errorCode, err := strconv.Atoi(workerJobStatus.status.ErrorCode)
if err != nil {
errorCode = 200
}
if rt.throttlerFactory != nil {
rt.throttlerFactory.Get(rt.destType, parameters.DestinationID).ResponseCodeReceived(errorCode) // send response code to throttler
} else {
rt.logger.Debugf("[%v Router] :: ThrottlerFactory is nil. Not sending response code to throttler", rt.destType)
}
// Update metrics maps
// REPORTING - ROUTER - START
workspaceID := workerJobStatus.status.WorkspaceId
Expand All @@ -306,10 +315,6 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
}
sd, ok := statusDetailsMap[key]
if !ok {
errorCode, err := strconv.Atoi(workerJobStatus.status.ErrorCode)
if err != nil {
errorCode = 200 // TODO handle properly
}
sampleEvent := workerJobStatus.job.EventPayload
if rt.transientSources.Apply(parameters.SourceID) {
sampleEvent = routerutils.EmptyPayload
Expand All @@ -324,9 +329,6 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
if workerJobStatus.status.AttemptNum == 1 {
sd.Count++
}
if workerJobStatus.status.ErrorCode == strconv.Itoa(http.StatusTooManyRequests) {
rt.throttlerFactory.SetLimitReached(parameters.DestinationID)
}
}
case jobsdb.Succeeded.State, jobsdb.Filtered.State:
routerWorkspaceJobStatusCount[workspaceID]++
Expand Down
2 changes: 1 addition & 1 deletion router/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (rt *Handle) Shutdown() {
rt.backgroundCancel()

<-rt.startEnded // wait for all workers to stop first
rt.throttlerFactory.ShutDown()
rt.throttlerFactory.Shutdown()
close(rt.responseQ) // now it is safe to close the response channel
_ = rt.backgroundWait()
}
Expand Down
127 changes: 23 additions & 104 deletions router/throttler/adaptive.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,122 +3,41 @@ package throttler
import (
"context"
"fmt"
"sync"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
)

type timer struct {
frequency *config.Reloadable[time.Duration]
limitReached map[string]bool
mu sync.Mutex
limitSet bool
cancel context.CancelFunc
}

type adaptiveConfig struct {
enabled bool
minLimit *config.Reloadable[int64]
maxLimit *config.Reloadable[int64]
minChangePercentage *config.Reloadable[int64]
maxChangePercentage *config.Reloadable[int64]
type adaptiveThrottler struct {
limiter limiter
algorithm adaptiveAlgorithm
config adaptiveConfig
}

type Adaptive struct {
shortTimer *timer
longTimer *timer
config adaptiveConfig
}

func NewAdaptive(config *config.Config) *Adaptive {
shortTimeFrequency := config.GetReloadableDurationVar(5, time.Second, "Router.throttler.adaptiveRateLimit.shortTimeFrequency")
longTimeFrequency := config.GetReloadableDurationVar(15, time.Second, "Router.throttler.adaptiveRateLimit.longTimeFrequency")

shortTimer := &timer{
frequency: shortTimeFrequency,
limitReached: make(map[string]bool),
// CheckLimitReached returns true if we're not allowed to process the number of events we asked for with cost.
func (t *adaptiveThrottler) CheckLimitReached(key string, cost int64) (limited bool, retErr error) {
if t.config.minLimit.Load() > t.config.maxLimit.Load() {
return false, fmt.Errorf("minLimit %d is greater than maxLimit %d", t.config.minLimit.Load(), t.config.maxLimit.Load())
}
longTimer := &timer{
frequency: longTimeFrequency,
limitReached: make(map[string]bool),
}

go shortTimer.run()
go longTimer.run()

return &Adaptive{
shortTimer: shortTimer,
longTimer: longTimer,
ctx := context.TODO()
t.config.limit += int64(float64(t.config.limit) * t.algorithm.LimitFactor())
t.config.limit = max(t.config.minLimit.Load(), min(t.config.limit, t.config.maxLimit.Load()))
allowed, _, err := t.limiter.Allow(ctx, cost, t.config.limit, getWindowInSecs(t.config.window), key)
if err != nil {
return false, fmt.Errorf("could not limit: %w", err)
}
}

func (a *Adaptive) loadConfig(destName, destID string) {
a.config.enabled = config.GetBoolVar(true, fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.%s.enabled`, destName, destID), fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.enabled`, destName), `Router.throttler.adaptiveRateLimit.enabled`)
a.config.minLimit = config.GetReloadableInt64Var(1, 1, fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.%s.minLimit`, destName, destID), fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.minLimit`, destName), `Router.throttler.adaptiveRateLimit.minLimit`, fmt.Sprintf(`Router.throttler.%s.%s.limit`, destName, destID), fmt.Sprintf(`Router.throttler.%s.limit`, destName))
a.config.maxLimit = config.GetReloadableInt64Var(250, 1, fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.%s.maxLimit`, destName, destID), fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.maxLimit`, destName), `Router.throttler.adaptiveRateLimit.maxLimit`, fmt.Sprintf(`Router.throttler.%s.%s.limit`, destName, destID), fmt.Sprintf(`Router.throttler.%s.limit`, destName))
a.config.minChangePercentage = config.GetReloadableInt64Var(30, 1, fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.%s.minChangePercentage`, destName, destID), fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.minChangePercentage`, destName), `Router.throttler.adaptiveRateLimit.minChangePercentage`)
a.config.maxChangePercentage = config.GetReloadableInt64Var(10, 1, fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.%s.maxChangePercentage`, destName, destID), fmt.Sprintf(`Router.throttler.adaptiveRateLimit.%s.maxChangePercentage`, destName), `Router.throttler.adaptiveRateLimit.maxChangePercentage`)
}

func (a *Adaptive) Limit(destName, destID string, limit int64) int64 {
a.loadConfig(destName, destID)
if !a.config.enabled || a.config.minLimit.Load() > a.config.maxLimit.Load() {
return limit
}
if limit <= 0 {
limit = a.config.maxLimit.Load()
}
newLimit := limit
if a.shortTimer.getLimitReached(destID) && !a.shortTimer.limitSet {
newLimit = limit - (limit * a.config.minChangePercentage.Load() / 100)
a.shortTimer.limitSet = true
} else if !a.longTimer.getLimitReached(destID) && !a.longTimer.limitSet {
newLimit = limit + (limit * a.config.maxChangePercentage.Load() / 100)
a.longTimer.limitSet = true
}
newLimit = max(a.config.minLimit.Load(), min(newLimit, a.config.maxLimit.Load()))
return newLimit
}

func (a *Adaptive) SetLimitReached(destID string) {
a.shortTimer.updateLimitReached(destID)
a.longTimer.updateLimitReached(destID)
}

func (a *Adaptive) ShutDown() {
a.shortTimer.cancel()
a.longTimer.cancel()
}

func (t *timer) run() {
ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel
for {
select {
case <-ctx.Done():
return
case <-time.After(t.frequency.Load()):
t.resetLimitReached()
}
if !allowed {
return true, nil // no token to return when limited
}
return false, nil
}

func (t *timer) updateLimitReached(destID string) {
t.mu.Lock()
defer t.mu.Unlock()
t.limitReached[destID] = true
func (t *adaptiveThrottler) ResponseCodeReceived(code int) {
t.algorithm.ResponseCodeReceived(code)
}

func (t *timer) getLimitReached(destID string) bool {
t.mu.Lock()
defer t.mu.Unlock()
return t.limitReached[destID]
func (t *adaptiveThrottler) ShutDown() {
t.algorithm.ShutDown()
}

func (t *timer) resetLimitReached() {
t.mu.Lock()
defer t.mu.Unlock()
clear(t.limitReached)
t.limitSet = false
func (t *adaptiveThrottler) getLimit() int64 {
return t.config.limit
}
100 changes: 100 additions & 0 deletions router/throttler/adaptiveAlgorithmCounter/algorithm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package adaptivethrottlercounter

import (
"context"
"net/http"
"sync"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
)

type timer struct {
frequency *config.Reloadable[time.Duration]
limitReached bool
mu sync.Mutex
limitSet bool
cancel context.CancelFunc
}

type Adaptive struct {
shortTimer *timer
longTimer *timer
}

func New(config *config.Config) *Adaptive {
shortTimeFrequency := config.GetReloadableDurationVar(5, time.Second, "Router.throttler.adaptiveRateLimit.shortTimeFrequency")
longTimeFrequency := config.GetReloadableDurationVar(15, time.Second, "Router.throttler.adaptiveRateLimit.longTimeFrequency")

shortTimer := &timer{
frequency: shortTimeFrequency,
}
longTimer := &timer{
frequency: longTimeFrequency,
}

go shortTimer.run()
go longTimer.run()

return &Adaptive{
shortTimer: shortTimer,
longTimer: longTimer,
}
}

func (a *Adaptive) LimitFactor() float64 {
decreaseLimitPercentage := config.GetInt64("Router.throttler.adaptiveRateLimit.decreaseLimitPercentage", 30)
increaseChangePercentage := config.GetInt64("Router.throttler.adaptiveRateLimit.increaseChangePercentage", 10)
if a.shortTimer.getLimitReached() && !a.shortTimer.limitSet {
a.shortTimer.limitSet = true
return float64(-decreaseLimitPercentage) / 100
} else if !a.longTimer.getLimitReached() && !a.longTimer.limitSet {
a.longTimer.limitSet = true
return float64(increaseChangePercentage) / 100
}
return 0.0
}

func (a *Adaptive) ResponseCodeReceived(code int) {
if code == http.StatusTooManyRequests {
a.shortTimer.updateLimitReached()
a.longTimer.updateLimitReached()
}
}

func (a *Adaptive) ShutDown() {
a.shortTimer.cancel()
a.longTimer.cancel()
}

func (t *timer) run() {
ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel
for {
select {
case <-ctx.Done():
return
case <-time.After(t.frequency.Load()):
t.resetLimitReached()
}
}
}

func (t *timer) updateLimitReached() {
t.mu.Lock()
defer t.mu.Unlock()
t.limitReached = true
}

func (t *timer) getLimitReached() bool {
t.mu.Lock()
defer t.mu.Unlock()
return t.limitReached
}

func (t *timer) resetLimitReached() {
t.mu.Lock()
defer t.mu.Unlock()
t.limitReached = false
t.limitSet = false
}
30 changes: 30 additions & 0 deletions router/throttler/adaptiveAlgorithmCounter/algorithm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package adaptivethrottlercounter

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/config"
)

func TestAdaptiveRateLimit(t *testing.T) {
config := config.New() // TODO: change to config.New()
config.Set("Router.throttler.adaptiveRateLimit.shortTimeFrequency", 1*time.Second)
config.Set("Router.throttler.adaptiveRateLimit.longTimeFrequency", 2*time.Second)
al := New(config)

t.Run("when there is a 429 in the last shortTimeFrequency", func(t *testing.T) {
al.ResponseCodeReceived(429)
require.Eventually(t, func() bool {
return al.LimitFactor() == float64(-0.3) // reduces by 30% since there is an error in the last 1 second
}, time.Second, 10*time.Millisecond)
})

t.Run("when there are no 429 ins the last longTimeFrequency", func(t *testing.T) {
require.Eventually(t, func() bool {
return al.LimitFactor() == float64(+0.1) // increases by 10% since there is no error in the last 2 seconds
}, 2*time.Second, 10*time.Millisecond)
})
}
Loading

0 comments on commit d8302b1

Please sign in to comment.