Skip to content

Commit 8fdc99f

Browse files
authored
Add limiter (#81)
* example limiter Signed-off-by: yeya24 <[email protected]> * address comments and add tests Signed-off-by: yeya24 <[email protected]> --------- Signed-off-by: yeya24 <[email protected]>
1 parent f89902f commit 8fdc99f

File tree

6 files changed

+560
-23
lines changed

6 files changed

+560
-23
lines changed

queryable/parquet_queryable.go

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,19 @@ import (
3434
type ShardsFinderFunction func(ctx context.Context, mint, maxt int64) ([]storage.ParquetShard, error)
3535

3636
type queryableOpts struct {
37-
concurrency int
37+
concurrency int
38+
rowCountLimitFunc search.QuotaLimitFunc
39+
chunkBytesLimitFunc search.QuotaLimitFunc
40+
dataBytesLimitFunc search.QuotaLimitFunc
41+
materializedSeriesCallback search.MaterializedSeriesFunc
3842
}
3943

4044
var DefaultQueryableOpts = queryableOpts{
41-
concurrency: runtime.GOMAXPROCS(0),
45+
concurrency: runtime.GOMAXPROCS(0),
46+
rowCountLimitFunc: search.NoopQuotaLimitFunc,
47+
chunkBytesLimitFunc: search.NoopQuotaLimitFunc,
48+
dataBytesLimitFunc: search.NoopQuotaLimitFunc,
49+
materializedSeriesCallback: search.NoopMaterializedSeriesFunc,
4250
}
4351

4452
type QueryableOpts func(*queryableOpts)
@@ -50,6 +58,35 @@ func WithConcurrency(concurrency int) QueryableOpts {
5058
}
5159
}
5260

61+
// WithRowCountLimitFunc sets a callback function to get limit for matched row count.
62+
func WithRowCountLimitFunc(fn search.QuotaLimitFunc) QueryableOpts {
63+
return func(opts *queryableOpts) {
64+
opts.rowCountLimitFunc = fn
65+
}
66+
}
67+
68+
// WithChunkBytesLimitFunc sets a callback function to get limit for chunk column page bytes fetched.
69+
func WithChunkBytesLimitFunc(fn search.QuotaLimitFunc) QueryableOpts {
70+
return func(opts *queryableOpts) {
71+
opts.chunkBytesLimitFunc = fn
72+
}
73+
}
74+
75+
// WithDataBytesLimitFunc sets a callback function to get limit for data (including label and chunk)
76+
// column page bytes fetched.
77+
func WithDataBytesLimitFunc(fn search.QuotaLimitFunc) QueryableOpts {
78+
return func(opts *queryableOpts) {
79+
opts.dataBytesLimitFunc = fn
80+
}
81+
}
82+
83+
// WithMaterializedSeriesCallback sets a callback function to process the materialized series.
84+
func WithMaterializedSeriesCallback(fn search.MaterializedSeriesFunc) QueryableOpts {
85+
return func(opts *queryableOpts) {
86+
opts.materializedSeriesCallback = fn
87+
}
88+
}
89+
5390
type parquetQueryable struct {
5491
shardsFinder ShardsFinderFunction
5592
d *schema.PrometheusParquetChunksDecoder
@@ -191,8 +228,11 @@ func (p parquetQuerier) queryableShards(ctx context.Context, mint, maxt int64) (
191228
return nil, err
192229
}
193230
qBlocks := make([]*queryableShard, len(shards))
231+
rowCountQuota := search.NewQuota(p.opts.rowCountLimitFunc(ctx))
232+
chunkBytesQuota := search.NewQuota(p.opts.chunkBytesLimitFunc(ctx))
233+
dataBytesQuota := search.NewQuota(p.opts.dataBytesLimitFunc(ctx))
194234
for i, shard := range shards {
195-
qb, err := newQueryableShard(p.opts, shard, p.d)
235+
qb, err := newQueryableShard(p.opts, shard, p.d, rowCountQuota, chunkBytesQuota, dataBytesQuota)
196236
if err != nil {
197237
return nil, err
198238
}
@@ -207,12 +247,12 @@ type queryableShard struct {
207247
concurrency int
208248
}
209249

210-
func newQueryableShard(opts *queryableOpts, block storage.ParquetShard, d *schema.PrometheusParquetChunksDecoder) (*queryableShard, error) {
250+
func newQueryableShard(opts *queryableOpts, block storage.ParquetShard, d *schema.PrometheusParquetChunksDecoder, rowCountQuota *search.Quota, chunkBytesQuota *search.Quota, dataBytesQuota *search.Quota) (*queryableShard, error) {
211251
s, err := block.TSDBSchema()
212252
if err != nil {
213253
return nil, err
214254
}
215-
m, err := search.NewMaterializer(s, d, block, opts.concurrency)
255+
m, err := search.NewMaterializer(s, d, block, opts.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, opts.materializedSeriesCallback)
216256
if err != nil {
217257
return nil, err
218258
}

queryable/parquet_queryable_test.go

Lines changed: 123 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535

3636
"github.com/prometheus-community/parquet-common/convert"
3737
"github.com/prometheus-community/parquet-common/schema"
38+
"github.com/prometheus-community/parquet-common/search"
3839
"github.com/prometheus-community/parquet-common/storage"
3940
"github.com/prometheus-community/parquet-common/util"
4041
)
@@ -269,6 +270,126 @@ func TestQueryable(t *testing.T) {
269270
require.Equal(t, expectedLabelValues, lValues)
270271
})
271272
})
273+
274+
t.Run("RowCountQuota", func(t *testing.T) {
275+
// Test with limited row count quota
276+
limitedRowQuota := func(ctx context.Context) int64 {
277+
return 10 // Only allow 10 rows
278+
}
279+
queryable, err := createQueryable(shard, WithRowCountLimitFunc(limitedRowQuota))
280+
require.NoError(t, err)
281+
querier, err := queryable.Querier(data.MinTime, data.MaxTime)
282+
require.NoError(t, err)
283+
284+
// Try to query more rows than quota allows
285+
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "unique_0")}
286+
ss := querier.Select(ctx, true, nil, matchers...)
287+
288+
// This should fail due to row count quota
289+
for ss.Next() {
290+
_ = ss.At()
291+
}
292+
require.Error(t, ss.Err())
293+
require.Contains(t, ss.Err().Error(), "would fetch too many rows")
294+
require.True(t, search.IsResourceExhausted(ss.Err()))
295+
296+
// Test with sufficient quota
297+
sufficientRowQuota := func(ctx context.Context) int64 {
298+
return 1000 // Allow 1000 rows
299+
}
300+
queryable, err = createQueryable(shard, WithRowCountLimitFunc(sufficientRowQuota))
301+
require.NoError(t, err)
302+
querier, err = queryable.Querier(data.MinTime, data.MaxTime)
303+
require.NoError(t, err)
304+
305+
ss = querier.Select(ctx, true, nil, matchers...)
306+
var series []prom_storage.Series
307+
for ss.Next() {
308+
series = append(series, ss.At())
309+
}
310+
require.NoError(t, ss.Err())
311+
require.NotEmpty(t, series)
312+
})
313+
314+
t.Run("ChunkBytesQuota", func(t *testing.T) {
315+
// Test with limited chunk bytes quota
316+
limitedChunkQuota := func(ctx context.Context) int64 {
317+
return 100 // Only allow 100 bytes
318+
}
319+
queryable, err := createQueryable(shard, WithChunkBytesLimitFunc(limitedChunkQuota))
320+
require.NoError(t, err)
321+
querier, err := queryable.Querier(data.MinTime, data.MaxTime)
322+
require.NoError(t, err)
323+
324+
// Try to query chunks that exceed the quota
325+
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "unique_0")}
326+
ss := querier.Select(ctx, true, nil, matchers...)
327+
328+
// This should fail due to chunk bytes quota
329+
for ss.Next() {
330+
_ = ss.At()
331+
}
332+
require.Error(t, ss.Err())
333+
require.Contains(t, ss.Err().Error(), "would fetch too many chunk bytes")
334+
require.True(t, search.IsResourceExhausted(ss.Err()))
335+
336+
// Test with sufficient quota
337+
sufficientChunkQuota := func(ctx context.Context) int64 {
338+
return 1000000 // Allow 1MB
339+
}
340+
queryable, err = createQueryable(shard, WithChunkBytesLimitFunc(sufficientChunkQuota))
341+
require.NoError(t, err)
342+
querier, err = queryable.Querier(data.MinTime, data.MaxTime)
343+
require.NoError(t, err)
344+
345+
ss = querier.Select(ctx, true, nil, matchers...)
346+
var series []prom_storage.Series
347+
for ss.Next() {
348+
series = append(series, ss.At())
349+
}
350+
require.NoError(t, ss.Err())
351+
require.NotEmpty(t, series)
352+
})
353+
354+
t.Run("DataBytesQuota", func(t *testing.T) {
355+
// Test with limited data bytes quota
356+
limitedDataQuota := func(ctx context.Context) int64 {
357+
return 100 // Only allow 100 bytes
358+
}
359+
queryable, err := createQueryable(shard, WithDataBytesLimitFunc(limitedDataQuota))
360+
require.NoError(t, err)
361+
querier, err := queryable.Querier(data.MinTime, data.MaxTime)
362+
require.NoError(t, err)
363+
364+
// Try to query data that exceeds the quota
365+
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "unique_0")}
366+
ss := querier.Select(ctx, true, nil, matchers...)
367+
368+
// This should fail due to data bytes quota
369+
for ss.Next() {
370+
_ = ss.At()
371+
}
372+
require.Error(t, ss.Err())
373+
require.Contains(t, ss.Err().Error(), "would fetch too many data bytes")
374+
require.True(t, search.IsResourceExhausted(ss.Err()))
375+
376+
// Test with sufficient quota
377+
sufficientDataQuota := func(ctx context.Context) int64 {
378+
return 1000000 // Allow 1MB
379+
}
380+
queryable, err = createQueryable(shard, WithDataBytesLimitFunc(sufficientDataQuota))
381+
require.NoError(t, err)
382+
querier, err = queryable.Querier(data.MinTime, data.MaxTime)
383+
require.NoError(t, err)
384+
385+
ss = querier.Select(ctx, true, nil, matchers...)
386+
var series []prom_storage.Series
387+
for ss.Next() {
388+
series = append(series, ss.At())
389+
}
390+
require.NoError(t, ss.Err())
391+
require.NotEmpty(t, series)
392+
})
272393
})
273394
}
274395
}
@@ -338,11 +459,11 @@ func queryWithQueryable(t *testing.T, mint, maxt int64, shard storage.ParquetSha
338459
return found
339460
}
340461

341-
func createQueryable(shard storage.ParquetShard) (prom_storage.Queryable, error) {
462+
func createQueryable(shard storage.ParquetShard, opts ...QueryableOpts) (prom_storage.Queryable, error) {
342463
d := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool())
343464
return NewParquetQueryable(d, func(ctx context.Context, mint, maxt int64) ([]storage.ParquetShard, error) {
344465
return []storage.ParquetShard{shard}, nil
345-
})
466+
}, opts...)
346467
}
347468

348469
var benchmarkCases = []struct {

search/limits.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
// Copyright (c) The Thanos Authors.
15+
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
16+
// https://opensource.org/licenses/Apache-2.0
17+
18+
// This package is a modified copy from
19+
// https://github.com/thanos-io/thanos-parquet-gateway/blob/cfc1279f605d1c629c4afe8b1e2a340e8b15ecdc/internal/limits/limit.go.
20+
21+
package search
22+
23+
import (
24+
"context"
25+
"errors"
26+
"fmt"
27+
"sync"
28+
)
29+
30+
type resourceExhausted struct {
31+
used int64
32+
}
33+
34+
func (re *resourceExhausted) Error() string {
35+
return fmt.Sprintf("resource exhausted (used %d)", re.used)
36+
}
37+
38+
// IsResourceExhausted checks if the error is a resource exhausted error.
39+
func IsResourceExhausted(err error) bool {
40+
var re *resourceExhausted
41+
return errors.As(err, &re)
42+
}
43+
44+
// Quota is a limiter for a resource.
45+
type Quota struct {
46+
mu sync.Mutex
47+
q int64
48+
u int64
49+
}
50+
51+
// NewQuota creates a new quota with the given limit.
52+
func NewQuota(n int64) *Quota {
53+
return &Quota{q: n, u: n}
54+
}
55+
56+
// UnlimitedQuota creates a new quota with no limit.
57+
func UnlimitedQuota() *Quota {
58+
return NewQuota(0)
59+
}
60+
61+
func (q *Quota) Reserve(n int64) error {
62+
if q.q == 0 {
63+
return nil
64+
}
65+
66+
q.mu.Lock()
67+
defer q.mu.Unlock()
68+
69+
if q.u-n < 0 {
70+
return &resourceExhausted{used: q.q}
71+
}
72+
q.u -= n
73+
return nil
74+
}
75+
76+
// QuotaLimitFunc is a function that returns the limit value.
77+
type QuotaLimitFunc func(ctx context.Context) int64
78+
79+
// NoopQuotaLimitFunc returns 0 which means no limit.
80+
func NoopQuotaLimitFunc(ctx context.Context) int64 {
81+
return 0
82+
}

0 commit comments

Comments
 (0)