From 7433c177e2bb6305a7792065107d90644af7b36f Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Fri, 12 Mar 2021 14:27:51 +0100
Subject: [PATCH] Revert "Prevent failed ingestion from affecting rate limiting in distributor. (#3825)"

This reverts commit 9ba848b46c8f0b60302d422bdebf9c5e22f9f301.

Signed-off-by: Marco Pracucci
---
 CHANGELOG.md                          |  1 +
 pkg/distributor/distributor.go        |  5 +-
 pkg/distributor/distributor_test.go   | 19 +-------
 pkg/util/limiter/rate_limiter.go      | 37 ++------
 pkg/util/limiter/rate_limiter_test.go | 70 ++++++---------------
 5 files changed, 21 insertions(+), 111 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 59d89a2bcc9..7a318547e5f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@
   * `cortex_ruler_clients`
   * `cortex_ruler_client_request_duration_seconds`
 * [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901
+* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
 
 ## 1.8.0 in progress
 
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 73c524ce669..47975ad5456 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -588,8 +588,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 	}
 
 	totalN := validatedSamples + len(validatedMetadata)
-	rateOK, rateReservation := d.ingestionRateLimiter.AllowN(now, userID, totalN)
-	if !rateOK {
+	if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
 		// Ensure the request slice is reused if the request is rate limited.
 		cortexpb.ReuseSlice(req.Timeseries)
 
@@ -641,8 +640,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 		return d.send(localCtx, ingester, timeseries, metadata, req.Source)
 	}, func() { cortexpb.ReuseSlice(req.Timeseries) })
 	if err != nil {
-		// Ingestion failed, so roll-back the reservation from the rate limiter.
-		rateReservation.CancelAt(now)
 		return nil, err
 	}
 	return &cortexpb.WriteResponse{}, firstPartialErr
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 6b392d1b24f..3357e2ac7c5 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -363,7 +363,6 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
 		ingestionRateStrategy string
 		ingestionRate         float64
 		ingestionBurstSize    int
-		ingestionFailing      bool
 		pushes                []testPush
 	}{
 		"local strategy: limit should be set to each distributor": {
@@ -408,17 +407,6 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
 				{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")},
 			},
 		},
-		"unhappy ingesters: rate limit should be unaffected when ingestion fails": {
-			distributors:          1,
-			ingestionRateStrategy: validation.LocalIngestionRateStrategy,
-			ingestionRate:         10,
-			ingestionBurstSize:    10,
-			ingestionFailing:      true,
-			pushes: []testPush{
-				{samples: 10, expectedError: errFail},
-				{samples: 10, expectedError: errFail},
-			},
-		},
 	}
 
 	for testName, testData := range tests {
@@ -431,15 +419,10 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
 			limits.IngestionRate = testData.ingestionRate
 			limits.IngestionBurstSize = testData.ingestionBurstSize
 
-			happyIngesters := 3
-			if testData.ingestionFailing {
-				happyIngesters = 0
-			}
-
 			// Start all expected distributors
 			distributors, _, r, _ := prepare(t, prepConfig{
 				numIngesters:     3,
-				happyIngesters:   happyIngesters,
+				happyIngesters:   3,
 				numDistributors:  testData.distributors,
 				shardByAllLabels: true,
 				limits:           limits,
diff --git a/pkg/util/limiter/rate_limiter.go b/pkg/util/limiter/rate_limiter.go
index f2fa22b1178..48fc6a42623 100644
--- a/pkg/util/limiter/rate_limiter.go
+++ b/pkg/util/limiter/rate_limiter.go
@@ -26,17 +26,6 @@ type RateLimiter struct {
 	tenants map[string]*tenantLimiter
 }
 
-// Reservation is similar to rate.Reservation but excludes interfaces which do
-// not make sense to expose, because we are following the semantics of AllowN,
-// being an immediate reservation, i.e. not delayed into the future.
-type Reservation interface {
-	// CancelAt returns the reservation to the rate limiter for use by other
-	// requests. Note that typically the reservation should be canceled with
-	// the same timestamp it was requested with, or not all the tokens
-	// consumed will be returned.
-	CancelAt(now time.Time)
-}
-
 type tenantLimiter struct {
 	limiter   *rate.Limiter
 	recheckAt time.Time
@@ -53,29 +42,9 @@ func NewRateLimiter(strategy RateLimiterStrategy, recheckPeriod time.Duration) *
 	}
 }
 
-// AllowN reports whether n tokens may be consumed happen at time now. The
-// reservation of tokens can be canceled using CancelAt on the returned object.
-func (l *RateLimiter) AllowN(now time.Time, tenantID string, n int) (bool, Reservation) {
-
-	// Using ReserveN allows cancellation of the reservation, but
-	// the semantics are subtly different to AllowN.
-	r := l.getTenantLimiter(now, tenantID).ReserveN(now, n)
-	if !r.OK() {
-		return false, nil
-	}
-
-	// ReserveN will still return OK if the necessary tokens are
-	// available in the future, and tells us this time delay. In
-	// order to mimic the semantics of AllowN, we must check that
-	// there is no delay before we can use them.
-	if r.DelayFrom(now) > 0 {
-		// Having decided not to use the reservation, return the
-		// tokens to the rate limiter.
-		r.CancelAt(now)
-		return false, nil
-	}
-
-	return true, r
+// AllowN reports whether n tokens may be consumed happen at time now.
+func (l *RateLimiter) AllowN(now time.Time, tenantID string, n int) bool {
+	return l.getTenantLimiter(now, tenantID).AllowN(now, n)
 }
 
 // Limit returns the currently configured maximum overall tokens rate.
diff --git a/pkg/util/limiter/rate_limiter_test.go b/pkg/util/limiter/rate_limiter_test.go
index 7d76b20a0d2..907624c10cc 100644
--- a/pkg/util/limiter/rate_limiter_test.go
+++ b/pkg/util/limiter/rate_limiter_test.go
@@ -45,60 +45,24 @@ func TestRateLimiter_AllowN(t *testing.T) {
 	now := time.Now()
 
 	// Tenant #1
-	assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 8)))
-	assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 10)))
-	assert.Equal(t, false, isOK(limiter.AllowN(now, "tenant-1", 3)))
-	assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 2)))
+	assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 8))
+	assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 10))
+	assert.Equal(t, false, limiter.AllowN(now, "tenant-1", 3))
+	assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 2))
 
-	assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-1", 8)))
-	assert.Equal(t, false, isOK(limiter.AllowN(now.Add(time.Second), "tenant-1", 3)))
-	assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-1", 2)))
+	assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 8))
+	assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-1", 3))
+	assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 2))
 
 	// Tenant #2
-	assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-2", 18)))
-	assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-2", 20)))
-	assert.Equal(t, false, isOK(limiter.AllowN(now, "tenant-2", 3)))
-	assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-2", 2)))
-
-	assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-2", 18)))
-	assert.Equal(t, false, isOK(limiter.AllowN(now.Add(time.Second), "tenant-2", 3)))
-	assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-2", 2)))
-}
-
-func TestRateLimiter_AllowNCancelation(t *testing.T) {
-	strategy := &staticLimitStrategy{tenants: map[string]struct {
-		limit float64
-		burst int
-	}{
-		"tenant-1": {limit: 10, burst: 20},
-	}}
-
-	limiter := NewRateLimiter(strategy, 10*time.Second)
-	now := time.Now()
-
-	assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 12)))
-	assert.Equal(t, false, isOK(limiter.AllowN(now, "tenant-1", 9)))
-
-	ok1, r1 := limiter.AllowN(now, "tenant-1", 8)
-	assert.Equal(t, true, ok1)
-	r1.CancelAt(now)
-
-	assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 8)))
-
-	// +10 tokens (1s)
-	nowPlus := now.Add(time.Second)
-
-	assert.Equal(t, true, isOK(limiter.AllowN(nowPlus, "tenant-1", 6)))
-	assert.Equal(t, false, isOK(limiter.AllowN(nowPlus, "tenant-1", 5)))
-
-	ok2, r2 := limiter.AllowN(nowPlus, "tenant-1", 4)
-	assert.Equal(t, true, ok2)
-	r2.CancelAt(nowPlus)
-
-	assert.Equal(t, true, isOK(limiter.AllowN(nowPlus, "tenant-1", 2)))
-	assert.Equal(t, false, isOK(limiter.AllowN(nowPlus, "tenant-1", 3)))
-	assert.Equal(t, true, isOK(limiter.AllowN(nowPlus, "tenant-1", 2)))
-	assert.Equal(t, false, isOK(limiter.AllowN(nowPlus, "tenant-1", 1)))
+	assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 18))
+	assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 20))
+	assert.Equal(t, false, limiter.AllowN(now, "tenant-2", 3))
+	assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 2))
+
+	assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 18))
+	assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-2", 3))
+	assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 2))
 }
 
 func BenchmarkRateLimiter_CustomMultiTenant(b *testing.B) {
@@ -163,7 +127,3 @@ func (s *staticLimitStrategy) Burst(tenantID string) int {
 
 	return tenant.burst
 }
-
-func isOK(ok bool, r Reservation) bool {
-	return ok
-}
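
Note (not part of the patch above): the behavioural difference this revert turns on comes straight from golang.org/x/time/rate, which backs the RateLimiter touched by the diff. The standalone sketch below is an illustration only, not code from the Cortex tree; the 10 tokens/sec limit with a burst of 10 is an assumed value chosen to mirror the limits used in the reverted test. It contrasts the plain AllowN semantics restored by the revert with the ReserveN/CancelAt roll-back that the revert removes.

// ratelimit_sketch.go — illustrative only; run with golang.org/x/time/rate on the module path.
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	now := time.Now()

	// 10 tokens/sec, burst of 10 (assumed values mirroring the reverted test).
	lim := rate.NewLimiter(rate.Limit(10), 10)

	// AllowN semantics (what the revert restores): tokens are consumed only if
	// the whole batch fits right now, and a rejected call consumes nothing.
	fmt.Println(lim.AllowN(now, 10)) // true  — burst fully consumed
	fmt.Println(lim.AllowN(now, 1))  // false — no tokens left, nothing consumed

	// ReserveN/CancelAt semantics (what the revert removes): a successful
	// immediate reservation can later be rolled back, returning its tokens.
	lim2 := rate.NewLimiter(rate.Limit(10), 10)
	r := lim2.ReserveN(now, 10)
	fmt.Println(r.OK(), r.DelayFrom(now)) // true 0s — usable immediately
	r.CancelAt(now)                       // roll back, e.g. after a failed ingestion
	fmt.Println(lim2.AllowN(now, 10))     // true — the canceled tokens were returned
}

With CancelAt gone, a failed ingestion can no longer return its tokens to the limiter; that is the trade-off the revert accepts in exchange for the simpler AllowN-based path.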