Skip to content

feat: support for adaptive rate limiting [PIPE-481] #4160

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
enrichers,
processor.WithAdaptiveLimit(adaptiveLimit),
)
throttlerFactory, err := rtThrottler.New(stats.Default)
throttlerFactory, err := rtThrottler.NewFactory(config, stats.Default)
if err != nil {
return fmt.Errorf("failed to create rt throttler factory: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
enrichers,
proc.WithAdaptiveLimit(adaptiveLimit),
)
throttlerFactory, err := throttler.New(stats.Default)
throttlerFactory, err := throttler.NewFactory(config.Default, stats.Default)
if err != nil {
return fmt.Errorf("failed to create throttler factory: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/rudderlabs/rudder-server/router"
"github.com/rudderlabs/rudder-server/router/batchrouter"
routermanager "github.com/rudderlabs/rudder-server/router/manager"
"github.com/rudderlabs/rudder-server/router/throttler"
destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"
"github.com/rudderlabs/rudder-server/services/fileuploader"
Expand Down Expand Up @@ -227,6 +228,7 @@ func TestDynamicClusterManager(t *testing.T) {
TransientSources: transientsource.NewEmptyService(),
RsourcesService: mockRsourcesService,
TransformerFeaturesService: transformer.NewNoOpService(),
ThrottlerFactory: throttler.NewNoOpThrottlerFactory(),
}
brtFactory := &batchrouter.Factory{
Reporting: &reporting.NOOP{},
Expand Down
8 changes: 4 additions & 4 deletions router/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@ type Factory struct {
TransientSources transientsource.Service
RsourcesService rsources.JobService
TransformerFeaturesService transformerFeaturesService.FeaturesService
ThrottlerFactory *throttler.Factory
ThrottlerFactory throttler.Factory
Debugger destinationdebugger.DestinationDebugger
AdaptiveLimit func(int64) int64
}

func (f *Factory) New(destination *backendconfig.DestinationT) *Handle {
r := &Handle{
Reporting: f.Reporting,
throttlerFactory: f.ThrottlerFactory,
adaptiveLimit: f.AdaptiveLimit,
Reporting: f.Reporting,
adaptiveLimit: f.AdaptiveLimit,
}
r.Setup(
destination.DestinationDefinition,
Expand All @@ -45,6 +44,7 @@ func (f *Factory) New(destination *backendconfig.DestinationT) *Handle {
f.RsourcesService,
f.TransformerFeaturesService,
f.Debugger,
f.ThrottlerFactory,
)
return r
}
Expand Down
20 changes: 9 additions & 11 deletions router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Handle struct {
// external dependencies
jobsDB jobsdb.JobsDB
errorDB jobsdb.JobsDB
throttlerFactory *rtThrottler.Factory
throttlerFactory rtThrottler.Factory
backendConfig backendconfig.BackendConfig
Reporting reporter
transientSources transientsource.Service
Expand Down Expand Up @@ -228,7 +228,7 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
firstJob = job
}
lastJob = job
slot, err := rt.findWorkerSlot(workers, job, blockedOrderKeys)
slot, err := rt.findWorkerSlot(ctx, workers, job, blockedOrderKeys)
if err == nil {
status := jobsdb.JobStatusT{
JobID: job.JobID,
Expand Down Expand Up @@ -306,6 +306,8 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
if err != nil {
rt.logger.Error("Unmarshal of job parameters failed. ", string(workerJobStatus.job.Parameters))
}
errorCode, _ := strconv.Atoi(workerJobStatus.status.ErrorCode)
rt.throttlerFactory.Get(rt.destType, parameters.DestinationID).ResponseCodeReceived(errorCode) // send response code to throttler
// Update metrics maps
// REPORTING - ROUTER - START
workspaceID := workerJobStatus.status.WorkspaceId
Expand All @@ -320,10 +322,6 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
}
sd, ok := statusDetailsMap[key]
if !ok {
errorCode, err := strconv.Atoi(workerJobStatus.status.ErrorCode)
if err != nil {
errorCode = 200 // TODO handle properly
}
sampleEvent := workerJobStatus.job.EventPayload
if rt.transientSources.Apply(parameters.SourceID) {
sampleEvent = routerutils.EmptyPayload
Expand Down Expand Up @@ -487,7 +485,7 @@ func (rt *Handle) getQueryParams(partition string, pickUpCount int) jobsdb.GetQu
return params
}

func (rt *Handle) findWorkerSlot(workers []*worker, job *jobsdb.JobT, blockedOrderKeys map[string]struct{}) (*workerSlot, error) {
func (rt *Handle) findWorkerSlot(ctx context.Context, workers []*worker, job *jobsdb.JobT, blockedOrderKeys map[string]struct{}) (*workerSlot, error) {
if rt.backgroundCtx.Err() != nil {
return nil, types.ErrContextCancelled
}
Expand Down Expand Up @@ -516,7 +514,7 @@ func (rt *Handle) findWorkerSlot(workers []*worker, job *jobsdb.JobT, blockedOrd
if rt.shouldBackoff(job) {
return nil, types.ErrJobBackoff
}
if rt.shouldThrottle(job, parameters) {
if rt.shouldThrottle(ctx, job, parameters) {
return nil, types.ErrDestinationThrottled
}

Expand Down Expand Up @@ -557,7 +555,7 @@ func (rt *Handle) findWorkerSlot(workers []*worker, job *jobsdb.JobT, blockedOrd
return nil, types.ErrBarrierExists
}
rt.logger.Debugf("EventOrder: job %d of orderKey %s is allowed to be processed", job.JobID, orderKey)
if rt.shouldThrottle(job, parameters) {
if rt.shouldThrottle(ctx, job, parameters) {
blockedOrderKeys[orderKey] = struct{}{}
worker.barrier.Leave(orderKey, job.JobID)
slot.Release()
Expand All @@ -571,7 +569,7 @@ func (*Handle) shouldBackoff(job *jobsdb.JobT) bool {
return job.LastJobStatus.JobState == jobsdb.Failed.State && job.LastJobStatus.AttemptNum > 0 && time.Until(job.LastJobStatus.RetryTime) > 0
}

func (rt *Handle) shouldThrottle(job *jobsdb.JobT, parameters routerutils.JobParameters) (limited bool) {
func (rt *Handle) shouldThrottle(ctx context.Context, job *jobsdb.JobT, parameters routerutils.JobParameters) (limited bool) {
if rt.throttlerFactory == nil {
// throttlerFactory could be nil when throttling is disabled or misconfigured.
// in case of misconfiguration, logging errors are emitted.
Expand All @@ -584,7 +582,7 @@ func (rt *Handle) shouldThrottle(job *jobsdb.JobT, parameters routerutils.JobPar
throttler := rt.throttlerFactory.Get(rt.destType, parameters.DestinationID)
throttlingCost := rt.getThrottlingCost(job)

limited, err := throttler.CheckLimitReached(parameters.DestinationID, throttlingCost)
limited, err := throttler.CheckLimitReached(ctx, parameters.DestinationID, throttlingCost)
if err != nil {
// we can't throttle, let's hit the destination, worst case we get a 429
rt.throttlingErrorStat.Count(1)
Expand Down
6 changes: 5 additions & 1 deletion router/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/rudderlabs/rudder-server/router/internal/eventorder"
"github.com/rudderlabs/rudder-server/router/internal/partition"
"github.com/rudderlabs/rudder-server/router/isolation"
"github.com/rudderlabs/rudder-server/router/throttler"
"github.com/rudderlabs/rudder-server/router/transformer"
"github.com/rudderlabs/rudder-server/router/types"
routerutils "github.com/rudderlabs/rudder-server/router/utils"
Expand All @@ -47,9 +48,11 @@ func (rt *Handle) Setup(
rsourcesService rsources.JobService,
transformerFeaturesService transformerFeaturesService.FeaturesService,
debugger destinationdebugger.DestinationDebugger,
throttlerFactory throttler.Factory,
) {
rt.backendConfig = backendConfig
rt.debugger = debugger
rt.throttlerFactory = throttlerFactory

destType := destinationDefinition.Name
rt.logger = log.Child(destType)
Expand Down Expand Up @@ -361,7 +364,8 @@ func (rt *Handle) Shutdown() {
rt.logger.Infof("Shutting down router: %s", rt.destType)
rt.backgroundCancel()

<-rt.startEnded // wait for all workers to stop first
<-rt.startEnded // wait for all workers to stop first
rt.throttlerFactory.Shutdown()
close(rt.responseQ) // now it is safe to close the response channel
_ = rt.backgroundWait()
}
Expand Down
2 changes: 2 additions & 0 deletions router/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config"
"github.com/rudderlabs/rudder-server/router"
"github.com/rudderlabs/rudder-server/router/batchrouter"
"github.com/rudderlabs/rudder-server/router/throttler"
"github.com/rudderlabs/rudder-server/services/rsources"
"github.com/rudderlabs/rudder-server/services/transformer"
"github.com/rudderlabs/rudder-server/services/transientsource"
Expand Down Expand Up @@ -205,6 +206,7 @@ func TestRouterManager(t *testing.T) {
ProcErrorDB: errDB,
TransientSources: transientsource.NewEmptyService(),
RsourcesService: mockRsourcesService,
ThrottlerFactory: throttler.NewNoOpThrottlerFactory(),
TransformerFeaturesService: transformer.NewNoOpService(),
}
brtFactory := &batchrouter.Factory{
Expand Down
Loading