Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* `cortex_ruler_clients`
* `cortex_ruler_client_request_duration_seconds`
* [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948

## 1.8.0 in progress

Expand Down
5 changes: 1 addition & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,8 +588,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
}

totalN := validatedSamples + len(validatedMetadata)
rateOK, rateReservation := d.ingestionRateLimiter.AllowN(now, userID, totalN)
if !rateOK {
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
// Ensure the request slice is reused if the request is rate limited.
cortexpb.ReuseSlice(req.Timeseries)

Expand Down Expand Up @@ -641,8 +640,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
return d.send(localCtx, ingester, timeseries, metadata, req.Source)
}, func() { cortexpb.ReuseSlice(req.Timeseries) })
if err != nil {
// Ingestion failed, so roll-back the reservation from the rate limiter.
rateReservation.CancelAt(now)
return nil, err
}
return &cortexpb.WriteResponse{}, firstPartialErr
Expand Down
19 changes: 1 addition & 18 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
ingestionRateStrategy string
ingestionRate float64
ingestionBurstSize int
ingestionFailing bool
pushes []testPush
}{
"local strategy: limit should be set to each distributor": {
Expand Down Expand Up @@ -408,17 +407,6 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")},
},
},
"unhappy ingesters: rate limit should be unaffected when ingestion fails": {
distributors: 1,
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
ingestionRate: 10,
ingestionBurstSize: 10,
ingestionFailing: true,
pushes: []testPush{
{samples: 10, expectedError: errFail},
{samples: 10, expectedError: errFail},
},
},
}

for testName, testData := range tests {
Expand All @@ -431,15 +419,10 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
limits.IngestionRate = testData.ingestionRate
limits.IngestionBurstSize = testData.ingestionBurstSize

happyIngesters := 3
if testData.ingestionFailing {
happyIngesters = 0
}

// Start all expected distributors
distributors, _, r, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: happyIngesters,
happyIngesters: 3,
numDistributors: testData.distributors,
shardByAllLabels: true,
limits: limits,
Expand Down
37 changes: 3 additions & 34 deletions pkg/util/limiter/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,6 @@ type RateLimiter struct {
tenants map[string]*tenantLimiter
}

// Reservation is a narrowed view of rate.Reservation: it exposes only the
// method that is meaningful for this package's AllowN, which hands out
// immediate reservations (i.e. never ones delayed into the future).
type Reservation interface {
// CancelAt hands the reserved tokens back to the rate limiter so that
// other requests can use them. The reservation should normally be
// canceled with the same timestamp it was requested with; otherwise
// not all of the consumed tokens will be returned.
CancelAt(now time.Time)
}

type tenantLimiter struct {
limiter *rate.Limiter
recheckAt time.Time
Expand All @@ -53,29 +42,9 @@ func NewRateLimiter(strategy RateLimiterStrategy, recheckPeriod time.Duration) *
}
}

// AllowN reports whether n tokens may be consumed happen at time now. The
// reservation of tokens can be canceled using CancelAt on the returned object.
func (l *RateLimiter) AllowN(now time.Time, tenantID string, n int) (bool, Reservation) {

// Using ReserveN allows cancellation of the reservation, but
// the semantics are subtly different to AllowN.
r := l.getTenantLimiter(now, tenantID).ReserveN(now, n)
if !r.OK() {
return false, nil
}

// ReserveN will still return OK if the necessary tokens are
// available in the future, and tells us this time delay. In
// order to mimic the semantics of AllowN, we must check that
// there is no delay before we can use them.
if r.DelayFrom(now) > 0 {
// Having decided not to use the reservation, return the
// tokens to the rate limiter.
r.CancelAt(now)
return false, nil
}

return true, r
// AllowN reports whether n tokens may be consumed at time now by the given
// tenant. The decision is delegated to the limiter that getTenantLimiter
// returns for tenantID.
func (l *RateLimiter) AllowN(now time.Time, tenantID string, n int) bool {
	tenant := l.getTenantLimiter(now, tenantID)
	return tenant.AllowN(now, n)
}

// Limit returns the currently configured maximum overall tokens rate.
Expand Down
70 changes: 15 additions & 55 deletions pkg/util/limiter/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,60 +45,24 @@ func TestRateLimiter_AllowN(t *testing.T) {
now := time.Now()

// Tenant #1
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 8)))
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 10)))
assert.Equal(t, false, isOK(limiter.AllowN(now, "tenant-1", 3)))
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 2)))
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 8))
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 10))
assert.Equal(t, false, limiter.AllowN(now, "tenant-1", 3))
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 2))

assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-1", 8)))
assert.Equal(t, false, isOK(limiter.AllowN(now.Add(time.Second), "tenant-1", 3)))
assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-1", 2)))
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 8))
assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-1", 3))
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 2))

// Tenant #2
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-2", 18)))
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-2", 20)))
assert.Equal(t, false, isOK(limiter.AllowN(now, "tenant-2", 3)))
assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-2", 2)))

assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-2", 18)))
assert.Equal(t, false, isOK(limiter.AllowN(now.Add(time.Second), "tenant-2", 3)))
assert.Equal(t, true, isOK(limiter.AllowN(now.Add(time.Second), "tenant-2", 2)))
}

func TestRateLimiter_AllowNCancelation(t *testing.T) {
strategy := &staticLimitStrategy{tenants: map[string]struct {
limit float64
burst int
}{
"tenant-1": {limit: 10, burst: 20},
}}

limiter := NewRateLimiter(strategy, 10*time.Second)
now := time.Now()

assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 12)))
assert.Equal(t, false, isOK(limiter.AllowN(now, "tenant-1", 9)))

ok1, r1 := limiter.AllowN(now, "tenant-1", 8)
assert.Equal(t, true, ok1)
r1.CancelAt(now)

assert.Equal(t, true, isOK(limiter.AllowN(now, "tenant-1", 8)))

// +10 tokens (1s)
nowPlus := now.Add(time.Second)

assert.Equal(t, true, isOK(limiter.AllowN(nowPlus, "tenant-1", 6)))
assert.Equal(t, false, isOK(limiter.AllowN(nowPlus, "tenant-1", 5)))

ok2, r2 := limiter.AllowN(nowPlus, "tenant-1", 4)
assert.Equal(t, true, ok2)
r2.CancelAt(nowPlus)

assert.Equal(t, true, isOK(limiter.AllowN(nowPlus, "tenant-1", 2)))
assert.Equal(t, false, isOK(limiter.AllowN(nowPlus, "tenant-1", 3)))
assert.Equal(t, true, isOK(limiter.AllowN(nowPlus, "tenant-1", 2)))
assert.Equal(t, false, isOK(limiter.AllowN(nowPlus, "tenant-1", 1)))
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 18))
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 20))
assert.Equal(t, false, limiter.AllowN(now, "tenant-2", 3))
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 2))

assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 18))
assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-2", 3))
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 2))
}

func BenchmarkRateLimiter_CustomMultiTenant(b *testing.B) {
Expand Down Expand Up @@ -163,7 +127,3 @@ func (s *staticLimitStrategy) Burst(tenantID string) int {

return tenant.burst
}

// isOK adapts the two-value result of RateLimiter.AllowN for use in a
// single-value assertion: it drops the Reservation and returns only the
// boolean outcome.
func isOK(ok bool, _ Reservation) bool {
	return ok
}