Skip to content

Commit d711148

Browse files
authored
feat: support for adaptive rate limiting [PIPE-481] (#4160)
1 parent 57f1af1 commit d711148

20 files changed

+789
-184
lines changed

app/apphandlers/embeddedAppHandler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
271271
enrichers,
272272
processor.WithAdaptiveLimit(adaptiveLimit),
273273
)
274-
throttlerFactory, err := rtThrottler.New(stats.Default)
274+
throttlerFactory, err := rtThrottler.NewFactory(config, stats.Default)
275275
if err != nil {
276276
return fmt.Errorf("failed to create rt throttler factory: %w", err)
277277
}

app/apphandlers/processorAppHandler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
273273
enrichers,
274274
proc.WithAdaptiveLimit(adaptiveLimit),
275275
)
276-
throttlerFactory, err := throttler.New(stats.Default)
276+
throttlerFactory, err := throttler.NewFactory(config.Default, stats.Default)
277277
if err != nil {
278278
return fmt.Errorf("failed to create throttler factory: %w", err)
279279
}

app/cluster/integration_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/rudderlabs/rudder-server/router"
3535
"github.com/rudderlabs/rudder-server/router/batchrouter"
3636
routermanager "github.com/rudderlabs/rudder-server/router/manager"
37+
"github.com/rudderlabs/rudder-server/router/throttler"
3738
destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
3839
transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"
3940
"github.com/rudderlabs/rudder-server/services/fileuploader"
@@ -227,6 +228,7 @@ func TestDynamicClusterManager(t *testing.T) {
227228
TransientSources: transientsource.NewEmptyService(),
228229
RsourcesService: mockRsourcesService,
229230
TransformerFeaturesService: transformer.NewNoOpService(),
231+
ThrottlerFactory: throttler.NewNoOpThrottlerFactory(),
230232
}
231233
brtFactory := &batchrouter.Factory{
232234
Reporting: &reporting.NOOP{},

router/factory.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,15 @@ type Factory struct {
2323
TransientSources transientsource.Service
2424
RsourcesService rsources.JobService
2525
TransformerFeaturesService transformerFeaturesService.FeaturesService
26-
ThrottlerFactory *throttler.Factory
26+
ThrottlerFactory throttler.Factory
2727
Debugger destinationdebugger.DestinationDebugger
2828
AdaptiveLimit func(int64) int64
2929
}
3030

3131
func (f *Factory) New(destination *backendconfig.DestinationT) *Handle {
3232
r := &Handle{
33-
Reporting: f.Reporting,
34-
throttlerFactory: f.ThrottlerFactory,
35-
adaptiveLimit: f.AdaptiveLimit,
33+
Reporting: f.Reporting,
34+
adaptiveLimit: f.AdaptiveLimit,
3635
}
3736
r.Setup(
3837
destination.DestinationDefinition,
@@ -45,6 +44,7 @@ func (f *Factory) New(destination *backendconfig.DestinationT) *Handle {
4544
f.RsourcesService,
4645
f.TransformerFeaturesService,
4746
f.Debugger,
47+
f.ThrottlerFactory,
4848
)
4949
return r
5050
}

router/handle.go

+9-11
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type Handle struct {
5050
// external dependencies
5151
jobsDB jobsdb.JobsDB
5252
errorDB jobsdb.JobsDB
53-
throttlerFactory *rtThrottler.Factory
53+
throttlerFactory rtThrottler.Factory
5454
backendConfig backendconfig.BackendConfig
5555
Reporting reporter
5656
transientSources transientsource.Service
@@ -228,7 +228,7 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
228228
firstJob = job
229229
}
230230
lastJob = job
231-
slot, err := rt.findWorkerSlot(workers, job, blockedOrderKeys)
231+
slot, err := rt.findWorkerSlot(ctx, workers, job, blockedOrderKeys)
232232
if err == nil {
233233
status := jobsdb.JobStatusT{
234234
JobID: job.JobID,
@@ -306,6 +306,8 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
306306
if err != nil {
307307
rt.logger.Error("Unmarshal of job parameters failed. ", string(workerJobStatus.job.Parameters))
308308
}
309+
errorCode, _ := strconv.Atoi(workerJobStatus.status.ErrorCode)
310+
rt.throttlerFactory.Get(rt.destType, parameters.DestinationID).ResponseCodeReceived(errorCode) // send response code to throttler
309311
// Update metrics maps
310312
// REPORTING - ROUTER - START
311313
workspaceID := workerJobStatus.status.WorkspaceId
@@ -320,10 +322,6 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
320322
}
321323
sd, ok := statusDetailsMap[key]
322324
if !ok {
323-
errorCode, err := strconv.Atoi(workerJobStatus.status.ErrorCode)
324-
if err != nil {
325-
errorCode = 200 // TODO handle properly
326-
}
327325
sampleEvent := workerJobStatus.job.EventPayload
328326
if rt.transientSources.Apply(parameters.SourceID) {
329327
sampleEvent = routerutils.EmptyPayload
@@ -487,7 +485,7 @@ func (rt *Handle) getQueryParams(partition string, pickUpCount int) jobsdb.GetQu
487485
return params
488486
}
489487

490-
func (rt *Handle) findWorkerSlot(workers []*worker, job *jobsdb.JobT, blockedOrderKeys map[string]struct{}) (*workerSlot, error) {
488+
func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jobsdb.JobT, blockedOrderKeys map[string]struct{}) (*workerSlot, error) {
491489
if rt.backgroundCtx.Err() != nil {
492490
return nil, types.ErrContextCancelled
493491
}
@@ -516,7 +514,7 @@ func (rt *Handle) findWorkerSlot(workers []*worker, job *jobsdb.JobT, blockedOrd
516514
if rt.shouldBackoff(job) {
517515
return nil, types.ErrJobBackoff
518516
}
519-
if rt.shouldThrottle(job, parameters) {
517+
if rt.shouldThrottle(ctx, job, parameters) {
520518
return nil, types.ErrDestinationThrottled
521519
}
522520

@@ -557,7 +555,7 @@ func (rt *Handle) findWorkerSlot(workers []*worker, job *jobsdb.JobT, blockedOrd
557555
return nil, types.ErrBarrierExists
558556
}
559557
rt.logger.Debugf("EventOrder: job %d of orderKey %s is allowed to be processed", job.JobID, orderKey)
560-
if rt.shouldThrottle(job, parameters) {
558+
if rt.shouldThrottle(ctx, job, parameters) {
561559
blockedOrderKeys[orderKey] = struct{}{}
562560
worker.barrier.Leave(orderKey, job.JobID)
563561
slot.Release()
@@ -571,7 +569,7 @@ func (*Handle) shouldBackoff(job *jobsdb.JobT) bool {
571569
return job.LastJobStatus.JobState == jobsdb.Failed.State && job.LastJobStatus.AttemptNum > 0 && time.Until(job.LastJobStatus.RetryTime) > 0
572570
}
573571

574-
func (rt *Handle) shouldThrottle(job *jobsdb.JobT, parameters routerutils.JobParameters) (limited bool) {
572+
func (rt *Handle) shouldThrottle(ctx context.Context, job *jobsdb.JobT, parameters routerutils.JobParameters) (limited bool) {
575573
if rt.throttlerFactory == nil {
576574
// throttlerFactory could be nil when throttling is disabled or misconfigured.
577575
// in case of misconfiguration, logging errors are emitted.
@@ -584,7 +582,7 @@ func (rt *Handle) shouldThrottle(job *jobsdb.JobT, parameters routerutils.JobPar
584582
throttler := rt.throttlerFactory.Get(rt.destType, parameters.DestinationID)
585583
throttlingCost := rt.getThrottlingCost(job)
586584

587-
limited, err := throttler.CheckLimitReached(parameters.DestinationID, throttlingCost)
585+
limited, err := throttler.CheckLimitReached(ctx, parameters.DestinationID, throttlingCost)
588586
if err != nil {
589587
// we can't throttle, let's hit the destination, worst case we get a 429
590588
rt.throttlingErrorStat.Count(1)

router/handle_lifecycle.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/rudderlabs/rudder-server/router/internal/eventorder"
2323
"github.com/rudderlabs/rudder-server/router/internal/partition"
2424
"github.com/rudderlabs/rudder-server/router/isolation"
25+
"github.com/rudderlabs/rudder-server/router/throttler"
2526
"github.com/rudderlabs/rudder-server/router/transformer"
2627
"github.com/rudderlabs/rudder-server/router/types"
2728
routerutils "github.com/rudderlabs/rudder-server/router/utils"
@@ -47,9 +48,11 @@ func (rt *Handle) Setup(
4748
rsourcesService rsources.JobService,
4849
transformerFeaturesService transformerFeaturesService.FeaturesService,
4950
debugger destinationdebugger.DestinationDebugger,
51+
throttlerFactory throttler.Factory,
5052
) {
5153
rt.backendConfig = backendConfig
5254
rt.debugger = debugger
55+
rt.throttlerFactory = throttlerFactory
5356

5457
destType := destinationDefinition.Name
5558
rt.logger = log.Child(destType)
@@ -361,7 +364,8 @@ func (rt *Handle) Shutdown() {
361364
rt.logger.Infof("Shutting down router: %s", rt.destType)
362365
rt.backgroundCancel()
363366

364-
<-rt.startEnded // wait for all workers to stop first
367+
<-rt.startEnded // wait for all workers to stop first
368+
rt.throttlerFactory.Shutdown()
365369
close(rt.responseQ) // now it is safe to close the response channel
366370
_ = rt.backgroundWait()
367371
}

router/manager/manager_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config"
2929
"github.com/rudderlabs/rudder-server/router"
3030
"github.com/rudderlabs/rudder-server/router/batchrouter"
31+
"github.com/rudderlabs/rudder-server/router/throttler"
3132
"github.com/rudderlabs/rudder-server/services/rsources"
3233
"github.com/rudderlabs/rudder-server/services/transformer"
3334
"github.com/rudderlabs/rudder-server/services/transientsource"
@@ -205,6 +206,7 @@ func TestRouterManager(t *testing.T) {
205206
ProcErrorDB: errDB,
206207
TransientSources: transientsource.NewEmptyService(),
207208
RsourcesService: mockRsourcesService,
209+
ThrottlerFactory: throttler.NewNoOpThrottlerFactory(),
208210
TransformerFeaturesService: transformer.NewNoOpService(),
209211
}
210212
brtFactory := &batchrouter.Factory{

0 commit comments

Comments
 (0)